package main import ( "context" "encoding/json" "fmt" "log" "net" "net/http" "os" "os/signal" "syscall" "time" "go.etcd.io/etcd/client/v3" ) // ServiceInstance 服务实例信息,增加更多可视化所需字段 type ServiceInstance struct { IP string `json:"ip"` Port int `json:"port"` Service string `json:"service"` Timestamp string `json:"timestamp"` Status string `json:"status"` // 服务状态:running/stopped等 Metadata map[string]string `json:"metadata"` // 额外元数据,如服务版本等 } // 初始化etcd客户端 func initEtcdClient(endpoints []string) (*clientv3.Client, error) { client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { return nil, fmt.Errorf("初始化etcd客户端失败: %v", err) } return client, nil } // 注册服务到etcd - 显式注册,优化路径便于可视化 func registerService(ctx context.Context, client *clientv3.Client, serviceName, ip string, port int, ttl int64) (clientv3.LeaseID, func(), error) { // 优化服务在etcd中的存储键,使用更清晰的层次结构 // 这种结构在可视化面板中会以目录树形式展示 serviceKey := fmt.Sprintf("/microservices/%s/instances/%s:%d", serviceName, ip, port) // 准备服务实例信息,包含更多可视化所需的元数据 instance := ServiceInstance{ IP: ip, Port: port, Service: serviceName, Timestamp: time.Now().Format(time.RFC3339), Status: "running", Metadata: map[string]string{ "version": "1.0.0", "protocol": "http", "endpoint": "/send", "lastCheck": time.Now().Format(time.RFC3339), }, } instanceData, err := json.MarshalIndent(instance, "", " ") // 格式化JSON,可视化更友好 if err != nil { return 0, nil, fmt.Errorf("序列化服务实例信息失败: %v", err) } // 创建租约 leaseResp, err := client.Grant(ctx, ttl) if err != nil { return 0, nil, fmt.Errorf("创建etcd租约失败: %v", err) } leaseID := leaseResp.ID // 将服务信息写入etcd _, err = client.Put(ctx, serviceKey, string(instanceData), clientv3.WithLease(leaseID)) if err != nil { return 0, nil, fmt.Errorf("写入服务信息到etcd失败: %v", err) } log.Printf("服务已显式注册到etcd, 服务键: %s, 租约ID: %d", serviceKey, leaseID) // 启动心跳保活 keepAliveChan, err := client.KeepAlive(ctx, leaseID) if err != nil { return 0, nil, fmt.Errorf("启动心跳保活失败: %v", err) } // 处理心跳响应,定期更新元数据中的lastCheck时间 go func() { for resp := range keepAliveChan { log.Printf("服务心跳保活成功, 租约续期至 %d 秒后", resp.TTL) // 定期更新服务最后检查时间(每3次心跳更新一次) if time.Now().Unix()%3 == 0 { instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339) updatedData, _ := json.MarshalIndent(instance, "", " ") client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID)) } } log.Println("心跳保活通道已关闭, 服务可能需要重新注册") }() // 服务注销函数 unregister := func() { // 更新服务状态为stopped instance.Status = "stopped" instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339) updatedData, _ := json.MarshalIndent(instance, "", " ") client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID)) // 短暂延迟,确保状态更新被可视化面板捕获 time.Sleep(500 * time.Millisecond) // 撤销租约 _, err := client.Revoke(ctx, leaseID) if err != nil { log.Printf("撤销租约失败: %v", err) return } // 主动删除服务信息 _, err = client.Delete(ctx, serviceKey) if err != nil { log.Printf("删除服务信息失败: %v", err) return } log.Printf("服务已从etcd注销, 服务键: %s", serviceKey) } return leaseID, unregister, nil } // 获取本地IP地址 func getLocalIP() (string, error) { addrs, err := net.InterfaceAddrs() if err != nil { return "", err } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { return ipnet.IP.String(), nil } } } return "", fmt.Errorf("无法获取本地IP地址") } // 消息发送处理函数 func sendMessageHandler(w http.ResponseWriter, r *http.Request) { // 输出"发送消息成功" fmt.Println("发送消息成功") w.WriteHeader(http.StatusOK) w.Write([]byte("发送消息成功")) } func main() { // 服务配置 serviceName := "email" servicePort := 8080 etcdEndpoints := []string{"http://127.0.0.1:2379"} leaseTTL := int64(15) // 15秒租约 // 获取本地IP localIP, err := getLocalIP() if err != nil { log.Fatalf("获取本地IP失败: %v", err) } // 初始化etcd客户端 etcdClient, err := initEtcdClient(etcdEndpoints) if err != nil { log.Fatalf("初始化etcd客户端失败: %v", err) } defer etcdClient.Close() // 显式注册服务到etcd ctx := context.Background() _, unregister, err := registerService(ctx, etcdClient, serviceName, localIP, servicePort, leaseTTL) if err != nil { log.Fatalf("服务注册失败: %v", err) } defer unregister() // 程序退出时注销服务 // 设置HTTP路由,处理消息发送请求 http.HandleFunc("/send", sendMessageHandler) // 启动HTTP服务 go func() { addr := fmt.Sprintf(":%d", servicePort) log.Printf("email微服务已启动,监听地址: %s", addr) if err := http.ListenAndServe(addr, nil); err != nil && err != http.ErrServerClosed { log.Fatalf("服务启动失败: %v", err) } }() // 等待退出信号 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan log.Println("收到退出信号,正在关闭服务...") }