package main import ( "context" "encoding/json" "log" "os" "os/signal" "syscall" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) // 与生产者对应的消息结构体 type OrderMessage struct { OrderID string `json:"order_id"` GoodsID int64 `json:"goods_id"` UserID int64 `json:"user_id"` Quantity int `json:"quantity"` } func main() { // 初始化消费者 c, err := rocketmq.NewPushConsumer( consumer.WithNameServer([]string{"127.0.0.1:9876"}), // Namesrv 地址 consumer.WithGroupName("order_consumer_group"), // 消费者组名 // 消费模式:广播模式(BROADCASTING)或集群模式(CLUSTERING,默认) consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从最早消息开始消费 ) if err != nil { log.Fatalf("初始化消费者失败: %v", err) } // 订阅主题并注册消息处理函数 err = c.Subscribe( "killgoods", // 要消费的主题(需与生产者一致) consumer.MessageSelector{}, // 消息过滤规则(空表示全部消费) func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { // 处理消息的回调函数(每次收到消息会触发) for _, msg := range msgs { // 解析消息体 var orderMsg OrderMessage if err := json.Unmarshal(msg.Body, &orderMsg); err != nil { log.Printf("解析消息失败: %v, 消息内容: %s", err, string(msg.Body)) continue } // 模拟业务处理(如更新订单状态、通知物流等) log.Printf( "收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d", msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity, ) } // 返回消费结果: // - consumer.ConsumeSuccess:消费成功 // - consumer.ConsumeRetryLater:消费失败,稍后重试 return consumer.ConsumeSuccess, nil }, ) if err != nil { log.Fatalf("订阅主题失败: %v", err) } // 启动消费者(会阻塞当前 goroutine) if err := c.Start(); err != nil { log.Fatalf("启动消费者失败: %v", err) } defer c.Shutdown() // 退出时关闭消费者 // 等待中断信号(如 Ctrl+C),保证程序持续运行 log.Println("消费者已启动,持续等待消息...") quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit // 阻塞直到收到中断信号 log.Println("消费者正在退出...") }