Codebase list golang-github-docker-leadership / 51e913f
Fix problematic lock renewal after leader failure in Consul This PR changes slightly the lock management during the leader election process by removing the session in a best effort fashion after a leader or store failure. This is necessary for Consul which restores the session after the leader failure and leaves the current leader unaware of its status change, thus leaving the cluster without leader. Signed-off-by: Alexandre Beslic <alexandre.beslic@gmail.com> Alexandre Beslic 8 years ago
5 changed file(s) with 50 addition(s) and 45 deletion(s). Raw diff Collapse all Expand all
1515 }
1616
1717 underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second)
18 electedCh, _, err := underwood.RunForElection()
19 if err != nil {
20 log.Fatal("Cannot run for election, store is probably down")
21 }
18 electedCh, _ := underwood.RunForElection()
2219
2320 for isElected := range electedCh {
2421 // This loop will run every time there is a change in our leadership
4845 there is a change in leadership:
4946 ```go
5047 follower := leadership.NewFollower(client, "service/swarm/leader")
51 leaderCh, _, err := follower.FollowElection()
52 if err != nil {
53 log.Fatal("Cannot follow the election, store is probably down")
54 }
48 leaderCh, _ := follower.FollowElection()
5549 for leader := range leaderCh {
5650 // Leader is a string containing the value passed to `NewCandidate`.
5751 log.Printf("%s is now the leader", leader)
5852 }
53 log.Fatal("Cannot follow the election, store is probably down")
54 // Recovery code or exit
5955 ```
6056
6157 A typical use case for this is to be able to always send requests to the current
8884 }
8985
9086 func run(candidate *leadership.Candidate) {
91 electedCh, errCh, err := candidate.RunForElection()
92 if err != nil {
93 return
94 }
87 electedCh, errCh := candidate.RunForElection()
9588 for {
9689 select {
9790 case isElected := <-electedCh:
77 )
88
99 const (
10 defaultLockTTL = 15 * time.Second
10 defaultLockTTL = 20 * time.Second
1111 )
1212
1313 // Candidate runs the leader election algorithm asynchronously
2121 lockTTL time.Duration
2222 leader bool
2323 stopCh chan struct{}
24 stopRenew chan struct{}
2425 resignCh chan bool
2526 errCh chan error
2627 }
5051 // ElectedCh is used to get a channel which delivers signals on
5152 // acquiring or losing leadership. It sends true if we become
5253 // the leader, and false if we lose it.
53 func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) {
54 func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
5455 c.electedCh = make(chan bool)
5556 c.errCh = make(chan error)
5657
57 lockOpts := &store.LockOptions{
58 Value: []byte(c.node),
59 }
58 go c.campaign()
6059
61 if c.lockTTL != defaultLockTTL {
62 lockOpts.TTL = c.lockTTL
63 lockOpts.RenewLock = make(chan struct{})
64 }
65
66 lock, err := c.client.NewLock(c.key, lockOpts)
67
68 if err != nil {
69 return nil, nil, err
70 }
71
72 go c.campaign(lock)
73
74 return c.electedCh, c.errCh, nil
60 return c.electedCh, c.errCh
7561 }
7662
7763 // Stop running for election.
10086 c.electedCh <- status
10187 }
10288
103 func (c *Candidate) campaign(lock store.Locker) {
89 func (c *Candidate) initLock() (store.Locker, error) {
90 // Give up on the lock session if
91 // we recovered from a store failure
92 if c.stopRenew != nil {
93 close(c.stopRenew)
94 }
95
96 lockOpts := &store.LockOptions{
97 Value: []byte(c.node),
98 }
99
100 if c.lockTTL != defaultLockTTL {
101 lockOpts.TTL = c.lockTTL
102 }
103
104 lockOpts.RenewLock = make(chan struct{})
105 c.stopRenew = lockOpts.RenewLock
106
107 lock, err := c.client.NewLock(c.key, lockOpts)
108 return lock, err
109 }
110
111 func (c *Candidate) campaign() {
104112 defer close(c.electedCh)
105113 defer close(c.errCh)
106114
107115 for {
108116 // Start as a follower.
109117 c.update(false)
118
119 lock, err := c.initLock()
120 if err != nil {
121 c.errCh <- err
122 return
123 }
110124
111125 lostCh, err := lock.Lock(nil)
112126 if err != nil {
2424 mockLock.On("Unlock").Return(nil)
2525
2626 candidate := NewCandidate(kv, "test_key", "test_node", 0)
27 electedCh, _, err := candidate.RunForElection()
28 assert.Nil(t, err)
27 electedCh, _ := candidate.RunForElection()
2928
3029 // Should issue a false upon start, no matter what.
3130 assert.False(t, <-electedCh)
3232 }
3333
3434 // FollowElection starts monitoring the election.
35 func (f *Follower) FollowElection() (<-chan string, <-chan error, error) {
35 func (f *Follower) FollowElection() (<-chan string, <-chan error) {
3636 f.leaderCh = make(chan string)
3737 f.errCh = make(chan error)
3838
39 ch, err := f.client.Watch(f.key, f.stopCh)
40 if err != nil {
41 return nil, nil, err
42 }
39 go f.follow()
4340
44 go f.follow(ch)
45
46 return f.leaderCh, f.errCh, nil
41 return f.leaderCh, f.errCh
4742 }
4843
4944 // Stop stops monitoring an election.
5146 close(f.stopCh)
5247 }
5348
54 func (f *Follower) follow(ch <-chan *store.KVPair) {
49 func (f *Follower) follow() {
5550 defer close(f.leaderCh)
5651 defer close(f.errCh)
52
53 ch, err := f.client.Watch(f.key, f.stopCh)
54 if err != nil {
55 f.errCh <- err
56 }
5757
5858 f.leader = ""
5959 for kv := range ch {
2020 mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)
2121
2222 follower := NewFollower(kv, "test_key")
23 leaderCh, errCh, err := follower.FollowElection()
24 assert.Nil(t, err)
23 leaderCh, errCh := follower.FollowElection()
2524
2625 // Simulate leader updates
2726 go func() {