Files
2025-09-15 11:09:22 +08:00

178 lines
5.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"time"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"go.etcd.io/etcd/client/v3"
)
// ServiceInstance 服务实例结构
type ServiceInstance struct {
IP string `json:"ip"`
Port int `json:"port"`
Service string `json:"service"`
Timestamp string `json:"timestamp"`
Status string `json:"status"`
Metadata map[string]string `json:"metadata"`
}
// EmailRequest 邮件发送请求
type EmailRequest struct {
To string `json:"to"`
Subject string `json:"subject"`
Content string `json:"content"`
}
// LoginRequest 用户登录请求
type LoginRequest struct {
UserID string `json:"user_id"`
Password string `json:"password"` // 实际项目中应传递加密后的凭证
}
// 初始化etcd客户端
func initEtcdClient(endpoints []string) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
}
// 从etcd发现指定服务的实例
func discoverService(ctx context.Context, etcdClient *clientv3.Client, serviceName string) ([]ServiceInstance, error) {
// 服务在etcd中的路径与注册时保持一致
path := fmt.Sprintf("/microservices/%s/instances/", serviceName)
resp, err := etcdClient.Get(ctx, path, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("查询etcd失败: %v", err)
}
var instances []ServiceInstance
for _, kv := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(kv.Value, &instance); err != nil {
log.Printf("解析服务实例失败,忽略此实例: %v", err)
continue
}
if instance.Status == "running" {
instances = append(instances, instance)
}
}
if len(instances) == 0 {
return nil, fmt.Errorf("未发现可用的%s服务实例", serviceName)
}
return instances, nil
}
// 从可用实例中选择一个(随机负载均衡)
func selectServiceInstance(instances []ServiceInstance) (string, error) {
if len(instances) == 0 {
return "", fmt.Errorf("没有可用的服务实例")
}
rand.Seed(time.Now().UnixNano())
selected := instances[rand.Intn(len(instances))]
return fmt.Sprintf("http://%s:%d", selected.IP, selected.Port), nil
}
// 创建包含登录和邮件发送的分布式事务
func createCompositeTransaction(c *gin.Context, etcdClient *clientv3.Client) {
// DTM服务器地址
dtmServer := "http://localhost:36789/api/dtmsvr"
// 本地业务服务地址
ctx := context.Background()
// 1. 发现并选择user-login服务实例
loginInstances, err := discoverService(ctx, etcdClient, "user-login")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现user-login服务失败: %v", err)})
return
}
loginServiceAddr, err := selectServiceInstance(loginInstances)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择login服务实例失败: %v", err)})
return
}
log.Printf("已选择user-login服务实例: %s", loginServiceAddr)
emailInstances, err := discoverService(ctx, etcdClient, "email")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现email服务失败: %v", err)})
return
}
emailServiceAddr, err := selectServiceInstance(emailInstances)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择email服务实例失败: %v", err)})
return
}
log.Printf("已选择email服务实例: %s", emailServiceAddr)
// 3. 创建SAGA分布式事务包含两个步骤
saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)).
// 第一步调用user-login服务进行登录验证
Add(
loginServiceAddr+"/login", // 正向接口:用户登录
loginServiceAddr+"/login/compensate", // 补偿接口:登录状态回滚
LoginRequest{
UserID: "user123",
Password: "encrypted_token_xxx", // 实际项目中使用加密凭证
},
).
// 第二步调用email服务发送登录通知邮件依赖第一步成功
Add(
emailServiceAddr+"/send", // 正向接口:发送邮件
emailServiceAddr+"/send/compensate", // 补偿接口:邮件发送回滚
EmailRequest{
To: "user@example.com",
Subject: "登录成功通知",
Content: fmt.Sprintf("您的账号于%s成功登录系统", time.Now().Format("2006-01-02 15:04:05")),
},
)
// 4. 提交事务
if err := saga.Submit(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("事务提交失败: %v", err)})
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "分布式事务已提交",
"transactionId": saga.Gid,
"steps": []map[string]string{
{"service": "user-login", "address": loginServiceAddr},
{"service": "email", "address": emailServiceAddr},
},
})
}
func main() {
// 初始化etcd客户端
etcdClient, err := initEtcdClient([]string{"http://127.0.0.1:2379"})
if err != nil {
log.Fatalf("初始化etcd客户端失败: %v", err)
}
defer etcdClient.Close()
// 初始化Gin引擎
r := gin.Default()
// 注册复合事务接口
r.POST("/userAction", func(c *gin.Context) {
createCompositeTransaction(c, etcdClient)
})
// 启动业务服务
log.Println("业务服务启动,监听端口 8081")
if err := r.Run(":8081"); err != nil {
log.Fatalf("服务启动失败: %v", err)
}
}