diff --git a/sd/etcd/client.go b/sd/etcd/client.go index 0508723..eb604d6 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -156,7 +156,12 @@ if s.Value == "" { return ErrNoValue } - _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value) + var err error + if s.TTL != nil { + _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{PrevExist: etcd.PrevIgnore, TTL: s.TTL.ttl}) + } else { + _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value) + } return err } diff --git a/sd/etcd/registrar.go b/sd/etcd/registrar.go index 52b632a..e11c647 100644 --- a/sd/etcd/registrar.go +++ b/sd/etcd/registrar.go @@ -1,9 +1,16 @@ package etcd import ( + "sync" + "time" + etcd "github.com/coreos/etcd/client" "github.com/go-kit/kit/log" +) + +const ( + minHeartBeatTime = time.Millisecond * 500 ) // Registrar registers service instance liveness information to etcd. @@ -11,6 +18,8 @@ client Client service Service logger log.Logger + quit chan struct{} + sync.Mutex } // Service holds the instance identifying data you want to publish to etcd. Key @@ -19,7 +28,33 @@ type Service struct { Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080" Value string // returned to subscribers, e.g. "http://1.2.3.4:8080" + TTL *TTLOption DeleteOptions *etcd.DeleteOptions +} + +// TTLOption allow setting a key with a TTL. This option will be used by a loop +// goroutine which regularly refreshes the lease of the key. +type TTLOption struct { + heartbeat time.Duration // e.g. time.Second * 3 + ttl time.Duration // e.g. time.Second * 10 +} + +// NewTTLOption returns a TTLOption that contains proper ttl settings. param +// heartbeat is used to refresh lease of the key periodically by a loop goroutine, +// its value should be at least 500ms. param ttl definite the lease of the key, +// its value should be greater than heartbeat's. +// e.g. heartbeat: time.Second * 3, ttl: time.Second * 10. +func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption { + if heartbeat <= minHeartBeatTime { + heartbeat = minHeartBeatTime + } + if ttl <= heartbeat { + ttl = heartbeat * 3 + } + return &TTLOption{ + heartbeat: heartbeat, + ttl: ttl, + } } // NewRegistrar returns a etcd Registrar acting on the provided catalog @@ -43,6 +78,29 @@ } else { r.logger.Log("action", "register") } + if r.service.TTL != nil { + go r.loop() + } +} + +func (r *Registrar) loop() { + r.Lock() + r.quit = make(chan struct{}) + r.Unlock() + + tick := time.NewTicker(r.service.TTL.heartbeat) + defer tick.Stop() + + for { + select { + case <-r.quit: + return + case <-tick.C: + if err := r.client.Register(r.service); err != nil { + r.logger.Log("err", err) + } + } + } } // Deregister implements the sd.Registrar interface. Call it when you want your @@ -53,4 +111,10 @@ } else { r.logger.Log("action", "deregister") } + r.Lock() + defer r.Unlock() + if r.quit != nil { + close(r.quit) + r.quit = nil + } } diff --git a/sd/etcd/registrar_test.go b/sd/etcd/registrar_test.go index bea6f91..9426f0b 100644 --- a/sd/etcd/registrar_test.go +++ b/sd/etcd/registrar_test.go @@ -30,7 +30,7 @@ } // default service used to build registrar in our tests -var testService = Service{"testKey", "testValue", nil} +var testService = Service{"testKey", "testValue", nil, nil} // NewRegistar should return a registar with a logger using the service key and value func TestNewRegistar(t *testing.T) {