178 lines
5.4 KiB
Go
178 lines
5.4 KiB
Go
|
|
package main
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"encoding/json"
|
|||
|
|
"fmt"
|
|||
|
|
"log"
|
|||
|
|
"math/rand"
|
|||
|
|
"net/http"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
"github.com/dtm-labs/client/dtmcli"
|
|||
|
|
"github.com/gin-gonic/gin"
|
|||
|
|
"go.etcd.io/etcd/client/v3"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// ServiceInstance 服务实例结构
|
|||
|
|
type ServiceInstance struct {
|
|||
|
|
IP string `json:"ip"`
|
|||
|
|
Port int `json:"port"`
|
|||
|
|
Service string `json:"service"`
|
|||
|
|
Timestamp string `json:"timestamp"`
|
|||
|
|
Status string `json:"status"`
|
|||
|
|
Metadata map[string]string `json:"metadata"`
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// EmailRequest 邮件发送请求
|
|||
|
|
type EmailRequest struct {
|
|||
|
|
To string `json:"to"`
|
|||
|
|
Subject string `json:"subject"`
|
|||
|
|
Content string `json:"content"`
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// LoginRequest 用户登录请求
|
|||
|
|
type LoginRequest struct {
|
|||
|
|
UserID string `json:"user_id"`
|
|||
|
|
Password string `json:"password"` // 实际项目中应传递加密后的凭证
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 初始化etcd客户端
|
|||
|
|
func initEtcdClient(endpoints []string) (*clientv3.Client, error) {
|
|||
|
|
return clientv3.New(clientv3.Config{
|
|||
|
|
Endpoints: endpoints,
|
|||
|
|
DialTimeout: 5 * time.Second,
|
|||
|
|
})
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 从etcd发现指定服务的实例
|
|||
|
|
func discoverService(ctx context.Context, etcdClient *clientv3.Client, serviceName string) ([]ServiceInstance, error) {
|
|||
|
|
// 服务在etcd中的路径(与注册时保持一致)
|
|||
|
|
path := fmt.Sprintf("/microservices/%s/instances/", serviceName)
|
|||
|
|
resp, err := etcdClient.Get(ctx, path, clientv3.WithPrefix())
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, fmt.Errorf("查询etcd失败: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var instances []ServiceInstance
|
|||
|
|
for _, kv := range resp.Kvs {
|
|||
|
|
var instance ServiceInstance
|
|||
|
|
if err := json.Unmarshal(kv.Value, &instance); err != nil {
|
|||
|
|
log.Printf("解析服务实例失败,忽略此实例: %v", err)
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
if instance.Status == "running" {
|
|||
|
|
instances = append(instances, instance)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if len(instances) == 0 {
|
|||
|
|
return nil, fmt.Errorf("未发现可用的%s服务实例", serviceName)
|
|||
|
|
}
|
|||
|
|
return instances, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 从可用实例中选择一个(随机负载均衡)
|
|||
|
|
func selectServiceInstance(instances []ServiceInstance) (string, error) {
|
|||
|
|
if len(instances) == 0 {
|
|||
|
|
return "", fmt.Errorf("没有可用的服务实例")
|
|||
|
|
}
|
|||
|
|
rand.Seed(time.Now().UnixNano())
|
|||
|
|
selected := instances[rand.Intn(len(instances))]
|
|||
|
|
return fmt.Sprintf("http://%s:%d", selected.IP, selected.Port), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建包含登录和邮件发送的分布式事务
|
|||
|
|
func createCompositeTransaction(c *gin.Context, etcdClient *clientv3.Client) {
|
|||
|
|
// DTM服务器地址
|
|||
|
|
dtmServer := "http://localhost:36789/api/dtmsvr"
|
|||
|
|
// 本地业务服务地址
|
|||
|
|
ctx := context.Background()
|
|||
|
|
|
|||
|
|
// 1. 发现并选择user-login服务实例
|
|||
|
|
loginInstances, err := discoverService(ctx, etcdClient, "user-login")
|
|||
|
|
if err != nil {
|
|||
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现user-login服务失败: %v", err)})
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
loginServiceAddr, err := selectServiceInstance(loginInstances)
|
|||
|
|
if err != nil {
|
|||
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择login服务实例失败: %v", err)})
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
log.Printf("已选择user-login服务实例: %s", loginServiceAddr)
|
|||
|
|
|
|||
|
|
emailInstances, err := discoverService(ctx, etcdClient, "email")
|
|||
|
|
if err != nil {
|
|||
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现email服务失败: %v", err)})
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
emailServiceAddr, err := selectServiceInstance(emailInstances)
|
|||
|
|
if err != nil {
|
|||
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择email服务实例失败: %v", err)})
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
log.Printf("已选择email服务实例: %s", emailServiceAddr)
|
|||
|
|
|
|||
|
|
// 3. 创建SAGA分布式事务(包含两个步骤)
|
|||
|
|
saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)).
|
|||
|
|
// 第一步:调用user-login服务进行登录验证
|
|||
|
|
Add(
|
|||
|
|
loginServiceAddr+"/login", // 正向接口:用户登录
|
|||
|
|
loginServiceAddr+"/login/compensate", // 补偿接口:登录状态回滚
|
|||
|
|
LoginRequest{
|
|||
|
|
UserID: "user123",
|
|||
|
|
Password: "encrypted_token_xxx", // 实际项目中使用加密凭证
|
|||
|
|
},
|
|||
|
|
).
|
|||
|
|
// 第二步:调用email服务发送登录通知邮件(依赖第一步成功)
|
|||
|
|
Add(
|
|||
|
|
emailServiceAddr+"/send", // 正向接口:发送邮件
|
|||
|
|
emailServiceAddr+"/send/compensate", // 补偿接口:邮件发送回滚
|
|||
|
|
EmailRequest{
|
|||
|
|
To: "user@example.com",
|
|||
|
|
Subject: "登录成功通知",
|
|||
|
|
Content: fmt.Sprintf("您的账号于%s成功登录系统", time.Now().Format("2006-01-02 15:04:05")),
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// 4. 提交事务
|
|||
|
|
if err := saga.Submit(); err != nil {
|
|||
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("事务提交失败: %v", err)})
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
c.JSON(http.StatusOK, gin.H{
|
|||
|
|
"success": true,
|
|||
|
|
"message": "分布式事务已提交",
|
|||
|
|
"transactionId": saga.Gid,
|
|||
|
|
"steps": []map[string]string{
|
|||
|
|
{"service": "user-login", "address": loginServiceAddr},
|
|||
|
|
{"service": "email", "address": emailServiceAddr},
|
|||
|
|
},
|
|||
|
|
})
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func main() {
|
|||
|
|
// 初始化etcd客户端
|
|||
|
|
etcdClient, err := initEtcdClient([]string{"http://127.0.0.1:2379"})
|
|||
|
|
if err != nil {
|
|||
|
|
log.Fatalf("初始化etcd客户端失败: %v", err)
|
|||
|
|
}
|
|||
|
|
defer etcdClient.Close()
|
|||
|
|
|
|||
|
|
// 初始化Gin引擎
|
|||
|
|
r := gin.Default()
|
|||
|
|
|
|||
|
|
// 注册复合事务接口
|
|||
|
|
r.POST("/userAction", func(c *gin.Context) {
|
|||
|
|
createCompositeTransaction(c, etcdClient)
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
// 启动业务服务
|
|||
|
|
log.Println("业务服务启动,监听端口 8081")
|
|||
|
|
if err := r.Run(":8081"); err != nil {
|
|||
|
|
log.Fatalf("服务启动失败: %v", err)
|
|||
|
|
}
|
|||
|
|
}
|