Codebase list golang-github-go-kit-kit / 9f4e759
Merge pull request #413 from buptmiao/master Add TTL for etcd sd Peter Bourgon authored 7 years ago GitHub committed 7 years ago
3 changed file(s) with 71 addition(s) and 2 deletion(s). Raw diff Collapse all Expand all
155155 if s.Value == "" {
156156 return ErrNoValue
157157 }
158 _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value)
158 var err error
159 if s.TTL != nil {
160 _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.ttl})
161 } else {
162 _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
163 }
159164 return err
160165 }
161166
00 package etcd
11
22 import (
3 "sync"
4 "time"
5
36 etcd "github.com/coreos/etcd/client"
47
58 "github.com/go-kit/kit/log"
9 )
10
11 const (
12 minHeartBeatTime = time.Millisecond * 500
613 )
714
815 // Registrar registers service instance liveness information to etcd.
1017 client Client
1118 service Service
1219 logger log.Logger
20 quit chan struct{}
21 sync.Mutex
1322 }
1423
1524 // Service holds the instance identifying data you want to publish to etcd. Key
1827 type Service struct {
1928 Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
2029 Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
30 TTL *TTLOption
2131 DeleteOptions *etcd.DeleteOptions
32 }
33
34 // TTLOption allow setting a key with a TTL. This option will be used by a loop
35 // goroutine which regularly refreshes the lease of the key.
36 type TTLOption struct {
37 heartbeat time.Duration // e.g. time.Second * 3
38 ttl time.Duration // e.g. time.Second * 10
39 }
40
41 // NewTTLOption returns a TTLOption that contains proper ttl settings. param
42 // heartbeat is used to refresh lease of the key periodically by a loop goroutine,
43 // its value should be at least 500ms. param ttl definite the lease of the key,
44 // its value should be greater than heartbeat's.
45 // e.g. heartbeat: time.Second * 3, ttl: time.Second * 10.
46 func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption {
47 if heartbeat <= minHeartBeatTime {
48 heartbeat = minHeartBeatTime
49 }
50 if ttl <= heartbeat {
51 ttl = heartbeat * 3
52 }
53 return &TTLOption{
54 heartbeat: heartbeat,
55 ttl: ttl,
56 }
2257 }
2358
2459 // NewRegistrar returns a etcd Registrar acting on the provided catalog
4277 } else {
4378 r.logger.Log("action", "register")
4479 }
80 if r.service.TTL != nil {
81 go r.loop()
82 }
83 }
84
85 func (r *Registrar) loop() {
86 r.Lock()
87 r.quit = make(chan struct{})
88 r.Unlock()
89
90 tick := time.NewTicker(r.service.TTL.heartbeat)
91 defer tick.Stop()
92
93 for {
94 select {
95 case <-r.quit:
96 return
97 case <-tick.C:
98 if err := r.client.Register(r.service); err != nil {
99 r.logger.Log("err", err)
100 }
101 }
102 }
45103 }
46104
47105 // Deregister implements the sd.Registrar interface. Call it when you want your
52110 } else {
53111 r.logger.Log("action", "deregister")
54112 }
113 r.Lock()
114 defer r.Unlock()
115 if r.quit != nil {
116 close(r.quit)
117 r.quit = nil
118 }
55119 }
2929 }
3030
3131 // default service used to build registrar in our tests
32 var testService = Service{"testKey", "testValue", nil}
32 var testService = Service{"testKey", "testValue", nil, nil}
3333
3434 // NewRegistar should return a registar with a logger using the service key and value
3535 func TestNewRegistar(t *testing.T) {