From 8ccf028ae4304134438e953e5fd8f260cce69b5c Mon Sep 17 00:00:00 2001 From: JACKYMYPERSON Date: Mon, 15 Sep 2025 11:09:22 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E7=BB=93=E6=9E=84=E6=9B=B4?= =?UTF-8?q?=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {kills => controllers/kills}/kill.go | 6 +- controllers/user/userLogout.go | 4 +- .../docker-compose.yml | 0 go.mod | 49 +++++ {config => init/config}/config.go | 0 main.go | 2 +- router/setupRouter.go | 8 +- scripts/dtmScripts/dtm.go | 177 +++++++++++++++ .../connectPool/goroutine/readgo.go | 0 .../connectPool/goroutine/updatego.go | 0 .../connectPool/goroutine/writego.go | 0 .../goroutine}/connectPool/pool.go | 6 +- {goroutine => scripts/goroutine}/consumer.go | 0 {goroutine => scripts/goroutine}/producer.go | 0 {oneKUsers => scripts/oneKUsers}/mian.go | 0 services/mail/grpc/mailGrpc.go | 203 ++++++++++++++++++ .../order/createOrder/grpc/createOrderGrpc.go | 1 + .../orderMq}/consumer/killConsumer.go | 1 - .../orderMq}/mqproducer/killProducer.go | 0 .../killOrder/orderService/orderService.go | 5 + services/order/payOrder/grpc/payOrderGrpc.go | 12 ++ .../user/userLogin/grpc/user-consumer/main.go | 69 ++++++ .../user/userLogin/grpc/user-server/main.go | 76 +++++++ utill/{ => jwt}/jwtUt.go | 4 +- utill/{scripts => luaScripts}/killscripts.go | 2 +- 25 files changed, 608 insertions(+), 17 deletions(-) rename {kills => controllers/kills}/kill.go (93%) rename docker-compose.yml => docker/docker-compose.yml (100%) rename {config => init/config}/config.go (100%) create mode 100644 scripts/dtmScripts/dtm.go rename {goroutine => scripts/goroutine}/connectPool/goroutine/readgo.go (100%) rename {goroutine => scripts/goroutine}/connectPool/goroutine/updatego.go (100%) rename {goroutine => scripts/goroutine}/connectPool/goroutine/writego.go (100%) rename {goroutine => scripts/goroutine}/connectPool/pool.go (76%) rename {goroutine => scripts/goroutine}/consumer.go (100%) rename {goroutine => scripts/goroutine}/producer.go (100%) rename {oneKUsers => scripts/oneKUsers}/mian.go (100%) create mode 100644 services/mail/grpc/mailGrpc.go create mode 100644 services/order/createOrder/grpc/createOrderGrpc.go rename {rocketmq/killmq => services/order/killOrder/orderMq}/consumer/killConsumer.go (98%) rename {rocketmq/killmq => services/order/killOrder/orderMq}/mqproducer/killProducer.go (100%) create mode 100644 services/order/killOrder/orderService/orderService.go create mode 100644 services/order/payOrder/grpc/payOrderGrpc.go create mode 100644 services/user/userLogin/grpc/user-consumer/main.go create mode 100644 services/user/userLogin/grpc/user-server/main.go rename utill/{ => jwt}/jwtUt.go (98%) rename utill/{scripts => luaScripts}/killscripts.go (94%) diff --git a/kills/kill.go b/controllers/kills/kill.go similarity index 93% rename from kills/kill.go rename to controllers/kills/kill.go index b42b02e..910daea 100644 --- a/kills/kill.go +++ b/controllers/kills/kill.go @@ -11,8 +11,8 @@ import ( "time" "toutoukan/init/redisInit" "toutoukan/model/usermodel/userOrder" - "toutoukan/rocketmq/killmq/mqproducer" - "toutoukan/utill/scripts" + "toutoukan/services/order/killOrder/orderMq/mqproducer" + "toutoukan/utill/luaScripts" ) func GenerateOrderID() string { @@ -39,7 +39,7 @@ func Userkill(c *gin.Context) { OrderID := GenerateOrderID() // 1. Redis扣减库存(不变) - result, err := redisInit.RedisClient.Eval(context.Background(), scripts.Luascript_forkill, []string{req.Order, "mayiming"}, 1, OrderID).Int() + result, err := redisInit.RedisClient.Eval(context.Background(), luaScripts.Luascript_forkill, []string{req.Order, "mayiming"}, 1, OrderID).Int() if err != nil { log.Printf("Redis 脚本执行错误: %v", err) c.JSON(500, gin.H{"error": "库存操作失败", "detail": err.Error()}) diff --git a/controllers/user/userLogout.go b/controllers/user/userLogout.go index 78f35dd..59016bf 100644 --- a/controllers/user/userLogout.go +++ b/controllers/user/userLogout.go @@ -4,12 +4,12 @@ import ( "fmt" "github.com/gin-gonic/gin" "toutoukan/init/redisInit" - "toutoukan/utill" + "toutoukan/utill/jwt" ) // 登出功能(从Redis删除令牌) func LogoutHandler(c *gin.Context) { - tokenString := utill.ExtractTokenFromHeader(c) + tokenString := jwt.ExtractTokenFromHeader(c) if tokenString == "" { c.JSON(400, gin.H{"error": "令牌不存在", "code": "10038"}) return diff --git a/docker-compose.yml b/docker/docker-compose.yml similarity index 100% rename from docker-compose.yml rename to docker/docker-compose.yml diff --git a/go.mod b/go.mod index 2eefae5..48dd8b9 100644 --- a/go.mod +++ b/go.mod @@ -21,8 +21,12 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/Microsoft/go-winio v0.6.0 // indirect + github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect github.com/RoaringBitmap/roaring/v2 v2.10.0 // indirect + github.com/acomagu/bufpipe v1.0.3 // indirect github.com/benbjohnson/clock v1.3.5 // indirect + github.com/bitly/go-simplejson v0.5.0 // indirect github.com/bits-and-blooms/bitset v1.24.0 // indirect github.com/blevesearch/bleve_index_api v1.2.9 // indirect github.com/blevesearch/geo v0.2.4 // indirect @@ -47,55 +51,100 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/dtm-labs/client v1.17.3 // indirect + github.com/dtm-labs/dtmdriver v0.0.6 // indirect + github.com/dtm-labs/logger v0.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.10 // indirect github.com/gin-contrib/sse v1.1.0 // indirect + github.com/go-git/gcfg v1.5.0 // indirect + github.com/go-git/go-billy/v5 v5.3.1 // indirect + github.com/go-git/go-git/v5 v5.4.2 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.27.0 // indirect + github.com/go-resty/resty/v2 v2.7.0 // indirect + github.com/go-stack/stack v1.8.0 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v1.0.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect + github.com/imdario/mergo v0.3.12 // indirect + github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/miekg/dns v1.1.43 // indirect github.com/minio/crc64nvme v1.0.2 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/minio-go/v7 v7.0.95 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect + github.com/natefinch/lumberjack v2.0.0+incompatible // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/philhofer/fwd v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rs/xid v1.6.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/sergi/go-diff v1.1.0 // indirect github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.2.0 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/tinylib/msgp v1.3.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect + github.com/urfave/cli/v2 v2.25.7 // indirect + github.com/xanzy/ssh-agent v0.3.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.0.2 // indirect + github.com/xdg-go/stringprep v1.0.2 // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + go-micro.dev/v4 v4.11.0 // indirect go.etcd.io/bbolt v1.4.3 // indirect + go.etcd.io/etcd/api/v3 v3.6.4 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.6.4 // indirect + go.etcd.io/etcd/client/v3 v3.6.4 // indirect + go.mongodb.org/mongo-driver v1.9.1 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.21.0 // indirect golang.org/x/crypto v0.42.0 // indirect golang.org/x/lint v0.0.0-20241112194109-818c5a804067 // indirect + golang.org/x/mod v0.28.0 // indirect golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.36.0 // indirect golang.org/x/term v0.35.0 // indirect golang.org/x/text v0.29.0 // indirect golang.org/x/tools v0.37.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect + google.golang.org/grpc v1.71.1 // indirect google.golang.org/protobuf v1.36.9 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect + gopkg.in/warnings.v0 v0.1.2 // indirect stathat.com/c/consistent v1.0.0 // indirect ) diff --git a/config/config.go b/init/config/config.go similarity index 100% rename from config/config.go rename to init/config/config.go diff --git a/main.go b/main.go index 1790a4a..563dfec 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,9 @@ package main import ( "log" "strconv" - "toutoukan/config" "toutoukan/controllers/search" "toutoukan/init/bleveInit" + "toutoukan/init/config" "toutoukan/init/databaseInit" "toutoukan/init/redisInit" "toutoukan/router" diff --git a/router/setupRouter.go b/router/setupRouter.go index 84850fa..404ce5f 100644 --- a/router/setupRouter.go +++ b/router/setupRouter.go @@ -1,14 +1,14 @@ package router import ( + "toutoukan/controllers/kills" "toutoukan/controllers/search" "toutoukan/controllers/system" "toutoukan/controllers/test" "toutoukan/controllers/user" "toutoukan/init/ratelimit" - "toutoukan/kills" "toutoukan/socket" - "toutoukan/utill" + "toutoukan/utill/jwt" "github.com/gin-gonic/gin" ) @@ -19,11 +19,11 @@ func SetupRouter() *gin.Engine { //apiGroup.Use(hystrix.CircuitBreakerMiddleware("user_api")) { apiGroup.POST("/login", user.UserLogin) - apiGroup.POST("/test", utill.JWTAuthMiddleware(), test.Testjwt) + apiGroup.POST("/test", jwt.JWTAuthMiddleware(), test.Testjwt) apiGroup.POST("/kill", ratelimit.RateLimitMiddleware(), kills.Userkill) } - r.GET("/socket", utill.JWTAuthMiddleware(), func(c *gin.Context) { + r.GET("/socket", jwt.JWTAuthMiddleware(), func(c *gin.Context) { socket.WebsocketHandler(c) }) systemGroup := r.Group("/system") diff --git a/scripts/dtmScripts/dtm.go b/scripts/dtmScripts/dtm.go new file mode 100644 index 0000000..8289034 --- /dev/null +++ b/scripts/dtmScripts/dtm.go @@ -0,0 +1,177 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math/rand" + "net/http" + "time" + + "github.com/dtm-labs/client/dtmcli" + "github.com/gin-gonic/gin" + "go.etcd.io/etcd/client/v3" +) + +// ServiceInstance 服务实例结构 +type ServiceInstance struct { + IP string `json:"ip"` + Port int `json:"port"` + Service string `json:"service"` + Timestamp string `json:"timestamp"` + Status string `json:"status"` + Metadata map[string]string `json:"metadata"` +} + +// EmailRequest 邮件发送请求 +type EmailRequest struct { + To string `json:"to"` + Subject string `json:"subject"` + Content string `json:"content"` +} + +// LoginRequest 用户登录请求 +type LoginRequest struct { + UserID string `json:"user_id"` + Password string `json:"password"` // 实际项目中应传递加密后的凭证 +} + +// 初始化etcd客户端 +func initEtcdClient(endpoints []string) (*clientv3.Client, error) { + return clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + }) +} + +// 从etcd发现指定服务的实例 +func discoverService(ctx context.Context, etcdClient *clientv3.Client, serviceName string) ([]ServiceInstance, error) { + // 服务在etcd中的路径(与注册时保持一致) + path := fmt.Sprintf("/microservices/%s/instances/", serviceName) + resp, err := etcdClient.Get(ctx, path, clientv3.WithPrefix()) + if err != nil { + return nil, fmt.Errorf("查询etcd失败: %v", err) + } + + var instances []ServiceInstance + for _, kv := range resp.Kvs { + var instance ServiceInstance + if err := json.Unmarshal(kv.Value, &instance); err != nil { + log.Printf("解析服务实例失败,忽略此实例: %v", err) + continue + } + if instance.Status == "running" { + instances = append(instances, instance) + } + } + + if len(instances) == 0 { + return nil, fmt.Errorf("未发现可用的%s服务实例", serviceName) + } + return instances, nil +} + +// 从可用实例中选择一个(随机负载均衡) +func selectServiceInstance(instances []ServiceInstance) (string, error) { + if len(instances) == 0 { + return "", fmt.Errorf("没有可用的服务实例") + } + rand.Seed(time.Now().UnixNano()) + selected := instances[rand.Intn(len(instances))] + return fmt.Sprintf("http://%s:%d", selected.IP, selected.Port), nil +} + +// 创建包含登录和邮件发送的分布式事务 +func createCompositeTransaction(c *gin.Context, etcdClient *clientv3.Client) { + // DTM服务器地址 + dtmServer := "http://localhost:36789/api/dtmsvr" + // 本地业务服务地址 + ctx := context.Background() + + // 1. 发现并选择user-login服务实例 + loginInstances, err := discoverService(ctx, etcdClient, "user-login") + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现user-login服务失败: %v", err)}) + return + } + loginServiceAddr, err := selectServiceInstance(loginInstances) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择login服务实例失败: %v", err)}) + return + } + log.Printf("已选择user-login服务实例: %s", loginServiceAddr) + + emailInstances, err := discoverService(ctx, etcdClient, "email") + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("发现email服务失败: %v", err)}) + return + } + emailServiceAddr, err := selectServiceInstance(emailInstances) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("选择email服务实例失败: %v", err)}) + return + } + log.Printf("已选择email服务实例: %s", emailServiceAddr) + + // 3. 创建SAGA分布式事务(包含两个步骤) + saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)). + // 第一步:调用user-login服务进行登录验证 + Add( + loginServiceAddr+"/login", // 正向接口:用户登录 + loginServiceAddr+"/login/compensate", // 补偿接口:登录状态回滚 + LoginRequest{ + UserID: "user123", + Password: "encrypted_token_xxx", // 实际项目中使用加密凭证 + }, + ). + // 第二步:调用email服务发送登录通知邮件(依赖第一步成功) + Add( + emailServiceAddr+"/send", // 正向接口:发送邮件 + emailServiceAddr+"/send/compensate", // 补偿接口:邮件发送回滚 + EmailRequest{ + To: "user@example.com", + Subject: "登录成功通知", + Content: fmt.Sprintf("您的账号于%s成功登录系统", time.Now().Format("2006-01-02 15:04:05")), + }, + ) + + // 4. 提交事务 + if err := saga.Submit(); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("事务提交失败: %v", err)}) + return + } + + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "分布式事务已提交", + "transactionId": saga.Gid, + "steps": []map[string]string{ + {"service": "user-login", "address": loginServiceAddr}, + {"service": "email", "address": emailServiceAddr}, + }, + }) +} + +func main() { + // 初始化etcd客户端 + etcdClient, err := initEtcdClient([]string{"http://127.0.0.1:2379"}) + if err != nil { + log.Fatalf("初始化etcd客户端失败: %v", err) + } + defer etcdClient.Close() + + // 初始化Gin引擎 + r := gin.Default() + + // 注册复合事务接口 + r.POST("/userAction", func(c *gin.Context) { + createCompositeTransaction(c, etcdClient) + }) + + // 启动业务服务 + log.Println("业务服务启动,监听端口 8081") + if err := r.Run(":8081"); err != nil { + log.Fatalf("服务启动失败: %v", err) + } +} diff --git a/goroutine/connectPool/goroutine/readgo.go b/scripts/goroutine/connectPool/goroutine/readgo.go similarity index 100% rename from goroutine/connectPool/goroutine/readgo.go rename to scripts/goroutine/connectPool/goroutine/readgo.go diff --git a/goroutine/connectPool/goroutine/updatego.go b/scripts/goroutine/connectPool/goroutine/updatego.go similarity index 100% rename from goroutine/connectPool/goroutine/updatego.go rename to scripts/goroutine/connectPool/goroutine/updatego.go diff --git a/goroutine/connectPool/goroutine/writego.go b/scripts/goroutine/connectPool/goroutine/writego.go similarity index 100% rename from goroutine/connectPool/goroutine/writego.go rename to scripts/goroutine/connectPool/goroutine/writego.go diff --git a/goroutine/connectPool/pool.go b/scripts/goroutine/connectPool/pool.go similarity index 76% rename from goroutine/connectPool/pool.go rename to scripts/goroutine/connectPool/pool.go index 31b1209..fb68c94 100644 --- a/goroutine/connectPool/pool.go +++ b/scripts/goroutine/connectPool/pool.go @@ -4,7 +4,7 @@ import ( "database/sql" "fmt" _ "github.com/go-sql-driver/mysql" - "toutoukan/goroutine/connectPool/goroutine" + goroutine2 "toutoukan/scripts/goroutine/connectPool/goroutine" ) var db *sql.DB @@ -19,9 +19,9 @@ func ConnectPool(donetitle chan struct{}) { } fmt.Println("数据库连接成功") for i := 0; i < 20; i++ { - go goroutine.Datawrite(db, donetitle) + go goroutine2.Datawrite(db, donetitle) } - go goroutine.Dataread(db, donetitle) + go goroutine2.Dataread(db, donetitle) } func DisconnectPool() { diff --git a/goroutine/consumer.go b/scripts/goroutine/consumer.go similarity index 100% rename from goroutine/consumer.go rename to scripts/goroutine/consumer.go diff --git a/goroutine/producer.go b/scripts/goroutine/producer.go similarity index 100% rename from goroutine/producer.go rename to scripts/goroutine/producer.go diff --git a/oneKUsers/mian.go b/scripts/oneKUsers/mian.go similarity index 100% rename from oneKUsers/mian.go rename to scripts/oneKUsers/mian.go diff --git a/services/mail/grpc/mailGrpc.go b/services/mail/grpc/mailGrpc.go new file mode 100644 index 0000000..3af9665 --- /dev/null +++ b/services/mail/grpc/mailGrpc.go @@ -0,0 +1,203 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "go.etcd.io/etcd/client/v3" +) + +// ServiceInstance 服务实例信息,增加更多可视化所需字段 +type ServiceInstance struct { + IP string `json:"ip"` + Port int `json:"port"` + Service string `json:"service"` + Timestamp string `json:"timestamp"` + Status string `json:"status"` // 服务状态:running/stopped等 + Metadata map[string]string `json:"metadata"` // 额外元数据,如服务版本等 +} + +// 初始化etcd客户端 +func initEtcdClient(endpoints []string) (*clientv3.Client, error) { + client, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + }) + if err != nil { + return nil, fmt.Errorf("初始化etcd客户端失败: %v", err) + } + return client, nil +} + +// 注册服务到etcd - 显式注册,优化路径便于可视化 +func registerService(ctx context.Context, client *clientv3.Client, serviceName, ip string, port int, ttl int64) (clientv3.LeaseID, func(), error) { + // 优化服务在etcd中的存储键,使用更清晰的层次结构 + // 这种结构在可视化面板中会以目录树形式展示 + serviceKey := fmt.Sprintf("/microservices/%s/instances/%s:%d", serviceName, ip, port) + + // 准备服务实例信息,包含更多可视化所需的元数据 + instance := ServiceInstance{ + IP: ip, + Port: port, + Service: serviceName, + Timestamp: time.Now().Format(time.RFC3339), + Status: "running", + Metadata: map[string]string{ + "version": "1.0.0", + "protocol": "http", + "endpoint": "/send", + "lastCheck": time.Now().Format(time.RFC3339), + }, + } + + instanceData, err := json.MarshalIndent(instance, "", " ") // 格式化JSON,可视化更友好 + if err != nil { + return 0, nil, fmt.Errorf("序列化服务实例信息失败: %v", err) + } + + // 创建租约 + leaseResp, err := client.Grant(ctx, ttl) + if err != nil { + return 0, nil, fmt.Errorf("创建etcd租约失败: %v", err) + } + leaseID := leaseResp.ID + + // 将服务信息写入etcd + _, err = client.Put(ctx, serviceKey, string(instanceData), clientv3.WithLease(leaseID)) + if err != nil { + return 0, nil, fmt.Errorf("写入服务信息到etcd失败: %v", err) + } + + log.Printf("服务已显式注册到etcd, 服务键: %s, 租约ID: %d", serviceKey, leaseID) + + // 启动心跳保活 + keepAliveChan, err := client.KeepAlive(ctx, leaseID) + if err != nil { + return 0, nil, fmt.Errorf("启动心跳保活失败: %v", err) + } + + // 处理心跳响应,定期更新元数据中的lastCheck时间 + go func() { + for resp := range keepAliveChan { + log.Printf("服务心跳保活成功, 租约续期至 %d 秒后", resp.TTL) + + // 定期更新服务最后检查时间(每3次心跳更新一次) + if time.Now().Unix()%3 == 0 { + instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339) + updatedData, _ := json.MarshalIndent(instance, "", " ") + client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID)) + } + } + log.Println("心跳保活通道已关闭, 服务可能需要重新注册") + }() + + // 服务注销函数 + unregister := func() { + // 更新服务状态为stopped + instance.Status = "stopped" + instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339) + updatedData, _ := json.MarshalIndent(instance, "", " ") + client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID)) + + // 短暂延迟,确保状态更新被可视化面板捕获 + time.Sleep(500 * time.Millisecond) + + // 撤销租约 + _, err := client.Revoke(ctx, leaseID) + if err != nil { + log.Printf("撤销租约失败: %v", err) + return + } + // 主动删除服务信息 + _, err = client.Delete(ctx, serviceKey) + if err != nil { + log.Printf("删除服务信息失败: %v", err) + return + } + log.Printf("服务已从etcd注销, 服务键: %s", serviceKey) + } + + return leaseID, unregister, nil +} + +// 获取本地IP地址 +func getLocalIP() (string, error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "", err + } + + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return ipnet.IP.String(), nil + } + } + } + + return "", fmt.Errorf("无法获取本地IP地址") +} + +// 消息发送处理函数 +func sendMessageHandler(w http.ResponseWriter, r *http.Request) { + // 输出"发送消息成功" + fmt.Println("发送消息成功") + w.WriteHeader(http.StatusOK) + w.Write([]byte("发送消息成功")) +} + +func main() { + // 服务配置 + serviceName := "email" + servicePort := 8080 + etcdEndpoints := []string{"http://127.0.0.1:2379"} + leaseTTL := int64(15) // 15秒租约 + + // 获取本地IP + localIP, err := getLocalIP() + if err != nil { + log.Fatalf("获取本地IP失败: %v", err) + } + + // 初始化etcd客户端 + etcdClient, err := initEtcdClient(etcdEndpoints) + if err != nil { + log.Fatalf("初始化etcd客户端失败: %v", err) + } + defer etcdClient.Close() + + // 显式注册服务到etcd + ctx := context.Background() + _, unregister, err := registerService(ctx, etcdClient, serviceName, localIP, servicePort, leaseTTL) + if err != nil { + log.Fatalf("服务注册失败: %v", err) + } + defer unregister() // 程序退出时注销服务 + + // 设置HTTP路由,处理消息发送请求 + http.HandleFunc("/send", sendMessageHandler) + + // 启动HTTP服务 + go func() { + addr := fmt.Sprintf(":%d", servicePort) + log.Printf("email微服务已启动,监听地址: %s", addr) + if err := http.ListenAndServe(addr, nil); err != nil && err != http.ErrServerClosed { + log.Fatalf("服务启动失败: %v", err) + } + }() + + // 等待退出信号 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + log.Println("收到退出信号,正在关闭服务...") +} diff --git a/services/order/createOrder/grpc/createOrderGrpc.go b/services/order/createOrder/grpc/createOrderGrpc.go new file mode 100644 index 0000000..21e034e --- /dev/null +++ b/services/order/createOrder/grpc/createOrderGrpc.go @@ -0,0 +1 @@ +package grpc diff --git a/rocketmq/killmq/consumer/killConsumer.go b/services/order/killOrder/orderMq/consumer/killConsumer.go similarity index 98% rename from rocketmq/killmq/consumer/killConsumer.go rename to services/order/killOrder/orderMq/consumer/killConsumer.go index 84c13d7..e89eeac 100644 --- a/rocketmq/killmq/consumer/killConsumer.go +++ b/services/order/killOrder/orderMq/consumer/killConsumer.go @@ -74,7 +74,6 @@ func main() { return consumer.ConsumeSuccess, nil } - // 模拟业务处理(如更新订单状态、通知物流等) log.Printf( "收到消息: ID=%s, 订单ID=%s, 商品ID=%d, 用户ID=%d, 数量=%d", msg.MsgId, orderMsg.OrderID, orderMsg.GoodsID, orderMsg.UserID, orderMsg.Quantity, diff --git a/rocketmq/killmq/mqproducer/killProducer.go b/services/order/killOrder/orderMq/mqproducer/killProducer.go similarity index 100% rename from rocketmq/killmq/mqproducer/killProducer.go rename to services/order/killOrder/orderMq/mqproducer/killProducer.go diff --git a/services/order/killOrder/orderService/orderService.go b/services/order/killOrder/orderService/orderService.go new file mode 100644 index 0000000..3453e93 --- /dev/null +++ b/services/order/killOrder/orderService/orderService.go @@ -0,0 +1,5 @@ +package orderService + +func OrderService() { + +} diff --git a/services/order/payOrder/grpc/payOrderGrpc.go b/services/order/payOrder/grpc/payOrderGrpc.go new file mode 100644 index 0000000..83bd722 --- /dev/null +++ b/services/order/payOrder/grpc/payOrderGrpc.go @@ -0,0 +1,12 @@ +package main + +type PayOrder struct { +} + +func (P *PayOrder) GenerateOrderID() string { + return "" +} + +func PayOrderService() { + +} diff --git a/services/user/userLogin/grpc/user-consumer/main.go b/services/user/userLogin/grpc/user-consumer/main.go new file mode 100644 index 0000000..40efea2 --- /dev/null +++ b/services/user/userLogin/grpc/user-consumer/main.go @@ -0,0 +1,69 @@ +// client.go +package main + +import ( + "context" + "log" + "time" + + "go-micro.dev/v4" + "go-micro.dev/v4/client" + "go-micro.dev/v4/registry" + "go.etcd.io/etcd/client/v3" +) + +type LoginRequest struct { + Username string + Password string +} + +type LoginResponse struct { + Token string +} + +func main() { + // 1. 创建 etcd 客户端 + etcdClient, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"127.0.0.1:2379"}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + log.Fatal(err) + } + defer etcdClient.Close() + + // 2. 创建注册中心 + reg := registry.NewRegistry( + registry.Addrs("127.0.0.1:2379"), + ) + + // 3. 创建微服务客户端 + service := micro.NewService( + micro.Registry(reg), + ) + service.Init() + + // 4. 创建 RPC 客户端 + userService := micro.NewService( + micro.Name("user.client"), + micro.Registry(reg), + ) + userService.Init() + + // 5. 调用远程服务 + req := client.NewRequest( + "user.service", // 服务名 + "UserService.Login", // 方法名 + &LoginRequest{ // 请求参数 + Username: "admin", + Password: "123456", + }, + ) + + rsp := &LoginResponse{} + if err := userService.Client().Call(context.Background(), req, rsp); err != nil { + log.Fatal("调用失败:", err) + } + + log.Printf("登录成功,Token: %s", rsp.Token) +} diff --git a/services/user/userLogin/grpc/user-server/main.go b/services/user/userLogin/grpc/user-server/main.go new file mode 100644 index 0000000..71d2b05 --- /dev/null +++ b/services/user/userLogin/grpc/user-server/main.go @@ -0,0 +1,76 @@ +// server.go +package main + +import ( + "context" + "go-micro.dev/v4/errors" + "log" + "time" + + "go-micro.dev/v4" + "go-micro.dev/v4/registry" + "go.etcd.io/etcd/client/v3" +) + +// 定义请求和响应结构体 +type LoginRequest struct { + Username string + Password string +} + +type LoginResponse struct { + Token string +} + +// UserService 实现 +type UserService struct{} + +func (u *UserService) Login(ctx context.Context, req *LoginRequest, rsp *LoginResponse) error { + if req.Username == "admin" && req.Password == "123456" { + rsp.Token = "abc123" + return nil // 返回 nil 表示成功 + } + return errors.New("登录失败", "失败", 500) // 返回 error 表示失败 +} + +func main() { + // 1. 创建原生 etcd 客户端 + etcdClient, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"127.0.0.1:2379"}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + log.Fatal("创建 etcd 客户端失败:", err) + } + defer etcdClient.Close() + + // 2. 创建自定义的 etcd 注册中心 + reg := registry.NewRegistry( + registry.Addrs("127.0.0.1:2379"), + registry.Timeout(5*time.Second), + // 这里可以添加更多自定义配置 + ) + + // 3. 创建微服务实例 + service := micro.NewService( + micro.Name("user.service"), + micro.Registry(reg), + // 注入 etcd 客户端 + micro.BeforeStart(func() error { + return etcdClient.Sync(context.Background()) + }), + ) + + // 4. 初始化服务 + service.Init() + + // 5. 注册服务处理器 + if err := micro.RegisterHandler(service.Server(), new(UserService)); err != nil { + log.Fatal("注册服务处理器失败:", err) + } + + // 6. 启动服务 + if err := service.Run(); err != nil { + log.Fatal("服务启动失败:", err) + } +} diff --git a/utill/jwtUt.go b/utill/jwt/jwtUt.go similarity index 98% rename from utill/jwtUt.go rename to utill/jwt/jwtUt.go index 0b05f56..75e068b 100644 --- a/utill/jwtUt.go +++ b/utill/jwt/jwtUt.go @@ -1,11 +1,11 @@ -package utill +package jwt import ( "fmt" "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt/v5" "time" - "toutoukan/config" + "toutoukan/init/config" "toutoukan/init/redisInit" "toutoukan/model/usermodel" ) diff --git a/utill/scripts/killscripts.go b/utill/luaScripts/killscripts.go similarity index 94% rename from utill/scripts/killscripts.go rename to utill/luaScripts/killscripts.go index ecee7e5..0083ef8 100644 --- a/utill/scripts/killscripts.go +++ b/utill/luaScripts/killscripts.go @@ -1,4 +1,4 @@ -package scripts +package luaScripts var Luascript_forkill = ` local key = KEYS[1]