204 lines
5.8 KiB
Go
204 lines
5.8 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"net"
|
||
"net/http"
|
||
"os"
|
||
"os/signal"
|
||
"syscall"
|
||
"time"
|
||
|
||
"go.etcd.io/etcd/client/v3"
|
||
)
|
||
|
||
// ServiceInstance 服务实例信息,增加更多可视化所需字段
|
||
type ServiceInstance struct {
|
||
IP string `json:"ip"`
|
||
Port int `json:"port"`
|
||
Service string `json:"service"`
|
||
Timestamp string `json:"timestamp"`
|
||
Status string `json:"status"` // 服务状态:running/stopped等
|
||
Metadata map[string]string `json:"metadata"` // 额外元数据,如服务版本等
|
||
}
|
||
|
||
// 初始化etcd客户端
|
||
func initEtcdClient(endpoints []string) (*clientv3.Client, error) {
|
||
client, err := clientv3.New(clientv3.Config{
|
||
Endpoints: endpoints,
|
||
DialTimeout: 5 * time.Second,
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("初始化etcd客户端失败: %v", err)
|
||
}
|
||
return client, nil
|
||
}
|
||
|
||
// 注册服务到etcd - 显式注册,优化路径便于可视化
|
||
func registerService(ctx context.Context, client *clientv3.Client, serviceName, ip string, port int, ttl int64) (clientv3.LeaseID, func(), error) {
|
||
// 优化服务在etcd中的存储键,使用更清晰的层次结构
|
||
// 这种结构在可视化面板中会以目录树形式展示
|
||
serviceKey := fmt.Sprintf("/microservices/%s/instances/%s:%d", serviceName, ip, port)
|
||
|
||
// 准备服务实例信息,包含更多可视化所需的元数据
|
||
instance := ServiceInstance{
|
||
IP: ip,
|
||
Port: port,
|
||
Service: serviceName,
|
||
Timestamp: time.Now().Format(time.RFC3339),
|
||
Status: "running",
|
||
Metadata: map[string]string{
|
||
"version": "1.0.0",
|
||
"protocol": "http",
|
||
"endpoint": "/send",
|
||
"lastCheck": time.Now().Format(time.RFC3339),
|
||
},
|
||
}
|
||
|
||
instanceData, err := json.MarshalIndent(instance, "", " ") // 格式化JSON,可视化更友好
|
||
if err != nil {
|
||
return 0, nil, fmt.Errorf("序列化服务实例信息失败: %v", err)
|
||
}
|
||
|
||
// 创建租约
|
||
leaseResp, err := client.Grant(ctx, ttl)
|
||
if err != nil {
|
||
return 0, nil, fmt.Errorf("创建etcd租约失败: %v", err)
|
||
}
|
||
leaseID := leaseResp.ID
|
||
|
||
// 将服务信息写入etcd
|
||
_, err = client.Put(ctx, serviceKey, string(instanceData), clientv3.WithLease(leaseID))
|
||
if err != nil {
|
||
return 0, nil, fmt.Errorf("写入服务信息到etcd失败: %v", err)
|
||
}
|
||
|
||
log.Printf("服务已显式注册到etcd, 服务键: %s, 租约ID: %d", serviceKey, leaseID)
|
||
|
||
// 启动心跳保活
|
||
keepAliveChan, err := client.KeepAlive(ctx, leaseID)
|
||
if err != nil {
|
||
return 0, nil, fmt.Errorf("启动心跳保活失败: %v", err)
|
||
}
|
||
|
||
// 处理心跳响应,定期更新元数据中的lastCheck时间
|
||
go func() {
|
||
for resp := range keepAliveChan {
|
||
log.Printf("服务心跳保活成功, 租约续期至 %d 秒后", resp.TTL)
|
||
|
||
// 定期更新服务最后检查时间(每3次心跳更新一次)
|
||
if time.Now().Unix()%3 == 0 {
|
||
instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339)
|
||
updatedData, _ := json.MarshalIndent(instance, "", " ")
|
||
client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID))
|
||
}
|
||
}
|
||
log.Println("心跳保活通道已关闭, 服务可能需要重新注册")
|
||
}()
|
||
|
||
// 服务注销函数
|
||
unregister := func() {
|
||
// 更新服务状态为stopped
|
||
instance.Status = "stopped"
|
||
instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339)
|
||
updatedData, _ := json.MarshalIndent(instance, "", " ")
|
||
client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID))
|
||
|
||
// 短暂延迟,确保状态更新被可视化面板捕获
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
// 撤销租约
|
||
_, err := client.Revoke(ctx, leaseID)
|
||
if err != nil {
|
||
log.Printf("撤销租约失败: %v", err)
|
||
return
|
||
}
|
||
// 主动删除服务信息
|
||
_, err = client.Delete(ctx, serviceKey)
|
||
if err != nil {
|
||
log.Printf("删除服务信息失败: %v", err)
|
||
return
|
||
}
|
||
log.Printf("服务已从etcd注销, 服务键: %s", serviceKey)
|
||
}
|
||
|
||
return leaseID, unregister, nil
|
||
}
|
||
|
||
// 获取本地IP地址
|
||
func getLocalIP() (string, error) {
|
||
addrs, err := net.InterfaceAddrs()
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
for _, addr := range addrs {
|
||
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||
if ipnet.IP.To4() != nil {
|
||
return ipnet.IP.String(), nil
|
||
}
|
||
}
|
||
}
|
||
|
||
return "", fmt.Errorf("无法获取本地IP地址")
|
||
}
|
||
|
||
// 消息发送处理函数
|
||
func sendMessageHandler(w http.ResponseWriter, r *http.Request) {
|
||
// 输出"发送消息成功"
|
||
fmt.Println("发送消息成功")
|
||
w.WriteHeader(http.StatusOK)
|
||
w.Write([]byte("发送消息成功"))
|
||
}
|
||
|
||
func main() {
|
||
// 服务配置
|
||
serviceName := "email"
|
||
servicePort := 8080
|
||
etcdEndpoints := []string{"http://127.0.0.1:2379"}
|
||
leaseTTL := int64(15) // 15秒租约
|
||
|
||
// 获取本地IP
|
||
localIP, err := getLocalIP()
|
||
if err != nil {
|
||
log.Fatalf("获取本地IP失败: %v", err)
|
||
}
|
||
|
||
// 初始化etcd客户端
|
||
etcdClient, err := initEtcdClient(etcdEndpoints)
|
||
if err != nil {
|
||
log.Fatalf("初始化etcd客户端失败: %v", err)
|
||
}
|
||
defer etcdClient.Close()
|
||
|
||
// 显式注册服务到etcd
|
||
ctx := context.Background()
|
||
_, unregister, err := registerService(ctx, etcdClient, serviceName, localIP, servicePort, leaseTTL)
|
||
if err != nil {
|
||
log.Fatalf("服务注册失败: %v", err)
|
||
}
|
||
defer unregister() // 程序退出时注销服务
|
||
|
||
// 设置HTTP路由,处理消息发送请求
|
||
http.HandleFunc("/send", sendMessageHandler)
|
||
|
||
// 启动HTTP服务
|
||
go func() {
|
||
addr := fmt.Sprintf(":%d", servicePort)
|
||
log.Printf("email微服务已启动,监听地址: %s", addr)
|
||
if err := http.ListenAndServe(addr, nil); err != nil && err != http.ErrServerClosed {
|
||
log.Fatalf("服务启动失败: %v", err)
|
||
}
|
||
}()
|
||
|
||
// 等待退出信号
|
||
sigChan := make(chan os.Signal, 1)
|
||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||
<-sigChan
|
||
|
||
log.Println("收到退出信号,正在关闭服务...")
|
||
}
|