Files
toutoukan/scripts/dtmScripts/dtm.go

178 lines
5.4 KiB
Go
Raw Normal View History

2025-09-15 11:09:22 +08:00
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"time"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"go.etcd.io/etcd/client/v3"
)
// ServiceInstance 服务实例结构
type ServiceInstance struct {
IP string `json:"ip"`
Port int `json:"port"`
Service string `json:"service"`
Timestamp string `json:"timestamp"`
Status string `json:"status"`
Metadata map[string]string `json:"metadata"`
}
// EmailRequest 邮件发送请求
type EmailRequest struct {
To string `json:"to"`
Subject string `json:"subject"`
Content string `json:"content"`
}
// LoginRequest 用户登录请求
type LoginRequest struct {
UserID string `json:"user_id"`
Password string `json:"password"` // 实际项目中应传递加密后的凭证
}
// 初始化etcd客户端
func initEtcdClient(endpoints []string) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
}
// 从etcd发现指定服务的实例
func discoverService(ctx context.Context, etcdClient *clientv3.Client, serviceName string) ([]ServiceInstance, error) {
// 服务在etcd中的路径与注册时保持一致
path := fmt.Sprintf("/microservices/%s/instances/", serviceName)
resp, err := etcdClient.Get(ctx, path, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("查询etcd失败: %v", err)
}
var instances []ServiceInstance
for _, kv := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(kv.Value, &instance); err != nil {
log.Printf("解析服务实例失败,忽略此实例: %v", err)
continue
}
if instance.Status == "running" {
instances = append(instances, instance)
}
}
if len(instances) == 0 {
return nil, fmt.Errorf("未发现可用的%s服务实例", serviceName)
}
return instances, nil
}
// 从可用实例中选择一个(随机负载均衡)
func selectServiceInstance(instances []ServiceInstance) (string, error) {
if len(instances) == 0 {
return "", fmt.Errorf("没有可用的服务实例")
}
rand.Seed(time.Now().UnixNano())
selected := instances[rand.Intn(len(instances))]
return fmt.Sprintf("http://%s:%d", selected.IP, selected.Port), nil
}
// 创建包含登录和邮件发送的分布式事务
func createCompositeTransaction(c *gin.Context, etcdClient *clientv3.Client) {
// DTM服务器地址
dtmServer := "http://localhost:36789/api/dtmsvr"
// 本地业务服务地址
ctx := context.Background()
// 1. 发现并选择user-login服务实例
loginInstances, err := discoverService(ctx, etcdClient, "user-login")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现user-login服务失败: %v", err)})
return
}
loginServiceAddr, err := selectServiceInstance(loginInstances)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择login服务实例失败: %v", err)})
return
}
log.Printf("已选择user-login服务实例: %s", loginServiceAddr)
emailInstances, err := discoverService(ctx, etcdClient, "email")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现email服务失败: %v", err)})
return
}
emailServiceAddr, err := selectServiceInstance(emailInstances)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择email服务实例失败: %v", err)})
return
}
log.Printf("已选择email服务实例: %s", emailServiceAddr)
// 3. 创建SAGA分布式事务包含两个步骤
saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)).
// 第一步调用user-login服务进行登录验证
Add(
loginServiceAddr+"/login", // 正向接口:用户登录
loginServiceAddr+"/login/compensate", // 补偿接口:登录状态回滚
LoginRequest{
UserID: "user123",
Password: "encrypted_token_xxx", // 实际项目中使用加密凭证
},
).
// 第二步调用email服务发送登录通知邮件依赖第一步成功
Add(
emailServiceAddr+"/send", // 正向接口:发送邮件
emailServiceAddr+"/send/compensate", // 补偿接口:邮件发送回滚
EmailRequest{
To: "user@example.com",
Subject: "登录成功通知",
Content: fmt.Sprintf("您的账号于%s成功登录系统", time.Now().Format("2006-01-02 15:04:05")),
},
)
// 4. 提交事务
if err := saga.Submit(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("事务提交失败: %v", err)})
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "分布式事务已提交",
"transactionId": saga.Gid,
"steps": []map[string]string{
{"service": "user-login", "address": loginServiceAddr},
{"service": "email", "address": emailServiceAddr},
},
})
}
func main() {
// 初始化etcd客户端
etcdClient, err := initEtcdClient([]string{"http://127.0.0.1:2379"})
if err != nil {
log.Fatalf("初始化etcd客户端失败: %v", err)
}
defer etcdClient.Close()
// 初始化Gin引擎
r := gin.Default()
// 注册复合事务接口
r.POST("/userAction", func(c *gin.Context) {
createCompositeTransaction(c, etcdClient)
})
// 启动业务服务
log.Println("业务服务启动,监听端口 8081")
if err := r.Run(":8081"); err != nil {
log.Fatalf("服务启动失败: %v", err)
}
}