项目结构更新
This commit is contained in:
176
services/order/killOrder/orderMq/consumer/killConsumer.go
Normal file
176
services/order/killOrder/orderMq/consumer/killConsumer.go
Normal file
@@ -0,0 +1,176 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gorm.io/gorm"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
"toutoukan/init/databaseInit"
|
||||
"toutoukan/model/good"
|
||||
"toutoukan/model/usermodel/userOrder"
|
||||
|
||||
"github.com/apache/rocketmq-client-go/v2"
|
||||
"github.com/apache/rocketmq-client-go/v2/consumer"
|
||||
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||
)
|
||||
|
||||
const (
|
||||
colorRed = "\033[31m" // 红色
|
||||
colorGreen = "\033[32m" // 绿色
|
||||
colorYellow = "\033[33m" // 黄色
|
||||
colorBlue = "\033[34m" // 蓝色
|
||||
colorReset = "\033[0m" // 重置颜色
|
||||
)
|
||||
|
||||
// 与生产者对应的消息结构体
|
||||
|
||||
func main() {
|
||||
|
||||
databaseInit.DbInit()
|
||||
|
||||
defer func() {
|
||||
sqlDB, err := databaseInit.UserDB.DB()
|
||||
if err != nil {
|
||||
// 处理获取失败的情况
|
||||
log.Printf("获取底层数据库连接失败: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. 调用 Close() 关闭连接池
|
||||
if err := sqlDB.Close(); err != nil {
|
||||
log.Printf("关闭数据库连接池失败: %v", err)
|
||||
} else {
|
||||
log.Println("数据库连接池已成功关闭")
|
||||
}
|
||||
}()
|
||||
// 初始化消费者
|
||||
c, err := rocketmq.NewPushConsumer(
|
||||
consumer.WithNameServer([]string{"127.0.0.1:9876"}), // Namesrv 地址
|
||||
consumer.WithGroupName("order_consumer_group"), // 消费者组名
|
||||
// 消费模式:广播模式(BROADCASTING)或集群模式(CLUSTERING,默认)
|
||||
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 从最早消息开始消费
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("初始化消费者失败: %v", err)
|
||||
}
|
||||
|
||||
// 订阅主题并注册消息处理函数
|
||||
err = c.Subscribe(
|
||||
"killgoods", // 要消费的主题(需与生产者一致)
|
||||
consumer.MessageSelector{}, // 消息过滤规则(空表示全部消费)
|
||||
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
|
||||
// 处理消息的回调函数(每次收到消息会触发)
|
||||
for _, msg := range msgs {
|
||||
// 解析消息体
|
||||
var orderMsg userOrder.OrderMessage
|
||||
if err := json.Unmarshal(msg.Body, &orderMsg); err != nil {
|
||||
log.Printf("解析消息失败: %v, 消息内容: %s", err, string(msg.Body))
|
||||
// 消息格式错误属于不可重试错误(重试也会失败),返回成功但记录错误
|
||||
return consumer.ConsumeSuccess, nil
|
||||
}
|
||||
|
||||
log.Printf(
|
||||
"收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d",
|
||||
msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity,
|
||||
)
|
||||
|
||||
db := databaseInit.UserDB
|
||||
if db == nil {
|
||||
log.Printf("%sUserDB 未初始化,请先调用 InitUserDB()%s\n", colorRed, colorReset)
|
||||
// 数据库未初始化属于可恢复错误(重启服务后可能解决),返回重试
|
||||
return consumer.ConsumeRetryLater, fmt.Errorf("UserDB 未初始化")
|
||||
}
|
||||
fmt.Printf("%s用户请求处理完成%s\n", colorBlue, colorReset)
|
||||
|
||||
var goods good.Goods
|
||||
|
||||
tx := db.Begin()
|
||||
if tx.Error != nil {
|
||||
log.Printf("%s开启事务失败: %v%s\n", colorRed, tx.Error, colorReset)
|
||||
// 事务开启失败可能是临时错误(如连接池满),返回重试
|
||||
return consumer.ConsumeRetryLater, tx.Error
|
||||
}
|
||||
|
||||
// 加行锁查询商品
|
||||
// 执行库存扣减
|
||||
result := tx.Model(&good.Goods{}).
|
||||
Where("gid = ? AND stock > 0", orderMsg.GoodsID).
|
||||
Update("stock", gorm.Expr("stock - ?", 1))
|
||||
|
||||
if result.Error != nil {
|
||||
tx.Rollback()
|
||||
log.Printf("%s扣减库存失败: %v%s\n", colorRed, result.Error, colorReset)
|
||||
return consumer.ConsumeRetryLater, result.Error
|
||||
}
|
||||
|
||||
// 获取实际受影响的行数
|
||||
affected := result.RowsAffected
|
||||
|
||||
// 4. 判断库存是否充足(受影响行数=0表示库存不足)
|
||||
if affected == 0 {
|
||||
tx.Rollback()
|
||||
log.Printf("%s商品库存不足,订单ID: %s,商品GID: %d%s\n", colorYellow, orderMsg.OrderID, orderMsg.GoodsID, colorReset)
|
||||
return consumer.ConsumeSuccess, nil
|
||||
}
|
||||
|
||||
// 生成订单
|
||||
userorder := good.Order{
|
||||
OrderId: orderMsg.OrderID,
|
||||
TradeTime: time.Now(),
|
||||
}
|
||||
|
||||
result1 := tx.Where("order_id = ?", orderMsg.OrderID).FirstOrCreate(&userorder)
|
||||
if result1.Error != nil {
|
||||
tx.Rollback()
|
||||
log.Printf("%s创建/查询订单失败: %v%s\n", colorRed, result1.Error, colorReset)
|
||||
return consumer.ConsumeRetryLater, result.Error
|
||||
}
|
||||
// 若订单已存在(重复消费),直接跳过创建逻辑,继续提交事务
|
||||
if result1.RowsAffected == 0 {
|
||||
log.Printf("%s订单已存在(重复消费),订单ID: %s%s\n", colorYellow, orderMsg.OrderID, colorReset)
|
||||
}
|
||||
|
||||
if err := tx.Create(&userorder).Error; err != nil {
|
||||
tx.Rollback()
|
||||
log.Printf("%s创建订单失败: %v%s\n", colorRed, err, colorReset)
|
||||
return consumer.ConsumeRetryLater, err
|
||||
}
|
||||
|
||||
// 提交事务
|
||||
if err := tx.Commit().Error; err != nil {
|
||||
tx.Rollback()
|
||||
log.Printf("%s提交事务失败: %v%s\n", colorRed, err, colorReset)
|
||||
// 事务提交失败可能是临时错误,返回重试
|
||||
return consumer.ConsumeRetryLater, err
|
||||
}
|
||||
|
||||
log.Printf("%s订单创建成功,订单ID: %s,剩余库存: %d%s\n", colorGreen, userorder.OrderId, goods.Stock-1, colorReset)
|
||||
}
|
||||
|
||||
// 所有消息处理成功,返回成功
|
||||
return consumer.ConsumeSuccess, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("订阅主题失败: %v", err)
|
||||
}
|
||||
|
||||
// 启动消费者(会阻塞当前 goroutine)
|
||||
if err := c.Start(); err != nil {
|
||||
log.Fatalf("启动消费者失败: %v", err)
|
||||
}
|
||||
defer c.Shutdown() // 退出时关闭消费者
|
||||
|
||||
// 等待中断信号(如 Ctrl+C),保证程序持续运行
|
||||
log.Println("消费者已启动,持续等待消息...")
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-quit // 阻塞直到收到中断信号
|
||||
|
||||
log.Println("消费者正在退出...")
|
||||
}
|
||||
64
services/order/killOrder/orderMq/mqproducer/killProducer.go
Normal file
64
services/order/killOrder/orderMq/mqproducer/killProducer.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package mqproducer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/apache/rocketmq-client-go/v2"
|
||||
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||
"github.com/apache/rocketmq-client-go/v2/producer"
|
||||
"log"
|
||||
"sync"
|
||||
"toutoukan/model/usermodel/userOrder"
|
||||
)
|
||||
|
||||
var normalProducer rocketmq.Producer
|
||||
var producerInitOnce sync.Once // 用于保证生产者只初始化一次
|
||||
|
||||
// 发送普通消息
|
||||
func SendNormalMessage(OrderId string, GoodsID string, UserID int) error {
|
||||
var initErr error
|
||||
// 使用 sync.Once 保证生产者只初始化一次
|
||||
producerInitOnce.Do(func() {
|
||||
normalProducer, initErr = rocketmq.NewProducer(
|
||||
producer.WithNameServer([]string{"127.0.0.1:9876"}), // RocketMQ namesrv 地址
|
||||
producer.WithGroupName("order_producer_group"), // 生产者组名
|
||||
)
|
||||
if initErr != nil {
|
||||
log.Fatalf("初始化普通生产者失败: %v", initErr)
|
||||
return
|
||||
}
|
||||
|
||||
if initErr = normalProducer.Start(); initErr != nil {
|
||||
log.Fatalf("启动普通生产者失败: %v", initErr)
|
||||
return
|
||||
}
|
||||
})
|
||||
// 若初始化过程有错误,直接返回
|
||||
if initErr != nil {
|
||||
log.Printf("生产者初始化失败: %v", initErr)
|
||||
return initErr
|
||||
}
|
||||
|
||||
// 构造消息内容
|
||||
msgData := userOrder.OrderMessage{
|
||||
OrderID: OrderId,
|
||||
GoodsID: GoodsID,
|
||||
UserID: UserID,
|
||||
Quantity: 1,
|
||||
}
|
||||
msgBody, _ := json.Marshal(msgData)
|
||||
|
||||
// 创建消息(指定主题和内容)
|
||||
msg := primitive.NewMessage("killgoods", msgBody) // 主题需提前创建或开启自动创建
|
||||
|
||||
// 发送消息(同步发送,等待结果)
|
||||
result, err := normalProducer.SendSync(context.Background(), msg)
|
||||
if err != nil {
|
||||
log.Printf("普通消息发送失败: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("普通消息发送成功,消息ID: %s, 队列: %d, 偏移量: %d",
|
||||
result.MsgID, result.MessageQueue.QueueId, result.QueueOffset)
|
||||
return nil
|
||||
}
|
||||
5
services/order/killOrder/orderService/orderService.go
Normal file
5
services/order/killOrder/orderService/orderService.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package orderService
|
||||
|
||||
func OrderService() {
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user