212 lines
5.8 KiB
Go
212 lines
5.8 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"time"
|
||
|
||
clientv3 "go.etcd.io/etcd/client/v3"
|
||
)
|
||
|
||
const (
|
||
// etcd 服务器地址(gRPC 端口,默认 2379)
|
||
etcdEndpoint = "8.155.160.224:2379"
|
||
// 用户名
|
||
username = "admin"
|
||
// 密码(根据 Docker 配置更新)
|
||
password = "my45638my"
|
||
// 服务注册的 key 前缀
|
||
servicePrefix = "/services/"
|
||
)
|
||
|
||
// ServiceInfo 服务信息
|
||
type ServiceInfo struct {
|
||
Name string // 服务名称
|
||
Address string // 服务地址
|
||
Port int // 服务端口
|
||
Metadata map[string]string // 元数据
|
||
}
|
||
|
||
// EtcdRegistry etcd 注册器
|
||
type EtcdRegistry struct {
|
||
client *clientv3.Client
|
||
ctx context.Context
|
||
}
|
||
|
||
// NewEtcdRegistry 创建 etcd 注册器
|
||
func NewEtcdRegistry() (*EtcdRegistry, error) {
|
||
fmt.Printf("正在连接到 etcd 服务器: %s\n", etcdEndpoint)
|
||
|
||
// 先尝试不使用认证连接(因为 Docker 配置可能没有启用 RBAC)
|
||
config := clientv3.Config{
|
||
Endpoints: []string{etcdEndpoint},
|
||
DialTimeout: 10 * time.Second,
|
||
// 暂时不设置用户名密码,先测试连接
|
||
}
|
||
|
||
// 创建客户端
|
||
fmt.Println("正在创建 etcd 客户端...")
|
||
client, err := clientv3.New(config)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建 etcd 客户端失败: %w", err)
|
||
}
|
||
|
||
// 测试连接
|
||
fmt.Println("正在测试连接...")
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
|
||
// 尝试获取一个不存在的 key 来测试连接
|
||
_, err = client.Get(ctx, "/test-connection", clientv3.WithLimit(1))
|
||
if err != nil {
|
||
// 如果是 context deadline exceeded,可能是网络问题
|
||
if ctx.Err() == context.DeadlineExceeded {
|
||
client.Close()
|
||
return nil, fmt.Errorf("连接 etcd 服务器超时\n可能的原因:\n1. etcd 服务器未启动或未正确运行\n2. 端口映射不正确 (Docker -p 2379:2379)\n3. 防火墙阻止了连接\n4. etcd 配置中的 --advertise-client-urls 应该使用实际 IP 而不是 0.0.0.0\n\n建议检查:\n- docker ps 查看 etcd 容器是否运行\n- docker logs etcd 查看 etcd 日志\n- 确认 Docker 端口映射正确")
|
||
}
|
||
// 如果是认证错误,尝试使用用户名密码
|
||
errStr := err.Error()
|
||
if errStr == "etcdserver: invalid auth token" ||
|
||
errStr == "etcdserver: user name is empty" ||
|
||
errStr == "etcdserver: authentication failed" {
|
||
fmt.Printf("检测到需要认证,尝试使用用户名密码连接...\n")
|
||
client.Close()
|
||
return newEtcdRegistryWithAuth()
|
||
}
|
||
// 其他错误
|
||
client.Close()
|
||
return nil, fmt.Errorf("连接测试失败: %w\n错误详情: %s", err, errStr)
|
||
}
|
||
|
||
fmt.Printf("成功连接到 etcd 服务器: %s (无需认证)\n", etcdEndpoint)
|
||
|
||
return &EtcdRegistry{
|
||
client: client,
|
||
ctx: context.Background(),
|
||
}, nil
|
||
}
|
||
|
||
// newEtcdRegistryWithAuth 使用认证创建 etcd 注册器
|
||
func newEtcdRegistryWithAuth() (*EtcdRegistry, error) {
|
||
config := clientv3.Config{
|
||
Endpoints: []string{etcdEndpoint},
|
||
DialTimeout: 10 * time.Second,
|
||
Username: username,
|
||
Password: password,
|
||
}
|
||
|
||
client, err := clientv3.New(config)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建 etcd 客户端失败: %w", err)
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
|
||
_, err = client.Get(ctx, "/test-connection", clientv3.WithLimit(1))
|
||
if err != nil {
|
||
client.Close()
|
||
return nil, fmt.Errorf("使用认证连接失败: %w (请检查用户名和密码)", err)
|
||
}
|
||
|
||
fmt.Printf("成功连接到 etcd 服务器: %s (用户: %s)\n", etcdEndpoint, username)
|
||
|
||
return &EtcdRegistry{
|
||
client: client,
|
||
ctx: context.Background(),
|
||
}, nil
|
||
}
|
||
|
||
// Register 注册服务到 etcd
|
||
func (r *EtcdRegistry) Register(service *ServiceInfo) error {
|
||
// 构建服务 key
|
||
serviceKey := fmt.Sprintf("%s%s", servicePrefix, service.Name)
|
||
|
||
// 构建服务 value(可以是 JSON 格式或其他格式)
|
||
serviceValue := fmt.Sprintf("%s:%d", service.Address, service.Port)
|
||
|
||
// 设置租约,用于服务发现和健康检查
|
||
// 租约时间为 30 秒,服务需要定期续约
|
||
lease, err := r.client.Grant(r.ctx, 30)
|
||
if err != nil {
|
||
return fmt.Errorf("创建租约失败: %w", err)
|
||
}
|
||
|
||
// 将服务信息写入 etcd,并关联租约
|
||
_, err = r.client.Put(r.ctx, serviceKey, serviceValue, clientv3.WithLease(lease.ID))
|
||
if err != nil {
|
||
return fmt.Errorf("注册服务失败: %w", err)
|
||
}
|
||
|
||
fmt.Printf("服务注册成功: key=%s, value=%s\n", serviceKey, serviceValue)
|
||
|
||
// 启动续约协程,保持服务在线
|
||
go r.keepAlive(lease.ID)
|
||
|
||
return nil
|
||
}
|
||
|
||
// keepAlive 保持租约活跃
|
||
func (r *EtcdRegistry) keepAlive(leaseID clientv3.LeaseID) {
|
||
ch, err := r.client.KeepAlive(r.ctx, leaseID)
|
||
if err != nil {
|
||
log.Printf("续约失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
for ka := range ch {
|
||
if ka != nil {
|
||
log.Printf("租约续约成功, ID: %d, TTL: %d\n", ka.ID, ka.TTL)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Unregister 注销服务
|
||
func (r *EtcdRegistry) Unregister(serviceName string) error {
|
||
serviceKey := fmt.Sprintf("%s%s", servicePrefix, serviceName)
|
||
_, err := r.client.Delete(r.ctx, serviceKey)
|
||
if err != nil {
|
||
return fmt.Errorf("注销服务失败: %w", err)
|
||
}
|
||
|
||
fmt.Printf("服务注销成功: %s\n", serviceKey)
|
||
return nil
|
||
}
|
||
|
||
// Close 关闭连接
|
||
func (r *EtcdRegistry) Close() error {
|
||
return r.client.Close()
|
||
}
|
||
|
||
func main() {
|
||
// 创建注册器
|
||
registry, err := NewEtcdRegistry()
|
||
if err != nil {
|
||
log.Fatalf("初始化注册器失败: %v\n", err)
|
||
}
|
||
defer registry.Close()
|
||
|
||
// 创建服务信息
|
||
service := &ServiceInfo{
|
||
Name: "my-service",
|
||
Address: "localhost",
|
||
Port: 8080,
|
||
Metadata: map[string]string{
|
||
"version": "1.0.0",
|
||
"env": "production",
|
||
},
|
||
}
|
||
|
||
// 注册服务
|
||
err = registry.Register(service)
|
||
if err != nil {
|
||
log.Fatalf("注册服务失败: %v\n", err)
|
||
}
|
||
|
||
fmt.Println("服务已注册,按 Ctrl+C 退出...")
|
||
|
||
// 保持程序运行
|
||
select {}
|
||
}
|