etcd golang实践服务注册发现

 : jank    :   : 619    : 2019-06-28 10:19  go

  1. etcd是使用go语言开发的服务注册发现的一个工具,在微服务架构中有广泛的运用,关于具体的使用介绍可以查看官方文档:https://github.com/etcd-io/etcd/tree/master/Documentation

  2. 下面主要介绍如何在go服务中实践etcd 的服务注册与发现功能,所谓的服务注册与发现。

  3. 服务注册:简单讲就是在分布式服务中,每一个服务在启动后都向etcd中心发送一个注册请求(put) ,为了能够证明该服务一直存活,需要向etcd服务申请一个租约(lease)并设定租期, 同时需要把这个租约设置成keepalive 形式,这样在租期到了,自动进行续约。(整个场景类似发送心跳包)。在使用etcd 注册时,同一组服务群,需要有一个相同的key prefix,

       如:prefix 为/server/

          key  /server/1  value 127.0.0.1:8081

            key  /server/2  value 127.0.0.1:8082

            key  /server/3  value 127.0.0.1:8083

    具体代码示例如下:

//etcd server register
package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

const (
	ETCD_HANDLE_TIMEOUT = 3 * time.Second
)

type EtcdRegister struct {
	cli           *clientv3.Client
	Endpoints     []string
	leaseRes      *clientv3.LeaseGrantResponse
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
}

var TS *EtcdRegister

func main() {
	var err error
	config := clientv3.Config{
		Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		DialTimeout: 1 * time.Second,
		Username:    "root",
		Password:    "jank1369",
	}
	TS, err = NewEtcdRegister(config)
	if err != nil {
		return
	}
	TS.StartRegister("/server/4", "127.0.0.1:8084", 1)
	time.Sleep(500 * time.Second)
}

//new
func NewEtcdRegister(config clientv3.Config) (*EtcdRegister, error) {
	cl, err := clientv3.New(config)
	if err != nil {
		return nil, err
	}
	er := &EtcdRegister{
		cli:       cl,
		Endpoints: config.Endpoints,
	}
	return er, err
}

func (er *EtcdRegister) StartRegister(key, value string, ttl int64) error {
	err := er.GrantAndSetKeepAlive(ttl)
	if err != nil {
		fmt.Println("EtcdRegister.StartRegister: grant and set keepalive failed:%s", err)
		return err
	}
	err = er.PutWithLease(key, value)
	if err != nil {
		fmt.Println("EtcdRegister.StartRegister: put with lease failed:%s", err)
		return err
	}
	go TS.ListenLease()
	return nil
}

//put
func (er *EtcdRegister) Put(key, value string) error {
	//	kv := er.NewPV()
	ctx, cancel := context.WithTimeout(context.Background(), ETCD_HANDLE_TIMEOUT)
	res, err := er.cli.Put(ctx, key, value)
	cancel()
	if err != nil {
		switch err {
		case context.Canceled:
			fmt.Printf("ctx is canceled by another routine: %v
", err)
		case context.DeadlineExceeded:
			fmt.Printf("ctx is attached with a deadline is exceeded: %v
", err)
			//	case rpctypes.ErrEmptyKey:
			//		fmt.Printf("client-side error: %v
", err)
		default:
			fmt.Printf("bad cluster endpoints, which are not etcd servers: %v
", err)
		}
		return err
	}
	fmt.Printf("put success, res: %#v 
", res)
	return nil
}

//put and with lease
func (er *EtcdRegister) PutWithLease(key, value string) error {
	//	kv := er.NewPV()
	ctx, cancel := context.WithTimeout(context.Background(), ETCD_HANDLE_TIMEOUT)
	res, err := er.cli.Put(ctx, key, value, clientv3.WithLease(er.leaseRes.ID))
	cancel()
	if err != nil {
		switch err {
		case context.Canceled:
			fmt.Printf("ctx is canceled by another routine: %v
", err)
		case context.DeadlineExceeded:
			fmt.Printf("ctx is attached with a deadline is exceeded: %v
", err)
			//	case rpctypes.ErrEmptyKey:
			//		fmt.Printf("client-side error: %v
", err)
		default:
			fmt.Printf("bad cluster endpoints, which are not etcd servers: %v
", err)
		}
		return err
	}
	fmt.Printf("put with lease success, res: %#v 
", res)
	return nil
}

//get
func (er *EtcdRegister) GetByPrefix(prefix string) (*clientv3.GetResponse, error) {
	ctx, cancel := context.WithTimeout(context.Background(), ETCD_HANDLE_TIMEOUT)
	res, err := er.cli.Get(ctx, prefix, clientv3.WithPrefix())
	cancel()
	if err != nil {
		switch err {
		case context.Canceled:
			fmt.Printf("ctx is canceled by another routine: %v
", err)
		case context.DeadlineExceeded:
			fmt.Printf("ctx is attached with a deadline is exceeded: %v
", err)
			//	case rpctypes.ErrEmptyKey:
			//		fmt.Printf("client-side error: %v
", err)
		default:
			fmt.Printf("bad cluster endpoints, which are not etcd servers: %v
", err)
		}
		return nil, err
	}
	fmt.Printf("ctx get result: %#v 
", res)
	for _, v := range res.Kvs {
		fmt.Printf("k: %s, v: %s 
", v.Key, v.Value)
	}
	return res, nil
}

//grant and set keepalive
func (er *EtcdRegister) GrantAndSetKeepAlive(ttl int64) error {
	leaseRes, err := er.cli.Lease.Grant(context.TODO(), ttl)
	if err != nil {
		return err
	}
	er.leaseRes = leaseRes
	fmt.Println("lease:  %v 
", leaseRes.ID)
	res, err := er.cli.Lease.KeepAlive(context.TODO(), er.leaseRes.ID)
	if err != nil {
		return err
	}
	er.keepAliveChan = res
	return nil
}

//listen lease
func (er *EtcdRegister) ListenLease() {
	for {
		select {
		case res := <-er.keepAliveChan:
			if res == nil {
				fmt.Println("lease close")
				return
			}
			fmt.Println("lease success")
		}
	}
}


   4.服务发现:就是在服务的使用方,对需要调用的服务进行一个监控,在etcd 中可以使用watchwithprefix, 如上所示服务群,则监控 /server/, 实践过程中,可以在服务启动的时候拉去完整的服务列表并缓存下来,然后watch 该服务群的 prefix,当出现PUT/DEL,则进行修改本地的缓存列表。

    具体代码示例如下:

//etcd server discover
package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"sync"
	"time"
)

const (
	ETCD_HANDLE_TIMEOUT = 3 * time.Second
)

type EtcdDiscover struct {
	cli        *clientv3.Client
	Endpoints  []string
	ServerList map[string]string
	sync.RWMutex
}

var TS *EtcdDiscover

func main() {
	var err error
	config := clientv3.Config{
		Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		DialTimeout: 3 * time.Second,
		Username:    "root",
		Password:    "jank1369",
	}
	TS, err = NewEtcdDiscover(config)
	if err != nil {
		return
	}
	res, err := TS.GetByPrefix("/server")
	if err != nil {
		return
	}
	if res.Count > 0 {
		for _, v := range res.Kvs {
			TS.ServerList[string(v.Key)] = string(v.Value)
		}
	}
	go TS.WatchWithPrefix("/server")

	//test
	go func() {
		for {
		        TS.RLock()
			fmt.Printf("ServerList: %#v 
", TS.ServerList)
			TS.RUnlock()
			time.Sleep(3 * time.Second)
		}
	}()
	time.Sleep(500 * time.Second)
}

//new
func NewEtcdDiscover(config clientv3.Config) (*EtcdDiscover, error) {
	cl, err := clientv3.New(config)
	if err != nil {
		return nil, err
	}
	er := &EtcdDiscover{
		cli:        cl,
		Endpoints:  config.Endpoints,
		ServerList: make(map[string]string),
	}
	return er, err
}

//get by prefix
func (er *EtcdDiscover) GetByPrefix(prefix string) (*clientv3.GetResponse, error) {
	ctx, cancel := context.WithTimeout(context.Background(), ETCD_HANDLE_TIMEOUT)
	res, err := er.cli.Get(ctx, prefix, clientv3.WithPrefix())
	cancel()
	if err != nil {
		switch err {
		case context.Canceled:
			fmt.Printf("ctx is canceled by another routine: %v
", err)
		case context.DeadlineExceeded:
			fmt.Printf("ctx is attached with a deadline is exceeded: %v
", err)
			//	case rpctypes.ErrEmptyKey:
			//		fmt.Printf("client-side error: %v
", err)
		default:
			fmt.Printf("bad cluster endpoints, which are not etcd servers: %v
", err)
		}
		return nil, err
	}
	fmt.Printf("ctx get result: %#v 
", res)
	for _, v := range res.Kvs {
		fmt.Printf("k: %s, v: %s 
", v.Key, v.Value)
	}
	return res, nil
}

//watch
func (er *EtcdDiscover) WatchWithPrefix(prefix string) {
	rch := er.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q
", ev.Type, ev.Kv.Key, ev.Kv.Value)
			switch ev.Type {
			case mvccpb.PUT:
				fmt.Println("get a new put")
				// add local map
				er.Add(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE:
				//del local map
				er.Del(string(ev.Kv.Key))
			}
		}
	}
}

//add
func (er *EtcdDiscover) Add(key, val string) {
	er.Lock()
	er.ServerList[key] = val
	er.Unlock()
}

//del
func (er *EtcdDiscover) Del(key string) {
	er.Lock()
	delete(er.ServerList, key)
	er.Unlock()
}



    到此一个简单的go 服务发现示例就完成了,如需更多的使用场景可以访问:https://godoc.org/go.etcd.io/etcd/clientv3


     

   

备案编号:赣ICP备15011386号

联系方式:qq:1150662577    邮箱:1150662577@qq.com