diff --git a/mq/killmq/consumer/killConsumer.go b/mq/killmq/consumer/killConsumer.go new file mode 100644 index 0000000..d341968 --- /dev/null +++ b/mq/killmq/consumer/killConsumer.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "os/signal" + "syscall" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +// 与生产者对应的消息结构体 +type OrderMessage struct { + OrderID string `json:"order_id"` + GoodsID int64 `json:"goods_id"` + UserID int64 `json:"user_id"` + Quantity int `json:"quantity"` +} + +func main() { + // 初始化消费者 + c, err := rocketmq.NewPushConsumer( + consumer.WithNameServer([]string{"127.0.0.1:9876"}), // Namesrv 地址 + consumer.WithGroupName("order_consumer_group"), // 消费者组名 + // 消费模式:广播模式(BROADCASTING)或集群模式(CLUSTERING,默认) + consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从最早消息开始消费 + ) + if err != nil { + log.Fatalf("初始化消费者失败: %v", err) + } + + // 订阅主题并注册消息处理函数 + err = c.Subscribe( + "killgoods", // 要消费的主题(需与生产者一致) + consumer.MessageSelector{}, // 消息过滤规则(空表示全部消费) + func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + // 处理消息的回调函数(每次收到消息会触发) + for _, msg := range msgs { + // 解析消息体 + var orderMsg OrderMessage + if err := json.Unmarshal(msg.Body, &orderMsg); err != nil { + log.Printf("解析消息失败: %v, 消息内容: %s", err, string(msg.Body)) + continue + } + + // 模拟业务处理(如更新订单状态、通知物流等) + log.Printf( + "收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d", + msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity, + ) + } + + // 返回消费结果: + // - consumer.ConsumeSuccess:消费成功 + // - consumer.ConsumeRetryLater:消费失败,稍后重试 + return consumer.ConsumeSuccess, nil + }, + ) + if err != nil { + log.Fatalf("订阅主题失败: %v", err) + } + + // 启动消费者(会阻塞当前 goroutine) + if err := c.Start(); err != nil { + log.Fatalf("启动消费者失败: %v", err) + } + defer c.Shutdown() // 退出时关闭消费者 + + // 等待中断信号(如 Ctrl+C),保证程序持续运行 + log.Println("消费者已启动,持续等待消息...") + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit // 阻塞直到收到中断信号 + + log.Println("消费者正在退出...") +} diff --git a/mq/killmq/producer/killProducer.go b/mq/killmq/producer/killProducer.go new file mode 100644 index 0000000..ac8450b --- /dev/null +++ b/mq/killmq/producer/killProducer.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" + "log" + "time" +) + +type OrderMessage struct { + OrderID string `json:"order_id"` + GoodsID int64 `json:"goods_id"` + UserID int64 `json:"user_id"` + Quantity int `json:"quantity"` +} + +func main() { + // 1. 初始化普通消息生产者 + normalProducer, err := rocketmq.NewProducer( + producer.WithNameServer([]string{"127.0.0.1:9876"}), // RocketMQ namesrv 地址 + producer.WithGroupName("order_producer_group"), // 生产者组名 + ) + if err != nil { + log.Fatalf("初始化普通生产者失败: %v", err) + } + + // 启动生产者 + if err := normalProducer.Start(); err != nil { + log.Fatalf("启动普通生产者失败: %v", err) + } + defer normalProducer.Shutdown() + + // 2. 发送普通消息示例 + sendNormalMessage(normalProducer) +} + +// 发送普通消息 +func sendNormalMessage(p rocketmq.Producer) { + // 构造消息内容 + msgData := OrderMessage{ + OrderID: fmt.Sprintf("ORD%d", time.Now().UnixNano()), + GoodsID: 1001, + UserID: 10001, + Quantity: 1, + } + msgBody, _ := json.Marshal(msgData) + + // 创建消息(指定主题和内容) + msg := primitive.NewMessage("killgoods", msgBody) // 主题需提前创建或开启自动创建 + + // 发送消息(同步发送,等待结果) + result, err := p.SendSync(context.Background(), msg) + if err != nil { + log.Printf("普通消息发送失败: %v", err) + return + } + + log.Printf("普通消息发送成功,消息ID: %s, 队列: %d, 偏移量: %d", + result.MsgID, result.MessageQueue.QueueId, result.QueueOffset) + +}