Codebase list golang-github-docker-leadership / upstream/latest
New upstream version 0.1.0 Tim Potter 7 years ago
6 changed file(s) with 61 addition(s) and 55 deletion(s). Raw diff Collapse all Expand all
1010 [Org]
1111 [Org."Core maintainers"]
1212 people = [
13 "abronan",
13 "aluzzardi",
1414 ]
1515
1616 [people]
2020 # in the people section.
2121
2222 # ADD YOURSELF HERE IN ALPHABETICAL ORDER
23 [people.abronan]
24 Name = "Alexandre Beslic"
25 Email = "alexandre.beslic@gmail.com"
26 GitHub = "abronan"
23 [people.aluzzardi]
24 Name = "Andrea Luzzardi"
25 Email = "al@docker.com"
26 GitHub = "aluzzardi"
1515 }
1616
1717 underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second)
18 electedCh, _, err := underwood.RunForElection()
19 if err != nil {
20 log.Fatal("Cannot run for election, store is probably down")
21 }
18 electedCh, _ := underwood.RunForElection()
2219
2320 for isElected := range electedCh {
2421 // This loop will run every time there is a change in our leadership
4845 there is a change in leadership:
4946 ```go
5047 follower := leadership.NewFollower(client, "service/swarm/leader")
51 leaderCh, _, err := follower.FollowElection()
52 if err != nil {
53 log.Fatal("Cannot follow the election, store is probably down")
54 }
48 leaderCh, _ := follower.FollowElection()
5549 for leader := range leaderCh {
5650 // Leader is a string containing the value passed to `NewCandidate`.
5751 log.Printf("%s is now the leader", leader)
5852 }
53 log.Fatal("Cannot follow the election, store is probably down")
54 // Recovery code or exit
5955 ```
6056
6157 A typical use case for this is to be able to always send requests to the current
8480 time.Sleep(waitTime)
8581 // retry
8682 }
87 }
83 }()
8884 }
8985
9086 func run(candidate *leadership.Candidate) {
91 electedCh, errCh, err := candidate.RunForElection()
92 if err != nil {
93 return
94 }
87 electedCh, errCh := candidate.RunForElection()
9588 for {
9689 select {
97 case elected := <-electedCh:
90 case isElected := <-electedCh:
9891 if isElected {
9992 // Do something
10093 } else {
10194 // Do something else
10295 }
10396
104 case err := <-errCh:
105 log.Error(err)
106 return
97 case err := <-errCh:
98 log.Error(err)
99 return
100 }
107101 }
108102 }
109103 ```
77 )
88
99 const (
10 defaultLockTTL = 15 * time.Second
10 defaultLockTTL = 20 * time.Second
1111 )
1212
1313 // Candidate runs the leader election algorithm asynchronously
2121 lockTTL time.Duration
2222 leader bool
2323 stopCh chan struct{}
24 stopRenew chan struct{}
2425 resignCh chan bool
2526 errCh chan error
2627 }
5051 // ElectedCh is used to get a channel which delivers signals on
5152 // acquiring or losing leadership. It sends true if we become
5253 // the leader, and false if we lose it.
53 func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) {
54 func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
5455 c.electedCh = make(chan bool)
5556 c.errCh = make(chan error)
5657
57 lockOpts := &store.LockOptions{
58 Value: []byte(c.node),
59 }
58 go c.campaign()
6059
61 if c.lockTTL != defaultLockTTL {
62 lockOpts.TTL = c.lockTTL
63 lockOpts.RenewLock = make(chan struct{})
64 }
65
66 lock, err := c.client.NewLock(c.key, lockOpts)
67
68 if err != nil {
69 return nil, nil, err
70 }
71
72 go c.campaign(lock)
73
74 return c.electedCh, c.errCh, nil
60 return c.electedCh, c.errCh
7561 }
7662
7763 // Stop running for election.
10086 c.electedCh <- status
10187 }
10288
103 func (c *Candidate) campaign(lock store.Locker) {
89 func (c *Candidate) initLock() (store.Locker, error) {
90 // Give up on the lock session if
91 // we recovered from a store failure
92 if c.stopRenew != nil {
93 close(c.stopRenew)
94 }
95
96 lockOpts := &store.LockOptions{
97 Value: []byte(c.node),
98 }
99
100 if c.lockTTL != defaultLockTTL {
101 lockOpts.TTL = c.lockTTL
102 }
103
104 lockOpts.RenewLock = make(chan struct{})
105 c.stopRenew = lockOpts.RenewLock
106
107 lock, err := c.client.NewLock(c.key, lockOpts)
108 return lock, err
109 }
110
111 func (c *Candidate) campaign() {
104112 defer close(c.electedCh)
105113 defer close(c.errCh)
106114
107115 for {
108116 // Start as a follower.
109117 c.update(false)
118
119 lock, err := c.initLock()
120 if err != nil {
121 c.errCh <- err
122 return
123 }
110124
111125 lostCh, err := lock.Lock(nil)
112126 if err != nil {
2424 mockLock.On("Unlock").Return(nil)
2525
2626 candidate := NewCandidate(kv, "test_key", "test_node", 0)
27 electedCh, _, err := candidate.RunForElection()
28 assert.Nil(t, err)
27 electedCh, _ := candidate.RunForElection()
2928
3029 // Should issue a false upon start, no matter what.
3130 assert.False(t, <-electedCh)
3232 }
3333
3434 // FollowElection starts monitoring the election.
35 func (f *Follower) FollowElection() (<-chan string, <-chan error, error) {
35 func (f *Follower) FollowElection() (<-chan string, <-chan error) {
3636 f.leaderCh = make(chan string)
3737 f.errCh = make(chan error)
3838
39 ch, err := f.client.Watch(f.key, f.stopCh)
40 if err != nil {
41 return nil, nil, err
42 }
39 go f.follow()
4340
44 go f.follow(ch)
45
46 return f.leaderCh, f.errCh, nil
41 return f.leaderCh, f.errCh
4742 }
4843
4944 // Stop stops monitoring an election.
5146 close(f.stopCh)
5247 }
5348
54 func (f *Follower) follow(ch <-chan *store.KVPair) {
49 func (f *Follower) follow() {
5550 defer close(f.leaderCh)
5651 defer close(f.errCh)
52
53 ch, err := f.client.Watch(f.key, f.stopCh)
54 if err != nil {
55 f.errCh <- err
56 }
5757
5858 f.leader = ""
5959 for kv := range ch {
2020 mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)
2121
2222 follower := NewFollower(kv, "test_key")
23 leaderCh, errCh, err := follower.FollowElection()
24 assert.Nil(t, err)
23 leaderCh, errCh := follower.FollowElection()
2524
2625 // Simulate leader updates
2726 go func() {