diff --git a/kills/kill.go b/kills/kill.go index 5e4c85f..b42b02e 100644 --- a/kills/kill.go +++ b/kills/kill.go @@ -7,40 +7,14 @@ import ( "github.com/gin-gonic/gin" "log" "math/rand" + "net/http" "time" "toutoukan/init/redisInit" + "toutoukan/model/usermodel/userOrder" + "toutoukan/rocketmq/killmq/mqproducer" "toutoukan/utill/scripts" ) -const ( - colorRed = "\033[31m" // 红色 - colorGreen = "\033[32m" // 绿色 - colorYellow = "\033[33m" // 黄色 - colorBlue = "\033[34m" // 蓝色 - colorReset = "\033[0m" // 重置颜色 -) - -type Goods struct { - GID uint `gorm:"column:gid"` - Stock int `gorm:"column:stock"` - StartTime time.Time `gorm:"column:start_time"` - EndTime time.Time `gorm:"column:end_time"` -} - -// Order 订单模型,对应 orders_list 表 -type Order struct { - OrderId string `gorm:"column:order_id"` // 关联商品ID - TradeTime time.Time `gorm:"column:trade_time"` -} - -// 为模型指定数据库表名 -func (Goods) TableName() string { - return "goods_list" -} - -func (Order) TableName() string { - return "oders_list" -} func GenerateOrderID() string { timestamp := time.Now().Format("20060102150405") @@ -55,89 +29,49 @@ func GenerateOrderID() string { } func Userkill(c *gin.Context) { + var req userOrder.UserRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数错误", "detail": err.Error()}) + return + } + fmt.Printf("用户%d请求购买%s", req.UserID, req.Order) - result, err := redisInit.RedisClient.Eval(context.Background(), scripts.Luascript_forkill, []string{"stock:10000", "mayiming"}, 1, GenerateOrderID()).Int() + OrderID := GenerateOrderID() + + // 1. Redis扣减库存(不变) + result, err := redisInit.RedisClient.Eval(context.Background(), scripts.Luascript_forkill, []string{req.Order, "mayiming"}, 1, OrderID).Int() if err != nil { log.Printf("Redis 脚本执行错误: %v", err) c.JSON(500, gin.H{"error": "库存操作失败", "detail": err.Error()}) return } - if result == 1 { - c.JSON(200, gin.H{"result": "库存扣减成功"}) - } else { + if result != 1 { c.JSON(200, gin.H{"result": "库存不足,扣减失败"}) + return } -} + // 2. 发送MQ消息(关键:发送失败需回滚Redis) + msgSent := false + defer func() { + // 若消息未发送成功,回滚Redis库存 + if !msgSent { + log.Printf("消息发送失败,回滚Redis库存,商品: %s,用户: %d", req.Order, req.UserID) + // 回滚Redis:库存+1,同时移除可能的订单记录(根据Lua脚本逻辑调整) + redisInit.RedisClient.Incr(context.Background(), req.Order) + // 若Lua脚本中记录了订单ID(如存到set中),也需删除 + //redisInit.RedisClient.SRem(context.Background(), req.Order, OrderID) + } + }() -//func Userkill(c *gin.Context) { -// // 直接使用全局初始化好的 UserDB 连接池 -// db := databaseInit.UserDB -// if db == nil { -// log.Printf("%sUserDB 未初始化,请先调用 InitUserDB()%s\n", colorRed, colorReset) -// c.JSON(500, gin.H{"error": "系统数据库未初始化"}) -// return -// } -// fmt.Printf("%s用户请求处理完成%s\n", colorBlue, colorReset) -// -// // 定义要查询的商品ID -// var targetGID uint = 1 -// var goods Goods -// -// tx := db.Begin() -// if tx.Error != nil { -// log.Printf("%s开启事务失败: %v%s\n", colorRed, tx.Error, colorReset) -// c.JSON(500, gin.H{"body": "事务开启失败"}) -// return -// } -// result := tx.Set("gorm:query_option", "FOR UPDATE").Where("gid = ?", targetGID).First(&goods) -// if result.Error != nil { -// tx.Rollback() // 失败回滚事务 -// log.Printf("%s查询商品失败: %v%s\n", colorRed, result.Error, colorReset) -// c.JSON(404, gin.H{"error": "商品不存在"}) -// return -// } -// -// // 2. 检查库存是否充足 -// if goods.Stock <= 0 { -// tx.Rollback() -// log.Printf("%s商品库存不足%s\n", colorYellow, colorReset) -// c.JSON(400, gin.H{"error": "商品库存不足"}) -// return -// } -// -// // 3. 生成订单 -// userorder := Order{ -// OrderId: GenerateOrderID(), -// TradeTime: time.Now(), -// } -// if err := tx.Create(&userorder).Error; err != nil { -// tx.Rollback() -// log.Printf("%s创建订单失败: %v%s\n", colorRed, err, colorReset) -// c.JSON(500, gin.H{"error": "创建订单失败"}) -// return -// } -// -// // 4. 扣减库存(库存-1) -// if err := tx.Model(&Goods{}).Where("gid = ?", targetGID).Update("stock", goods.Stock-1).Error; err != nil { -// tx.Rollback() -// log.Printf("%s扣减库存失败: %v%s\n", colorRed, err, colorReset) -// c.JSON(500, gin.H{"error": "库存更新失败"}) -// return -// } -// -// // 5. 提交事务 -// if err := tx.Commit().Error; err != nil { -// tx.Rollback() -// log.Printf("%s提交事务失败: %v%s\n", colorRed, err, colorReset) -// c.JSON(500, gin.H{"error": "系统错误"}) -// return -// } -// -// log.Printf("%s订单创建成功,订单ID: %s,剩余库存: %d%s\n", colorGreen, userorder.OrderId, goods.Stock-1, colorReset) -// c.JSON(200, gin.H{ -// "message": "下单成功", -// "order_id": userorder.OrderId, -// "stock": goods.Stock - 1, -// }) -//} + // 发送消息(同步发送,确保拿到发送结果) + sendErr := mqproducer.SendNormalMessage(OrderID, req.Order, req.UserID) + if sendErr != nil { + log.Printf("MQ消息发送失败: %v", sendErr) + c.JSON(500, gin.H{"error": "消息发送失败", "detail": sendErr.Error()}) + return + } + + // 消息发送成功,标记为已发送(避免defer回滚) + msgSent = true + c.JSON(200, gin.H{"result": "库存扣减成功"}) +} diff --git a/model/good/goods.go b/model/good/goods.go new file mode 100644 index 0000000..092af0d --- /dev/null +++ b/model/good/goods.go @@ -0,0 +1,25 @@ +package good + +import "time" + +type Goods struct { + GID string `gorm:"column:gid"` + Stock int `gorm:"column:stock"` + StartTime time.Time `gorm:"column:start_time"` + EndTime time.Time `gorm:"column:end_time"` +} + +// Order 订单模型,对应 orders_list 表 +type Order struct { + OrderId string `gorm:"column:order_id"` // 关联商品ID + TradeTime time.Time `gorm:"column:trade_time"` +} + +// 为模型指定数据库表名 +func (Goods) TableName() string { + return "goods_list" +} + +func (Order) TableName() string { + return "oders_list" +} diff --git a/model/usermodel/userOrder/userorder.go b/model/usermodel/userOrder/userorder.go index 49022e4..2a277a1 100644 --- a/model/usermodel/userOrder/userorder.go +++ b/model/usermodel/userOrder/userorder.go @@ -4,3 +4,10 @@ type UserRequest struct { UserID int `json:"user_id"` Order string `json:"userorder"` // 对应商品标识:stock10000 ~ stock10004 } + +type OrderMessage struct { + OrderID string `json:"order_id"` + GoodsID string `json:"goods_id"` + UserID int `json:"user_id"` + Quantity int `json:"quantity"` +} diff --git a/mq/killmq/consumer/killConsumer.go b/mq/killmq/consumer/killConsumer.go deleted file mode 100644 index d341968..0000000 --- a/mq/killmq/consumer/killConsumer.go +++ /dev/null @@ -1,80 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "log" - "os" - "os/signal" - "syscall" - - "github.com/apache/rocketmq-client-go/v2" - "github.com/apache/rocketmq-client-go/v2/consumer" - "github.com/apache/rocketmq-client-go/v2/primitive" -) - -// 与生产者对应的消息结构体 -type OrderMessage struct { - OrderID string `json:"order_id"` - GoodsID int64 `json:"goods_id"` - UserID int64 `json:"user_id"` - Quantity int `json:"quantity"` -} - -func main() { - // 初始化消费者 - c, err := rocketmq.NewPushConsumer( - consumer.WithNameServer([]string{"127.0.0.1:9876"}), // Namesrv 地址 - consumer.WithGroupName("order_consumer_group"), // 消费者组名 - // 消费模式:广播模式(BROADCASTING)或集群模式(CLUSTERING,默认) - consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从最早消息开始消费 - ) - if err != nil { - log.Fatalf("初始化消费者失败: %v", err) - } - - // 订阅主题并注册消息处理函数 - err = c.Subscribe( - "killgoods", // 要消费的主题(需与生产者一致) - consumer.MessageSelector{}, // 消息过滤规则(空表示全部消费) - func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { - // 处理消息的回调函数(每次收到消息会触发) - for _, msg := range msgs { - // 解析消息体 - var orderMsg OrderMessage - if err := json.Unmarshal(msg.Body, &orderMsg); err != nil { - log.Printf("解析消息失败: %v, 消息内容: %s", err, string(msg.Body)) - continue - } - - // 模拟业务处理(如更新订单状态、通知物流等) - log.Printf( - "收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d", - msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity, - ) - } - - // 返回消费结果: - // - consumer.ConsumeSuccess:消费成功 - // - consumer.ConsumeRetryLater:消费失败,稍后重试 - return consumer.ConsumeSuccess, nil - }, - ) - if err != nil { - log.Fatalf("订阅主题失败: %v", err) - } - - // 启动消费者(会阻塞当前 goroutine) - if err := c.Start(); err != nil { - log.Fatalf("启动消费者失败: %v", err) - } - defer c.Shutdown() // 退出时关闭消费者 - - // 等待中断信号(如 Ctrl+C),保证程序持续运行 - log.Println("消费者已启动,持续等待消息...") - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit // 阻塞直到收到中断信号 - - log.Println("消费者正在退出...") -} diff --git a/mq/killmq/producer/killProducer.go b/mq/killmq/producer/killProducer.go deleted file mode 100644 index ac8450b..0000000 --- a/mq/killmq/producer/killProducer.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "github.com/apache/rocketmq-client-go/v2" - "github.com/apache/rocketmq-client-go/v2/primitive" - "github.com/apache/rocketmq-client-go/v2/producer" - "log" - "time" -) - -type OrderMessage struct { - OrderID string `json:"order_id"` - GoodsID int64 `json:"goods_id"` - UserID int64 `json:"user_id"` - Quantity int `json:"quantity"` -} - -func main() { - // 1. 初始化普通消息生产者 - normalProducer, err := rocketmq.NewProducer( - producer.WithNameServer([]string{"127.0.0.1:9876"}), // RocketMQ namesrv 地址 - producer.WithGroupName("order_producer_group"), // 生产者组名 - ) - if err != nil { - log.Fatalf("初始化普通生产者失败: %v", err) - } - - // 启动生产者 - if err := normalProducer.Start(); err != nil { - log.Fatalf("启动普通生产者失败: %v", err) - } - defer normalProducer.Shutdown() - - // 2. 发送普通消息示例 - sendNormalMessage(normalProducer) -} - -// 发送普通消息 -func sendNormalMessage(p rocketmq.Producer) { - // 构造消息内容 - msgData := OrderMessage{ - OrderID: fmt.Sprintf("ORD%d", time.Now().UnixNano()), - GoodsID: 1001, - UserID: 10001, - Quantity: 1, - } - msgBody, _ := json.Marshal(msgData) - - // 创建消息(指定主题和内容) - msg := primitive.NewMessage("killgoods", msgBody) // 主题需提前创建或开启自动创建 - - // 发送消息(同步发送,等待结果) - result, err := p.SendSync(context.Background(), msg) - if err != nil { - log.Printf("普通消息发送失败: %v", err) - return - } - - log.Printf("普通消息发送成功,消息ID: %s, 队列: %d, 偏移量: %d", - result.MsgID, result.MessageQueue.QueueId, result.QueueOffset) - -} diff --git a/oneKUsers/mian.go b/oneKUsers/mian.go index 005b02e..626421a 100644 --- a/oneKUsers/mian.go +++ b/oneKUsers/mian.go @@ -20,7 +20,7 @@ type Response struct { } func main() { - const totalBatches = 5 // 固定发送5批请求 + const totalBatches = 1 // 固定发送5批请求 const batchSize = 150 // 每批固定150个请求 const interval = 5 * time.Second // 每批请求的时间间隔(可按需调整) var wg sync.WaitGroup @@ -30,7 +30,7 @@ func main() { // 循环发送5批请求,每批对应不同的商品order(stock10000 ~ stock10004) for batch := 1; batch <= totalBatches; batch++ { // 计算当前批次的目标商品order:第1批→stock10000,第2批→stock10001...第5批→stock10004 - targetOrder := fmt.Sprintf("stock1000%d", batch-1) + targetOrder := fmt.Sprintf("stock:1000%d", batch-1) fmt.Printf("=== 开始第 %d 批请求 ===\n", batch) fmt.Printf("当前批次目标商品: %s | 请求数量: %d 个 | 开始时间: %v\n", targetOrder, batchSize, time.Now().Format("15:04:05")) @@ -72,11 +72,6 @@ func main() { fmt.Println("🎉 所有5批请求已全部完成!") } -type UserRequest1 struct { - UserID int `json:"user_id"` - Order string `json:"userorder"` // 对应商品标识:stock10000 ~ stock10004 -} - // 发送POST请求(接收用户ID和商品order,构造请求体) func sendPostRequest(userID int, order string) (*Response, error) { // 目标接口地址(根据实际部署调整) diff --git a/rocketmq/killmq/consumer/killConsumer.go b/rocketmq/killmq/consumer/killConsumer.go new file mode 100644 index 0000000..84c13d7 --- /dev/null +++ b/rocketmq/killmq/consumer/killConsumer.go @@ -0,0 +1,177 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "gorm.io/gorm" + "log" + "os" + "os/signal" + "syscall" + "time" + "toutoukan/init/databaseInit" + "toutoukan/model/good" + "toutoukan/model/usermodel/userOrder" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +const ( + colorRed = "\033[31m" // 红色 + colorGreen = "\033[32m" // 绿色 + colorYellow = "\033[33m" // 黄色 + colorBlue = "\033[34m" // 蓝色 + colorReset = "\033[0m" // 重置颜色 +) + +// 与生产者对应的消息结构体 + +func main() { + + databaseInit.DbInit() + + defer func() { + sqlDB, err := databaseInit.UserDB.DB() + if err != nil { + // 处理获取失败的情况 + log.Printf("获取底层数据库连接失败: %v", err) + return + } + + // 2. 调用 Close() 关闭连接池 + if err := sqlDB.Close(); err != nil { + log.Printf("关闭数据库连接池失败: %v", err) + } else { + log.Println("数据库连接池已成功关闭") + } + }() + // 初始化消费者 + c, err := rocketmq.NewPushConsumer( + consumer.WithNameServer([]string{"127.0.0.1:9876"}), // Namesrv 地址 + consumer.WithGroupName("order_consumer_group"), // 消费者组名 + // 消费模式:广播模式(BROADCASTING)或集群模式(CLUSTERING,默认) + consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从最早消息开始消费 + ) + if err != nil { + log.Fatalf("初始化消费者失败: %v", err) + } + + // 订阅主题并注册消息处理函数 + err = c.Subscribe( + "killgoods", // 要消费的主题(需与生产者一致) + consumer.MessageSelector{}, // 消息过滤规则(空表示全部消费) + func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + // 处理消息的回调函数(每次收到消息会触发) + for _, msg := range msgs { + // 解析消息体 + var orderMsg userOrder.OrderMessage + if err := json.Unmarshal(msg.Body, &orderMsg); err != nil { + log.Printf("解析消息失败: %v, 消息内容: %s", err, string(msg.Body)) + // 消息格式错误属于不可重试错误(重试也会失败),返回成功但记录错误 + return consumer.ConsumeSuccess, nil + } + + // 模拟业务处理(如更新订单状态、通知物流等) + log.Printf( + "收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d", + msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity, + ) + + db := databaseInit.UserDB + if db == nil { + log.Printf("%sUserDB 未初始化,请先调用 InitUserDB()%s\n", colorRed, colorReset) + // 数据库未初始化属于可恢复错误(重启服务后可能解决),返回重试 + return consumer.ConsumeRetryLater, fmt.Errorf("UserDB 未初始化") + } + fmt.Printf("%s用户请求处理完成%s\n", colorBlue, colorReset) + + var goods good.Goods + + tx := db.Begin() + if tx.Error != nil { + log.Printf("%s开启事务失败: %v%s\n", colorRed, tx.Error, colorReset) + // 事务开启失败可能是临时错误(如连接池满),返回重试 + return consumer.ConsumeRetryLater, tx.Error + } + + // 加行锁查询商品 + // 执行库存扣减 + result := tx.Model(&good.Goods{}). + Where("gid = ? AND stock > 0", orderMsg.GoodsID). + Update("stock", gorm.Expr("stock - ?", 1)) + + if result.Error != nil { + tx.Rollback() + log.Printf("%s扣减库存失败: %v%s\n", colorRed, result.Error, colorReset) + return consumer.ConsumeRetryLater, result.Error + } + + // 获取实际受影响的行数 + affected := result.RowsAffected + + // 4. 判断库存是否充足(受影响行数=0表示库存不足) + if affected == 0 { + tx.Rollback() + log.Printf("%s商品库存不足,订单ID: %s,商品GID: %d%s\n", colorYellow, orderMsg.OrderID, orderMsg.GoodsID, colorReset) + return consumer.ConsumeSuccess, nil + } + + // 生成订单 + userorder := good.Order{ + OrderId: orderMsg.OrderID, + TradeTime: time.Now(), + } + + result1 := tx.Where("order_id = ?", orderMsg.OrderID).FirstOrCreate(&userorder) + if result1.Error != nil { + tx.Rollback() + log.Printf("%s创建/查询订单失败: %v%s\n", colorRed, result1.Error, colorReset) + return consumer.ConsumeRetryLater, result.Error + } + // 若订单已存在(重复消费),直接跳过创建逻辑,继续提交事务 + if result1.RowsAffected == 0 { + log.Printf("%s订单已存在(重复消费),订单ID: %s%s\n", colorYellow, orderMsg.OrderID, colorReset) + } + + if err := tx.Create(&userorder).Error; err != nil { + tx.Rollback() + log.Printf("%s创建订单失败: %v%s\n", colorRed, err, colorReset) + return consumer.ConsumeRetryLater, err + } + + // 提交事务 + if err := tx.Commit().Error; err != nil { + tx.Rollback() + log.Printf("%s提交事务失败: %v%s\n", colorRed, err, colorReset) + // 事务提交失败可能是临时错误,返回重试 + return consumer.ConsumeRetryLater, err + } + + log.Printf("%s订单创建成功,订单ID: %s,剩余库存: %d%s\n", colorGreen, userorder.OrderId, goods.Stock-1, colorReset) + } + + // 所有消息处理成功,返回成功 + return consumer.ConsumeSuccess, nil + }, + ) + if err != nil { + log.Fatalf("订阅主题失败: %v", err) + } + + // 启动消费者(会阻塞当前 goroutine) + if err := c.Start(); err != nil { + log.Fatalf("启动消费者失败: %v", err) + } + defer c.Shutdown() // 退出时关闭消费者 + + // 等待中断信号(如 Ctrl+C),保证程序持续运行 + log.Println("消费者已启动,持续等待消息...") + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit // 阻塞直到收到中断信号 + + log.Println("消费者正在退出...") +} diff --git a/rocketmq/killmq/mqproducer/killProducer.go b/rocketmq/killmq/mqproducer/killProducer.go new file mode 100644 index 0000000..aa9bbb4 --- /dev/null +++ b/rocketmq/killmq/mqproducer/killProducer.go @@ -0,0 +1,64 @@ +package mqproducer + +import ( + "context" + "encoding/json" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" + "log" + "sync" + "toutoukan/model/usermodel/userOrder" +) + +var normalProducer rocketmq.Producer +var producerInitOnce sync.Once // 用于保证生产者只初始化一次 + +// 发送普通消息 +func SendNormalMessage(OrderId string, GoodsID string, UserID int) error { + var initErr error + // 使用 sync.Once 保证生产者只初始化一次 + producerInitOnce.Do(func() { + normalProducer, initErr = rocketmq.NewProducer( + producer.WithNameServer([]string{"127.0.0.1:9876"}), // RocketMQ namesrv 地址 + producer.WithGroupName("order_producer_group"), // 生产者组名 + ) + if initErr != nil { + log.Fatalf("初始化普通生产者失败: %v", initErr) + return + } + + if initErr = normalProducer.Start(); initErr != nil { + log.Fatalf("启动普通生产者失败: %v", initErr) + return + } + }) + // 若初始化过程有错误,直接返回 + if initErr != nil { + log.Printf("生产者初始化失败: %v", initErr) + return initErr + } + + // 构造消息内容 + msgData := userOrder.OrderMessage{ + OrderID: OrderId, + GoodsID: GoodsID, + UserID: UserID, + Quantity: 1, + } + msgBody, _ := json.Marshal(msgData) + + // 创建消息(指定主题和内容) + msg := primitive.NewMessage("killgoods", msgBody) // 主题需提前创建或开启自动创建 + + // 发送消息(同步发送,等待结果) + result, err := normalProducer.SendSync(context.Background(), msg) + if err != nil { + log.Printf("普通消息发送失败: %v", err) + return err + } + + log.Printf("普通消息发送成功,消息ID: %s, 队列: %d, 偏移量: %d", + result.MsgID, result.MessageQueue.QueueId, result.QueueOffset) + return nil +}