Files

212 lines
5.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
// etcd 服务器地址gRPC 端口,默认 2379
etcdEndpoint = "8.155.160.224:2379"
// 用户名
username = "admin"
// 密码(根据 Docker 配置更新)
password = "my45638my"
// 服务注册的 key 前缀
servicePrefix = "/services/"
)
// ServiceInfo 服务信息
type ServiceInfo struct {
Name string // 服务名称
Address string // 服务地址
Port int // 服务端口
Metadata map[string]string // 元数据
}
// EtcdRegistry etcd 注册器
type EtcdRegistry struct {
client *clientv3.Client
ctx context.Context
}
// NewEtcdRegistry 创建 etcd 注册器
func NewEtcdRegistry() (*EtcdRegistry, error) {
fmt.Printf("正在连接到 etcd 服务器: %s\n", etcdEndpoint)
// 先尝试不使用认证连接(因为 Docker 配置可能没有启用 RBAC
config := clientv3.Config{
Endpoints: []string{etcdEndpoint},
DialTimeout: 10 * time.Second,
// 暂时不设置用户名密码,先测试连接
}
// 创建客户端
fmt.Println("正在创建 etcd 客户端...")
client, err := clientv3.New(config)
if err != nil {
return nil, fmt.Errorf("创建 etcd 客户端失败: %w", err)
}
// 测试连接
fmt.Println("正在测试连接...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 尝试获取一个不存在的 key 来测试连接
_, err = client.Get(ctx, "/test-connection", clientv3.WithLimit(1))
if err != nil {
// 如果是 context deadline exceeded可能是网络问题
if ctx.Err() == context.DeadlineExceeded {
client.Close()
return nil, fmt.Errorf("连接 etcd 服务器超时\n可能的原因:\n1. etcd 服务器未启动或未正确运行\n2. 端口映射不正确 (Docker -p 2379:2379)\n3. 防火墙阻止了连接\n4. etcd 配置中的 --advertise-client-urls 应该使用实际 IP 而不是 0.0.0.0\n\n建议检查:\n- docker ps 查看 etcd 容器是否运行\n- docker logs etcd 查看 etcd 日志\n- 确认 Docker 端口映射正确")
}
// 如果是认证错误,尝试使用用户名密码
errStr := err.Error()
if errStr == "etcdserver: invalid auth token" ||
errStr == "etcdserver: user name is empty" ||
errStr == "etcdserver: authentication failed" {
fmt.Printf("检测到需要认证,尝试使用用户名密码连接...\n")
client.Close()
return newEtcdRegistryWithAuth()
}
// 其他错误
client.Close()
return nil, fmt.Errorf("连接测试失败: %w\n错误详情: %s", err, errStr)
}
fmt.Printf("成功连接到 etcd 服务器: %s (无需认证)\n", etcdEndpoint)
return &EtcdRegistry{
client: client,
ctx: context.Background(),
}, nil
}
// newEtcdRegistryWithAuth 使用认证创建 etcd 注册器
func newEtcdRegistryWithAuth() (*EtcdRegistry, error) {
config := clientv3.Config{
Endpoints: []string{etcdEndpoint},
DialTimeout: 10 * time.Second,
Username: username,
Password: password,
}
client, err := clientv3.New(config)
if err != nil {
return nil, fmt.Errorf("创建 etcd 客户端失败: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = client.Get(ctx, "/test-connection", clientv3.WithLimit(1))
if err != nil {
client.Close()
return nil, fmt.Errorf("使用认证连接失败: %w (请检查用户名和密码)", err)
}
fmt.Printf("成功连接到 etcd 服务器: %s (用户: %s)\n", etcdEndpoint, username)
return &EtcdRegistry{
client: client,
ctx: context.Background(),
}, nil
}
// Register 注册服务到 etcd
func (r *EtcdRegistry) Register(service *ServiceInfo) error {
// 构建服务 key
serviceKey := fmt.Sprintf("%s%s", servicePrefix, service.Name)
// 构建服务 value可以是 JSON 格式或其他格式)
serviceValue := fmt.Sprintf("%s:%d", service.Address, service.Port)
// 设置租约,用于服务发现和健康检查
// 租约时间为 30 秒,服务需要定期续约
lease, err := r.client.Grant(r.ctx, 30)
if err != nil {
return fmt.Errorf("创建租约失败: %w", err)
}
// 将服务信息写入 etcd并关联租约
_, err = r.client.Put(r.ctx, serviceKey, serviceValue, clientv3.WithLease(lease.ID))
if err != nil {
return fmt.Errorf("注册服务失败: %w", err)
}
fmt.Printf("服务注册成功: key=%s, value=%s\n", serviceKey, serviceValue)
// 启动续约协程,保持服务在线
go r.keepAlive(lease.ID)
return nil
}
// keepAlive 保持租约活跃
func (r *EtcdRegistry) keepAlive(leaseID clientv3.LeaseID) {
ch, err := r.client.KeepAlive(r.ctx, leaseID)
if err != nil {
log.Printf("续约失败: %v\n", err)
return
}
for ka := range ch {
if ka != nil {
log.Printf("租约续约成功, ID: %d, TTL: %d\n", ka.ID, ka.TTL)
}
}
}
// Unregister 注销服务
func (r *EtcdRegistry) Unregister(serviceName string) error {
serviceKey := fmt.Sprintf("%s%s", servicePrefix, serviceName)
_, err := r.client.Delete(r.ctx, serviceKey)
if err != nil {
return fmt.Errorf("注销服务失败: %w", err)
}
fmt.Printf("服务注销成功: %s\n", serviceKey)
return nil
}
// Close 关闭连接
func (r *EtcdRegistry) Close() error {
return r.client.Close()
}
func main() {
// 创建注册器
registry, err := NewEtcdRegistry()
if err != nil {
log.Fatalf("初始化注册器失败: %v\n", err)
}
defer registry.Close()
// 创建服务信息
service := &ServiceInfo{
Name: "my-service",
Address: "localhost",
Port: 8080,
Metadata: map[string]string{
"version": "1.0.0",
"env": "production",
},
}
// 注册服务
err = registry.Register(service)
if err != nil {
log.Fatalf("注册服务失败: %v\n", err)
}
fmt.Println("服务已注册,按 Ctrl+C 退出...")
// 保持程序运行
select {}
}