178 lines
5.4 KiB
Go
178 lines
5.4 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"math/rand"
|
||
"net/http"
|
||
"time"
|
||
|
||
"github.com/dtm-labs/client/dtmcli"
|
||
"github.com/gin-gonic/gin"
|
||
"go.etcd.io/etcd/client/v3"
|
||
)
|
||
|
||
// ServiceInstance 服务实例结构
|
||
type ServiceInstance struct {
|
||
IP string `json:"ip"`
|
||
Port int `json:"port"`
|
||
Service string `json:"service"`
|
||
Timestamp string `json:"timestamp"`
|
||
Status string `json:"status"`
|
||
Metadata map[string]string `json:"metadata"`
|
||
}
|
||
|
||
// EmailRequest 邮件发送请求
|
||
type EmailRequest struct {
|
||
To string `json:"to"`
|
||
Subject string `json:"subject"`
|
||
Content string `json:"content"`
|
||
}
|
||
|
||
// LoginRequest 用户登录请求
|
||
type LoginRequest struct {
|
||
UserID string `json:"user_id"`
|
||
Password string `json:"password"` // 实际项目中应传递加密后的凭证
|
||
}
|
||
|
||
// 初始化etcd客户端
|
||
func initEtcdClient(endpoints []string) (*clientv3.Client, error) {
|
||
return clientv3.New(clientv3.Config{
|
||
Endpoints: endpoints,
|
||
DialTimeout: 5 * time.Second,
|
||
})
|
||
}
|
||
|
||
// 从etcd发现指定服务的实例
|
||
func discoverService(ctx context.Context, etcdClient *clientv3.Client, serviceName string) ([]ServiceInstance, error) {
|
||
// 服务在etcd中的路径(与注册时保持一致)
|
||
path := fmt.Sprintf("/microservices/%s/instances/", serviceName)
|
||
resp, err := etcdClient.Get(ctx, path, clientv3.WithPrefix())
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询etcd失败: %v", err)
|
||
}
|
||
|
||
var instances []ServiceInstance
|
||
for _, kv := range resp.Kvs {
|
||
var instance ServiceInstance
|
||
if err := json.Unmarshal(kv.Value, &instance); err != nil {
|
||
log.Printf("解析服务实例失败,忽略此实例: %v", err)
|
||
continue
|
||
}
|
||
if instance.Status == "running" {
|
||
instances = append(instances, instance)
|
||
}
|
||
}
|
||
|
||
if len(instances) == 0 {
|
||
return nil, fmt.Errorf("未发现可用的%s服务实例", serviceName)
|
||
}
|
||
return instances, nil
|
||
}
|
||
|
||
// 从可用实例中选择一个(随机负载均衡)
|
||
func selectServiceInstance(instances []ServiceInstance) (string, error) {
|
||
if len(instances) == 0 {
|
||
return "", fmt.Errorf("没有可用的服务实例")
|
||
}
|
||
rand.Seed(time.Now().UnixNano())
|
||
selected := instances[rand.Intn(len(instances))]
|
||
return fmt.Sprintf("http://%s:%d", selected.IP, selected.Port), nil
|
||
}
|
||
|
||
// 创建包含登录和邮件发送的分布式事务
|
||
func createCompositeTransaction(c *gin.Context, etcdClient *clientv3.Client) {
|
||
// DTM服务器地址
|
||
dtmServer := "http://localhost:36789/api/dtmsvr"
|
||
// 本地业务服务地址
|
||
ctx := context.Background()
|
||
|
||
// 1. 发现并选择user-login服务实例
|
||
loginInstances, err := discoverService(ctx, etcdClient, "user-login")
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现user-login服务失败: %v", err)})
|
||
return
|
||
}
|
||
loginServiceAddr, err := selectServiceInstance(loginInstances)
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择login服务实例失败: %v", err)})
|
||
return
|
||
}
|
||
log.Printf("已选择user-login服务实例: %s", loginServiceAddr)
|
||
|
||
emailInstances, err := discoverService(ctx, etcdClient, "email")
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现email服务失败: %v", err)})
|
||
return
|
||
}
|
||
emailServiceAddr, err := selectServiceInstance(emailInstances)
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择email服务实例失败: %v", err)})
|
||
return
|
||
}
|
||
log.Printf("已选择email服务实例: %s", emailServiceAddr)
|
||
|
||
// 3. 创建SAGA分布式事务(包含两个步骤)
|
||
saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)).
|
||
// 第一步:调用user-login服务进行登录验证
|
||
Add(
|
||
loginServiceAddr+"/login", // 正向接口:用户登录
|
||
loginServiceAddr+"/login/compensate", // 补偿接口:登录状态回滚
|
||
LoginRequest{
|
||
UserID: "user123",
|
||
Password: "encrypted_token_xxx", // 实际项目中使用加密凭证
|
||
},
|
||
).
|
||
// 第二步:调用email服务发送登录通知邮件(依赖第一步成功)
|
||
Add(
|
||
emailServiceAddr+"/send", // 正向接口:发送邮件
|
||
emailServiceAddr+"/send/compensate", // 补偿接口:邮件发送回滚
|
||
EmailRequest{
|
||
To: "user@example.com",
|
||
Subject: "登录成功通知",
|
||
Content: fmt.Sprintf("您的账号于%s成功登录系统", time.Now().Format("2006-01-02 15:04:05")),
|
||
},
|
||
)
|
||
|
||
// 4. 提交事务
|
||
if err := saga.Submit(); err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("事务提交失败: %v", err)})
|
||
return
|
||
}
|
||
|
||
c.JSON(http.StatusOK, gin.H{
|
||
"success": true,
|
||
"message": "分布式事务已提交",
|
||
"transactionId": saga.Gid,
|
||
"steps": []map[string]string{
|
||
{"service": "user-login", "address": loginServiceAddr},
|
||
{"service": "email", "address": emailServiceAddr},
|
||
},
|
||
})
|
||
}
|
||
|
||
func main() {
|
||
// 初始化etcd客户端
|
||
etcdClient, err := initEtcdClient([]string{"http://127.0.0.1:2379"})
|
||
if err != nil {
|
||
log.Fatalf("初始化etcd客户端失败: %v", err)
|
||
}
|
||
defer etcdClient.Close()
|
||
|
||
// 初始化Gin引擎
|
||
r := gin.Default()
|
||
|
||
// 注册复合事务接口
|
||
r.POST("/userAction", func(c *gin.Context) {
|
||
createCompositeTransaction(c, etcdClient)
|
||
})
|
||
|
||
// 启动业务服务
|
||
log.Println("业务服务启动,监听端口 8081")
|
||
if err := r.Run(":8081"); err != nil {
|
||
log.Fatalf("服务启动失败: %v", err)
|
||
}
|
||
}
|