package main import ( "context" "encoding/json" "fmt" "log" "math/rand" "net/http" "time" "github.com/dtm-labs/client/dtmcli" "github.com/gin-gonic/gin" "go.etcd.io/etcd/client/v3" ) // ServiceInstance 服务实例结构 type ServiceInstance struct { IP string `json:"ip"` Port int `json:"port"` Service string `json:"service"` Timestamp string `json:"timestamp"` Status string `json:"status"` Metadata map[string]string `json:"metadata"` } // EmailRequest 邮件发送请求 type EmailRequest struct { To string `json:"to"` Subject string `json:"subject"` Content string `json:"content"` } // LoginRequest 用户登录请求 type LoginRequest struct { UserID string `json:"user_id"` Password string `json:"password"` // 实际项目中应传递加密后的凭证 } // 初始化etcd客户端 func initEtcdClient(endpoints []string) (*clientv3.Client, error) { return clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) } // 从etcd发现指定服务的实例 func discoverService(ctx context.Context, etcdClient *clientv3.Client, serviceName string) ([]ServiceInstance, error) { // 服务在etcd中的路径(与注册时保持一致) path := fmt.Sprintf("/microservices/%s/instances/", serviceName) resp, err := etcdClient.Get(ctx, path, clientv3.WithPrefix()) if err != nil { return nil, fmt.Errorf("查询etcd失败: %v", err) } var instances []ServiceInstance for _, kv := range resp.Kvs { var instance ServiceInstance if err := json.Unmarshal(kv.Value, &instance); err != nil { log.Printf("解析服务实例失败,忽略此实例: %v", err) continue } if instance.Status == "running" { instances = append(instances, instance) } } if len(instances) == 0 { return nil, fmt.Errorf("未发现可用的%s服务实例", serviceName) } return instances, nil } // 从可用实例中选择一个(随机负载均衡) func selectServiceInstance(instances []ServiceInstance) (string, error) { if len(instances) == 0 { return "", fmt.Errorf("没有可用的服务实例") } rand.Seed(time.Now().UnixNano()) selected := instances[rand.Intn(len(instances))] return fmt.Sprintf("http://%s:%d", selected.IP, selected.Port), nil } // 创建包含登录和邮件发送的分布式事务 func createCompositeTransaction(c *gin.Context, etcdClient *clientv3.Client) { // DTM服务器地址 dtmServer := "http://localhost:36789/api/dtmsvr" // 本地业务服务地址 ctx := context.Background() // 1. 发现并选择user-login服务实例 loginInstances, err := discoverService(ctx, etcdClient, "user-login") if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现user-login服务失败: %v", err)}) return } loginServiceAddr, err := selectServiceInstance(loginInstances) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择login服务实例失败: %v", err)}) return } log.Printf("已选择user-login服务实例: %s", loginServiceAddr) emailInstances, err := discoverService(ctx, etcdClient, "email") if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现email服务失败: %v", err)}) return } emailServiceAddr, err := selectServiceInstance(emailInstances) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择email服务实例失败: %v", err)}) return } log.Printf("已选择email服务实例: %s", emailServiceAddr) // 3. 创建SAGA分布式事务(包含两个步骤) saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)). // 第一步:调用user-login服务进行登录验证 Add( loginServiceAddr+"/login", // 正向接口:用户登录 loginServiceAddr+"/login/compensate", // 补偿接口:登录状态回滚 LoginRequest{ UserID: "user123", Password: "encrypted_token_xxx", // 实际项目中使用加密凭证 }, ). // 第二步:调用email服务发送登录通知邮件(依赖第一步成功) Add( emailServiceAddr+"/send", // 正向接口:发送邮件 emailServiceAddr+"/send/compensate", // 补偿接口:邮件发送回滚 EmailRequest{ To: "user@example.com", Subject: "登录成功通知", Content: fmt.Sprintf("您的账号于%s成功登录系统", time.Now().Format("2006-01-02 15:04:05")), }, ) // 4. 提交事务 if err := saga.Submit(); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("事务提交失败: %v", err)}) return } c.JSON(http.StatusOK, gin.H{ "success": true, "message": "分布式事务已提交", "transactionId": saga.Gid, "steps": []map[string]string{ {"service": "user-login", "address": loginServiceAddr}, {"service": "email", "address": emailServiceAddr}, }, }) } func main() { // 初始化etcd客户端 etcdClient, err := initEtcdClient([]string{"http://127.0.0.1:2379"}) if err != nil { log.Fatalf("初始化etcd客户端失败: %v", err) } defer etcdClient.Close() // 初始化Gin引擎 r := gin.Default() // 注册复合事务接口 r.POST("/userAction", func(c *gin.Context) { createCompositeTransaction(c, etcdClient) }) // 启动业务服务 log.Println("业务服务启动,监听端口 8081") if err := r.Run(":8081"); err != nil { log.Fatalf("服务启动失败: %v", err) } }