Codebase list golang-github-go-kit-kit / 899c5f2
Fix ttl for etcd sd miao 6 years ago
2 changed file(s) with 46 addition(s) and 24 deletion(s). Raw diff Collapse all Expand all
155155 if s.Value == "" {
156156 return ErrNoValue
157157 }
158 var err error
158159 if s.TTL != nil {
159 _, err := c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.TTL})
160 return err
160 _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.ttl})
161 } else {
162 _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
161163 }
162 _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value)
163164 return err
164165 }
165166
00 package etcd
11
22 import (
3 "sync"
4 "time"
5
36 etcd "github.com/coreos/etcd/client"
47
58 "github.com/go-kit/kit/log"
6 "time"
79 )
810
911 const (
10 MinHeartBeatTime = time.Millisecond * 500
12 minHeartBeatTime = time.Millisecond * 500
1113 )
1214
1315 // Registrar registers service instance liveness information to etcd.
1618 service Service
1719 logger log.Logger
1820 quit chan struct{}
21 sync.Mutex
1922 }
2023
2124 // Service holds the instance identifying data you want to publish to etcd. Key
2831 DeleteOptions *etcd.DeleteOptions
2932 }
3033
31 // TTLOption allow setting a key with a TTL, and regularly refreshes the lease with a goroutine
34 // TTLOption allow setting a key with a TTL. This option will be used by a loop
35 // goroutine which regularly refreshes the lease of the key.
3236 type TTLOption struct {
33 Heartbeat time.Duration
34 TTL time.Duration
37 heartbeat time.Duration // e.g. time.Second * 3
38 ttl time.Duration // e.g. time.Second * 10
3539 }
3640
37 // NewTTLOption returns a TTLOption
41 // NewTTLOption returns a TTLOption that contains proper ttl settings. param
42 // heartbeat is used to refresh lease of the key periodically by a loop goroutine,
43 // its value should be at least 500ms. param ttl definite the lease of the key,
44 // its value should be greater than heartbeat's.
45 // e.g. heartbeat: time.Second * 3, ttl: time.Second * 10.
3846 func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption {
39 if heartbeat <= MinHeartBeatTime {
40 heartbeat = MinHeartBeatTime
47 if heartbeat <= minHeartBeatTime {
48 heartbeat = minHeartBeatTime
4149 }
4250 if ttl <= heartbeat {
4351 ttl = heartbeat * 3
4452 }
45 return &TTLOption{heartbeat, ttl}
53 return &TTLOption{
54 heartbeat: heartbeat,
55 ttl: ttl,
56 }
4657 }
4758
4859 // NewRegistrar returns a etcd Registrar acting on the provided catalog
6677 } else {
6778 r.logger.Log("action", "register")
6879 }
69 if r.service.TTL == nil {
70 return
80 if r.service.TTL != nil {
81 go r.loop()
7182 }
83 }
84
85 func (r *Registrar) loop() {
86 r.Lock()
7287 r.quit = make(chan struct{})
73 go func() {
74 for {
75 select {
76 case <-r.quit:
77 return
78 case <-time.After(r.service.TTL.Heartbeat):
79 if err := r.client.Register(r.service); err != nil {
80 r.logger.Log("err", err)
81 }
88 r.Unlock()
89
90 tick := time.NewTicker(r.service.TTL.heartbeat)
91 defer tick.Stop()
92
93 for {
94 select {
95 case <-r.quit:
96 return
97 case <-tick.C:
98 if err := r.client.Register(r.service); err != nil {
99 r.logger.Log("err", err)
82100 }
83101 }
84 }()
102 }
85103 }
86104
87105 // Deregister implements the sd.Registrar interface. Call it when you want your
92110 } else {
93111 r.logger.Log("action", "deregister")
94112 }
113 r.Lock()
114 defer r.Unlock()
95115 if r.quit != nil {
96116 close(r.quit)
117 r.quit = nil
97118 }
98119 }