Files
toutoukan/services/mail/grpc/mailGrpc.go
2025-09-15 11:09:22 +08:00

204 lines
5.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.etcd.io/etcd/client/v3"
)
// ServiceInstance 服务实例信息,增加更多可视化所需字段
type ServiceInstance struct {
IP string `json:"ip"`
Port int `json:"port"`
Service string `json:"service"`
Timestamp string `json:"timestamp"`
Status string `json:"status"` // 服务状态running/stopped等
Metadata map[string]string `json:"metadata"` // 额外元数据,如服务版本等
}
// 初始化etcd客户端
func initEtcdClient(endpoints []string) (*clientv3.Client, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("初始化etcd客户端失败: %v", err)
}
return client, nil
}
// 注册服务到etcd - 显式注册,优化路径便于可视化
func registerService(ctx context.Context, client *clientv3.Client, serviceName, ip string, port int, ttl int64) (clientv3.LeaseID, func(), error) {
// 优化服务在etcd中的存储键使用更清晰的层次结构
// 这种结构在可视化面板中会以目录树形式展示
serviceKey := fmt.Sprintf("/microservices/%s/instances/%s:%d", serviceName, ip, port)
// 准备服务实例信息,包含更多可视化所需的元数据
instance := ServiceInstance{
IP: ip,
Port: port,
Service: serviceName,
Timestamp: time.Now().Format(time.RFC3339),
Status: "running",
Metadata: map[string]string{
"version": "1.0.0",
"protocol": "http",
"endpoint": "/send",
"lastCheck": time.Now().Format(time.RFC3339),
},
}
instanceData, err := json.MarshalIndent(instance, "", " ") // 格式化JSON可视化更友好
if err != nil {
return 0, nil, fmt.Errorf("序列化服务实例信息失败: %v", err)
}
// 创建租约
leaseResp, err := client.Grant(ctx, ttl)
if err != nil {
return 0, nil, fmt.Errorf("创建etcd租约失败: %v", err)
}
leaseID := leaseResp.ID
// 将服务信息写入etcd
_, err = client.Put(ctx, serviceKey, string(instanceData), clientv3.WithLease(leaseID))
if err != nil {
return 0, nil, fmt.Errorf("写入服务信息到etcd失败: %v", err)
}
log.Printf("服务已显式注册到etcd, 服务键: %s, 租约ID: %d", serviceKey, leaseID)
// 启动心跳保活
keepAliveChan, err := client.KeepAlive(ctx, leaseID)
if err != nil {
return 0, nil, fmt.Errorf("启动心跳保活失败: %v", err)
}
// 处理心跳响应定期更新元数据中的lastCheck时间
go func() {
for resp := range keepAliveChan {
log.Printf("服务心跳保活成功, 租约续期至 %d 秒后", resp.TTL)
// 定期更新服务最后检查时间每3次心跳更新一次
if time.Now().Unix()%3 == 0 {
instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339)
updatedData, _ := json.MarshalIndent(instance, "", " ")
client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID))
}
}
log.Println("心跳保活通道已关闭, 服务可能需要重新注册")
}()
// 服务注销函数
unregister := func() {
// 更新服务状态为stopped
instance.Status = "stopped"
instance.Metadata["lastCheck"] = time.Now().Format(time.RFC3339)
updatedData, _ := json.MarshalIndent(instance, "", " ")
client.Put(ctx, serviceKey, string(updatedData), clientv3.WithLease(leaseID))
// 短暂延迟,确保状态更新被可视化面板捕获
time.Sleep(500 * time.Millisecond)
// 撤销租约
_, err := client.Revoke(ctx, leaseID)
if err != nil {
log.Printf("撤销租约失败: %v", err)
return
}
// 主动删除服务信息
_, err = client.Delete(ctx, serviceKey)
if err != nil {
log.Printf("删除服务信息失败: %v", err)
return
}
log.Printf("服务已从etcd注销, 服务键: %s", serviceKey)
}
return leaseID, unregister, nil
}
// 获取本地IP地址
func getLocalIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String(), nil
}
}
}
return "", fmt.Errorf("无法获取本地IP地址")
}
// 消息发送处理函数
func sendMessageHandler(w http.ResponseWriter, r *http.Request) {
// 输出"发送消息成功"
fmt.Println("发送消息成功")
w.WriteHeader(http.StatusOK)
w.Write([]byte("发送消息成功"))
}
func main() {
// 服务配置
serviceName := "email"
servicePort := 8080
etcdEndpoints := []string{"http://127.0.0.1:2379"}
leaseTTL := int64(15) // 15秒租约
// 获取本地IP
localIP, err := getLocalIP()
if err != nil {
log.Fatalf("获取本地IP失败: %v", err)
}
// 初始化etcd客户端
etcdClient, err := initEtcdClient(etcdEndpoints)
if err != nil {
log.Fatalf("初始化etcd客户端失败: %v", err)
}
defer etcdClient.Close()
// 显式注册服务到etcd
ctx := context.Background()
_, unregister, err := registerService(ctx, etcdClient, serviceName, localIP, servicePort, leaseTTL)
if err != nil {
log.Fatalf("服务注册失败: %v", err)
}
defer unregister() // 程序退出时注销服务
// 设置HTTP路由处理消息发送请求
http.HandleFunc("/send", sendMessageHandler)
// 启动HTTP服务
go func() {
addr := fmt.Sprintf(":%d", servicePort)
log.Printf("email微服务已启动监听地址: %s", addr)
if err := http.ListenAndServe(addr, nil); err != nil && err != http.ErrServerClosed {
log.Fatalf("服务启动失败: %v", err)
}
}()
// 等待退出信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("收到退出信号,正在关闭服务...")
}