diff --git a/MAINTAINERS b/MAINTAINERS index 63246e5..a0d7100 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -11,7 +11,7 @@ [Org] [Org."Core maintainers"] people = [ - "abronan", + "aluzzardi", ] [people] @@ -21,7 +21,7 @@ # in the people section. # ADD YOURSELF HERE IN ALPHABETICAL ORDER - [people.abronan] - Name = "Alexandre Beslic" - Email = "alexandre.beslic@gmail.com" - GitHub = "abronan" + [people.aluzzardi] + Name = "Andrea Luzzardi" + Email = "al@docker.com" + GitHub = "aluzzardi" diff --git a/README.md b/README.md index f97eefd..f31d01b 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,7 @@ } underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second) -electedCh, _, err := underwood.RunForElection() -if err != nil { - log.Fatal("Cannot run for election, store is probably down") -} +electedCh, _ := underwood.RunForElection() for isElected := range electedCh { // This loop will run every time there is a change in our leadership @@ -49,14 +46,13 @@ there is a change in leadership: ```go follower := leadership.NewFollower(client, "service/swarm/leader") -leaderCh, _, err := follower.FollowElection() -if err != nil { - log.Fatal("Cannot follow the election, store is probably down") -} +leaderCh, _ := follower.FollowElection() for leader := range leaderCh { // Leader is a string containing the value passed to `NewCandidate`. log.Printf("%s is now the leader", leader) } +log.Fatal("Cannot follow the election, store is probably down") +// Recovery code or exit ``` A typical use case for this is to be able to always send requests to the current @@ -85,26 +81,24 @@ time.Sleep(waitTime) // retry } - } + }() } func run(candidate *leadership.Candidate) { - electedCh, errCh, err := candidate.RunForElection() - if err != nil { - return - } + electedCh, errCh := candidate.RunForElection() for { select { - case elected := <-electedCh: + case isElected := <-electedCh: if isElected { // Do something } else { // Do something else } - case err := <-errCh: - log.Error(err) - return + case err := <-errCh: + log.Error(err) + return + } } } ``` diff --git a/candidate.go b/candidate.go index b7af799..8b96f28 100644 --- a/candidate.go +++ b/candidate.go @@ -8,7 +8,7 @@ ) const ( - defaultLockTTL = 15 * time.Second + defaultLockTTL = 20 * time.Second ) // Candidate runs the leader election algorithm asynchronously @@ -22,6 +22,7 @@ lockTTL time.Duration leader bool stopCh chan struct{} + stopRenew chan struct{} resignCh chan bool errCh chan error } @@ -51,28 +52,13 @@ // ElectedCh is used to get a channel which delivers signals on // acquiring or losing leadership. It sends true if we become // the leader, and false if we lose it. -func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) { +func (c *Candidate) RunForElection() (<-chan bool, <-chan error) { c.electedCh = make(chan bool) c.errCh = make(chan error) - lockOpts := &store.LockOptions{ - Value: []byte(c.node), - } + go c.campaign() - if c.lockTTL != defaultLockTTL { - lockOpts.TTL = c.lockTTL - lockOpts.RenewLock = make(chan struct{}) - } - - lock, err := c.client.NewLock(c.key, lockOpts) - - if err != nil { - return nil, nil, err - } - - go c.campaign(lock) - - return c.electedCh, c.errCh, nil + return c.electedCh, c.errCh } // Stop running for election. @@ -101,13 +87,41 @@ c.electedCh <- status } -func (c *Candidate) campaign(lock store.Locker) { +func (c *Candidate) initLock() (store.Locker, error) { + // Give up on the lock session if + // we recovered from a store failure + if c.stopRenew != nil { + close(c.stopRenew) + } + + lockOpts := &store.LockOptions{ + Value: []byte(c.node), + } + + if c.lockTTL != defaultLockTTL { + lockOpts.TTL = c.lockTTL + } + + lockOpts.RenewLock = make(chan struct{}) + c.stopRenew = lockOpts.RenewLock + + lock, err := c.client.NewLock(c.key, lockOpts) + return lock, err +} + +func (c *Candidate) campaign() { defer close(c.electedCh) defer close(c.errCh) for { // Start as a follower. c.update(false) + + lock, err := c.initLock() + if err != nil { + c.errCh <- err + return + } lostCh, err := lock.Lock(nil) if err != nil { diff --git a/candidate_test.go b/candidate_test.go index d29b433..274fcce 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -25,8 +25,7 @@ mockLock.On("Unlock").Return(nil) candidate := NewCandidate(kv, "test_key", "test_node", 0) - electedCh, _, err := candidate.RunForElection() - assert.Nil(t, err) + electedCh, _ := candidate.RunForElection() // Should issue a false upon start, no matter what. assert.False(t, <-electedCh) diff --git a/follower.go b/follower.go index 12f5f46..986af9b 100644 --- a/follower.go +++ b/follower.go @@ -33,18 +33,13 @@ } // FollowElection starts monitoring the election. -func (f *Follower) FollowElection() (<-chan string, <-chan error, error) { +func (f *Follower) FollowElection() (<-chan string, <-chan error) { f.leaderCh = make(chan string) f.errCh = make(chan error) - ch, err := f.client.Watch(f.key, f.stopCh) - if err != nil { - return nil, nil, err - } + go f.follow() - go f.follow(ch) - - return f.leaderCh, f.errCh, nil + return f.leaderCh, f.errCh } // Stop stops monitoring an election. @@ -52,9 +47,14 @@ close(f.stopCh) } -func (f *Follower) follow(ch <-chan *store.KVPair) { +func (f *Follower) follow() { defer close(f.leaderCh) defer close(f.errCh) + + ch, err := f.client.Watch(f.key, f.stopCh) + if err != nil { + f.errCh <- err + } f.leader = "" for kv := range ch { diff --git a/follower_test.go b/follower_test.go index 9483252..de2066b 100644 --- a/follower_test.go +++ b/follower_test.go @@ -21,8 +21,7 @@ mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) follower := NewFollower(kv, "test_key") - leaderCh, errCh, err := follower.FollowElection() - assert.Nil(t, err) + leaderCh, errCh := follower.FollowElection() // Simulate leader updates go func() {