New upstream version 0.1.0
Tim Potter
7 years ago
10 | 10 | [Org] |
11 | 11 | [Org."Core maintainers"] |
12 | 12 | people = [ |
13 | "abronan", | |
13 | "aluzzardi", | |
14 | 14 | ] |
15 | 15 | |
16 | 16 | [people] |
20 | 20 | # in the people section. |
21 | 21 | |
22 | 22 | # ADD YOURSELF HERE IN ALPHABETICAL ORDER |
23 | [people.abronan] | |
24 | Name = "Alexandre Beslic" | |
25 | Email = "alexandre.beslic@gmail.com" | |
26 | GitHub = "abronan" | |
23 | [people.aluzzardi] | |
24 | Name = "Andrea Luzzardi" | |
25 | Email = "al@docker.com" | |
26 | GitHub = "aluzzardi" |
15 | 15 | } |
16 | 16 | |
17 | 17 | underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second) |
18 | electedCh, _, err := underwood.RunForElection() | |
19 | if err != nil { | |
20 | log.Fatal("Cannot run for election, store is probably down") | |
21 | } | |
18 | electedCh, _ := underwood.RunForElection() | |
22 | 19 | |
23 | 20 | for isElected := range electedCh { |
24 | 21 | // This loop will run every time there is a change in our leadership |
48 | 45 | there is a change in leadership: |
49 | 46 | ```go |
50 | 47 | follower := leadership.NewFollower(client, "service/swarm/leader") |
51 | leaderCh, _, err := follower.FollowElection() | |
52 | if err != nil { | |
53 | log.Fatal("Cannot follow the election, store is probably down") | |
54 | } | |
48 | leaderCh, _ := follower.FollowElection() | |
55 | 49 | for leader := range leaderCh { |
56 | 50 | // Leader is a string containing the value passed to `NewCandidate`. |
57 | 51 | log.Printf("%s is now the leader", leader) |
58 | 52 | } |
53 | log.Fatal("Cannot follow the election, store is probably down") | |
54 | // Recovery code or exit | |
59 | 55 | ``` |
60 | 56 | |
61 | 57 | A typical use case for this is to be able to always send requests to the current |
84 | 80 | time.Sleep(waitTime) |
85 | 81 | // retry |
86 | 82 | } |
87 | } | |
83 | }() | |
88 | 84 | } |
89 | 85 | |
90 | 86 | func run(candidate *leadership.Candidate) { |
91 | electedCh, errCh, err := candidate.RunForElection() | |
92 | if err != nil { | |
93 | return | |
94 | } | |
87 | electedCh, errCh := candidate.RunForElection() | |
95 | 88 | for { |
96 | 89 | select { |
97 | case elected := <-electedCh: | |
90 | case isElected := <-electedCh: | |
98 | 91 | if isElected { |
99 | 92 | // Do something |
100 | 93 | } else { |
101 | 94 | // Do something else |
102 | 95 | } |
103 | 96 | |
104 | case err := <-errCh: | |
105 | log.Error(err) | |
106 | return | |
97 | case err := <-errCh: | |
98 | log.Error(err) | |
99 | return | |
100 | } | |
107 | 101 | } |
108 | 102 | } |
109 | 103 | ``` |
7 | 7 | ) |
8 | 8 | |
9 | 9 | const ( |
10 | defaultLockTTL = 15 * time.Second | |
10 | defaultLockTTL = 20 * time.Second | |
11 | 11 | ) |
12 | 12 | |
13 | 13 | // Candidate runs the leader election algorithm asynchronously |
21 | 21 | lockTTL time.Duration |
22 | 22 | leader bool |
23 | 23 | stopCh chan struct{} |
24 | stopRenew chan struct{} | |
24 | 25 | resignCh chan bool |
25 | 26 | errCh chan error |
26 | 27 | } |
50 | 51 | // ElectedCh is used to get a channel which delivers signals on |
51 | 52 | // acquiring or losing leadership. It sends true if we become |
52 | 53 | // the leader, and false if we lose it. |
53 | func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) { | |
54 | func (c *Candidate) RunForElection() (<-chan bool, <-chan error) { | |
54 | 55 | c.electedCh = make(chan bool) |
55 | 56 | c.errCh = make(chan error) |
56 | 57 | |
57 | lockOpts := &store.LockOptions{ | |
58 | Value: []byte(c.node), | |
59 | } | |
58 | go c.campaign() | |
60 | 59 | |
61 | if c.lockTTL != defaultLockTTL { | |
62 | lockOpts.TTL = c.lockTTL | |
63 | lockOpts.RenewLock = make(chan struct{}) | |
64 | } | |
65 | ||
66 | lock, err := c.client.NewLock(c.key, lockOpts) | |
67 | ||
68 | if err != nil { | |
69 | return nil, nil, err | |
70 | } | |
71 | ||
72 | go c.campaign(lock) | |
73 | ||
74 | return c.electedCh, c.errCh, nil | |
60 | return c.electedCh, c.errCh | |
75 | 61 | } |
76 | 62 | |
77 | 63 | // Stop running for election. |
100 | 86 | c.electedCh <- status |
101 | 87 | } |
102 | 88 | |
103 | func (c *Candidate) campaign(lock store.Locker) { | |
89 | func (c *Candidate) initLock() (store.Locker, error) { | |
90 | // Give up on the lock session if | |
91 | // we recovered from a store failure | |
92 | if c.stopRenew != nil { | |
93 | close(c.stopRenew) | |
94 | } | |
95 | ||
96 | lockOpts := &store.LockOptions{ | |
97 | Value: []byte(c.node), | |
98 | } | |
99 | ||
100 | if c.lockTTL != defaultLockTTL { | |
101 | lockOpts.TTL = c.lockTTL | |
102 | } | |
103 | ||
104 | lockOpts.RenewLock = make(chan struct{}) | |
105 | c.stopRenew = lockOpts.RenewLock | |
106 | ||
107 | lock, err := c.client.NewLock(c.key, lockOpts) | |
108 | return lock, err | |
109 | } | |
110 | ||
111 | func (c *Candidate) campaign() { | |
104 | 112 | defer close(c.electedCh) |
105 | 113 | defer close(c.errCh) |
106 | 114 | |
107 | 115 | for { |
108 | 116 | // Start as a follower. |
109 | 117 | c.update(false) |
118 | ||
119 | lock, err := c.initLock() | |
120 | if err != nil { | |
121 | c.errCh <- err | |
122 | return | |
123 | } | |
110 | 124 | |
111 | 125 | lostCh, err := lock.Lock(nil) |
112 | 126 | if err != nil { |
24 | 24 | mockLock.On("Unlock").Return(nil) |
25 | 25 | |
26 | 26 | candidate := NewCandidate(kv, "test_key", "test_node", 0) |
27 | electedCh, _, err := candidate.RunForElection() | |
28 | assert.Nil(t, err) | |
27 | electedCh, _ := candidate.RunForElection() | |
29 | 28 | |
30 | 29 | // Should issue a false upon start, no matter what. |
31 | 30 | assert.False(t, <-electedCh) |
32 | 32 | } |
33 | 33 | |
34 | 34 | // FollowElection starts monitoring the election. |
35 | func (f *Follower) FollowElection() (<-chan string, <-chan error, error) { | |
35 | func (f *Follower) FollowElection() (<-chan string, <-chan error) { | |
36 | 36 | f.leaderCh = make(chan string) |
37 | 37 | f.errCh = make(chan error) |
38 | 38 | |
39 | ch, err := f.client.Watch(f.key, f.stopCh) | |
40 | if err != nil { | |
41 | return nil, nil, err | |
42 | } | |
39 | go f.follow() | |
43 | 40 | |
44 | go f.follow(ch) | |
45 | ||
46 | return f.leaderCh, f.errCh, nil | |
41 | return f.leaderCh, f.errCh | |
47 | 42 | } |
48 | 43 | |
49 | 44 | // Stop stops monitoring an election. |
51 | 46 | close(f.stopCh) |
52 | 47 | } |
53 | 48 | |
54 | func (f *Follower) follow(ch <-chan *store.KVPair) { | |
49 | func (f *Follower) follow() { | |
55 | 50 | defer close(f.leaderCh) |
56 | 51 | defer close(f.errCh) |
52 | ||
53 | ch, err := f.client.Watch(f.key, f.stopCh) | |
54 | if err != nil { | |
55 | f.errCh <- err | |
56 | } | |
57 | 57 | |
58 | 58 | f.leader = "" |
59 | 59 | for kv := range ch { |
20 | 20 | mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) |
21 | 21 | |
22 | 22 | follower := NewFollower(kv, "test_key") |
23 | leaderCh, errCh, err := follower.FollowElection() | |
24 | assert.Nil(t, err) | |
23 | leaderCh, errCh := follower.FollowElection() | |
25 | 24 | |
26 | 25 | // Simulate leader updates |
27 | 26 | go func() { |