Files
toutoukan/rocketmq/killmq/consumer/killConsumer.go
JACKYMYPERSON 2ab7614ea0 修改秒杀
2025-09-14 21:12:37 +08:00

178 lines
5.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"encoding/json"
"fmt"
"gorm.io/gorm"
"log"
"os"
"os/signal"
"syscall"
"time"
"toutoukan/init/databaseInit"
"toutoukan/model/good"
"toutoukan/model/usermodel/userOrder"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
const (
colorRed = "\033[31m" // 红色
colorGreen = "\033[32m" // 绿色
colorYellow = "\033[33m" // 黄色
colorBlue = "\033[34m" // 蓝色
colorReset = "\033[0m" // 重置颜色
)
// 与生产者对应的消息结构体
func main() {
databaseInit.DbInit()
defer func() {
sqlDB, err := databaseInit.UserDB.DB()
if err != nil {
// 处理获取失败的情况
log.Printf("获取底层数据库连接失败: %v", err)
return
}
// 2. 调用 Close() 关闭连接池
if err := sqlDB.Close(); err != nil {
log.Printf("关闭数据库连接池失败: %v", err)
} else {
log.Println("数据库连接池已成功关闭")
}
}()
// 初始化消费者
c, err := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"127.0.0.1:9876"}), // Namesrv 地址
consumer.WithGroupName("order_consumer_group"), // 消费者组名
// 消费模式广播模式BROADCASTING或集群模式CLUSTERING默认
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从最早消息开始消费
)
if err != nil {
log.Fatalf("初始化消费者失败: %v", err)
}
// 订阅主题并注册消息处理函数
err = c.Subscribe(
"killgoods", // 要消费的主题(需与生产者一致)
consumer.MessageSelector{}, // 消息过滤规则(空表示全部消费)
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
// 处理消息的回调函数(每次收到消息会触发)
for _, msg := range msgs {
// 解析消息体
var orderMsg userOrder.OrderMessage
if err := json.Unmarshal(msg.Body, &orderMsg); err != nil {
log.Printf("解析消息失败: %v, 消息内容: %s", err, string(msg.Body))
// 消息格式错误属于不可重试错误(重试也会失败),返回成功但记录错误
return consumer.ConsumeSuccess, nil
}
// 模拟业务处理(如更新订单状态、通知物流等)
log.Printf(
"收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d",
msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity,
)
db := databaseInit.UserDB
if db == nil {
log.Printf("%sUserDB 未初始化,请先调用 InitUserDB()%s\n", colorRed, colorReset)
// 数据库未初始化属于可恢复错误(重启服务后可能解决),返回重试
return consumer.ConsumeRetryLater, fmt.Errorf("UserDB 未初始化")
}
fmt.Printf("%s用户请求处理完成%s\n", colorBlue, colorReset)
var goods good.Goods
tx := db.Begin()
if tx.Error != nil {
log.Printf("%s开启事务失败: %v%s\n", colorRed, tx.Error, colorReset)
// 事务开启失败可能是临时错误(如连接池满),返回重试
return consumer.ConsumeRetryLater, tx.Error
}
// 加行锁查询商品
// 执行库存扣减
result := tx.Model(&good.Goods{}).
Where("gid = ? AND stock > 0", orderMsg.GoodsID).
Update("stock", gorm.Expr("stock - ?", 1))
if result.Error != nil {
tx.Rollback()
log.Printf("%s扣减库存失败: %v%s\n", colorRed, result.Error, colorReset)
return consumer.ConsumeRetryLater, result.Error
}
// 获取实际受影响的行数
affected := result.RowsAffected
// 4. 判断库存是否充足(受影响行数=0表示库存不足
if affected == 0 {
tx.Rollback()
log.Printf("%s商品库存不足订单ID: %s商品GID: %d%s\n", colorYellow, orderMsg.OrderID, orderMsg.GoodsID, colorReset)
return consumer.ConsumeSuccess, nil
}
// 生成订单
userorder := good.Order{
OrderId: orderMsg.OrderID,
TradeTime: time.Now(),
}
result1 := tx.Where("order_id = ?", orderMsg.OrderID).FirstOrCreate(&userorder)
if result1.Error != nil {
tx.Rollback()
log.Printf("%s创建/查询订单失败: %v%s\n", colorRed, result1.Error, colorReset)
return consumer.ConsumeRetryLater, result.Error
}
// 若订单已存在(重复消费),直接跳过创建逻辑,继续提交事务
if result1.RowsAffected == 0 {
log.Printf("%s订单已存在重复消费订单ID: %s%s\n", colorYellow, orderMsg.OrderID, colorReset)
}
if err := tx.Create(&userorder).Error; err != nil {
tx.Rollback()
log.Printf("%s创建订单失败: %v%s\n", colorRed, err, colorReset)
return consumer.ConsumeRetryLater, err
}
// 提交事务
if err := tx.Commit().Error; err != nil {
tx.Rollback()
log.Printf("%s提交事务失败: %v%s\n", colorRed, err, colorReset)
// 事务提交失败可能是临时错误,返回重试
return consumer.ConsumeRetryLater, err
}
log.Printf("%s订单创建成功订单ID: %s剩余库存: %d%s\n", colorGreen, userorder.OrderId, goods.Stock-1, colorReset)
}
// 所有消息处理成功,返回成功
return consumer.ConsumeSuccess, nil
},
)
if err != nil {
log.Fatalf("订阅主题失败: %v", err)
}
// 启动消费者(会阻塞当前 goroutine
if err := c.Start(); err != nil {
log.Fatalf("启动消费者失败: %v", err)
}
defer c.Shutdown() // 退出时关闭消费者
// 等待中断信号(如 Ctrl+C保证程序持续运行
log.Println("消费者已启动,持续等待消息...")
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit // 阻塞直到收到中断信号
log.Println("消费者正在退出...")
}