Imported Upstream version 0.0~git20160208.0.b545f2d
Dmitry Smirnov
8 years ago
0 | # Contributing to Docker | |
1 | ||
2 | ### Sign your work | |
3 | ||
4 | The sign-off is a simple line at the end of the explanation for the patch. Your | |
5 | signature certifies that you wrote the patch or otherwise have the right to pass | |
6 | it on as an open-source patch. The rules are pretty simple: if you can certify | |
7 | the below (from [developercertificate.org](http://developercertificate.org/)): | |
8 | ||
9 | ``` | |
10 | Developer Certificate of Origin | |
11 | Version 1.1 | |
12 | ||
13 | Copyright (C) 2004, 2006 The Linux Foundation and its contributors. | |
14 | 660 York Street, Suite 102, | |
15 | San Francisco, CA 94110 USA | |
16 | ||
17 | Everyone is permitted to copy and distribute verbatim copies of this | |
18 | license document, but changing it is not allowed. | |
19 | ||
20 | Developer's Certificate of Origin 1.1 | |
21 | ||
22 | By making a contribution to this project, I certify that: | |
23 | ||
24 | (a) The contribution was created in whole or in part by me and I | |
25 | have the right to submit it under the open source license | |
26 | indicated in the file; or | |
27 | ||
28 | (b) The contribution is based upon previous work that, to the best | |
29 | of my knowledge, is covered under an appropriate open source | |
30 | license and I have the right under that license to submit that | |
31 | work with modifications, whether created in whole or in part | |
32 | by me, under the same open source license (unless I am | |
33 | permitted to submit under a different license), as indicated | |
34 | in the file; or | |
35 | ||
36 | (c) The contribution was provided directly to me by some other | |
37 | person who certified (a), (b) or (c) and I have not modified | |
38 | it. | |
39 | ||
40 | (d) I understand and agree that this project and the contribution | |
41 | are public and that a record of the contribution (including all | |
42 | personal information I submit with it, including my sign-off) is | |
43 | maintained indefinitely and may be redistributed consistent with | |
44 | this project or the open source license(s) involved. | |
45 | ``` | |
46 | ||
47 | Then you just add a line to every git commit message: | |
48 | ||
49 | Signed-off-by: Joe Smith <joe.smith@email.com> | |
50 | ||
51 | Use your real name (sorry, no pseudonyms or anonymous contributions.) | |
52 | ||
53 | If you set your `user.name` and `user.email` git configs, you can sign your | |
54 | commit automatically with `git commit -s`. |
0 | ||
1 | Apache License | |
2 | Version 2.0, January 2004 | |
3 | https://www.apache.org/licenses/ | |
4 | ||
5 | TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION | |
6 | ||
7 | 1. Definitions. | |
8 | ||
9 | "License" shall mean the terms and conditions for use, reproduction, | |
10 | and distribution as defined by Sections 1 through 9 of this document. | |
11 | ||
12 | "Licensor" shall mean the copyright owner or entity authorized by | |
13 | the copyright owner that is granting the License. | |
14 | ||
15 | "Legal Entity" shall mean the union of the acting entity and all | |
16 | other entities that control, are controlled by, or are under common | |
17 | control with that entity. For the purposes of this definition, | |
18 | "control" means (i) the power, direct or indirect, to cause the | |
19 | direction or management of such entity, whether by contract or | |
20 | otherwise, or (ii) ownership of fifty percent (50%) or more of the | |
21 | outstanding shares, or (iii) beneficial ownership of such entity. | |
22 | ||
23 | "You" (or "Your") shall mean an individual or Legal Entity | |
24 | exercising permissions granted by this License. | |
25 | ||
26 | "Source" form shall mean the preferred form for making modifications, | |
27 | including but not limited to software source code, documentation | |
28 | source, and configuration files. | |
29 | ||
30 | "Object" form shall mean any form resulting from mechanical | |
31 | transformation or translation of a Source form, including but | |
32 | not limited to compiled object code, generated documentation, | |
33 | and conversions to other media types. | |
34 | ||
35 | "Work" shall mean the work of authorship, whether in Source or | |
36 | Object form, made available under the License, as indicated by a | |
37 | copyright notice that is included in or attached to the work | |
38 | (an example is provided in the Appendix below). | |
39 | ||
40 | "Derivative Works" shall mean any work, whether in Source or Object | |
41 | form, that is based on (or derived from) the Work and for which the | |
42 | editorial revisions, annotations, elaborations, or other modifications | |
43 | represent, as a whole, an original work of authorship. For the purposes | |
44 | of this License, Derivative Works shall not include works that remain | |
45 | separable from, or merely link (or bind by name) to the interfaces of, | |
46 | the Work and Derivative Works thereof. | |
47 | ||
48 | "Contribution" shall mean any work of authorship, including | |
49 | the original version of the Work and any modifications or additions | |
50 | to that Work or Derivative Works thereof, that is intentionally | |
51 | submitted to Licensor for inclusion in the Work by the copyright owner | |
52 | or by an individual or Legal Entity authorized to submit on behalf of | |
53 | the copyright owner. For the purposes of this definition, "submitted" | |
54 | means any form of electronic, verbal, or written communication sent | |
55 | to the Licensor or its representatives, including but not limited to | |
56 | communication on electronic mailing lists, source code control systems, | |
57 | and issue tracking systems that are managed by, or on behalf of, the | |
58 | Licensor for the purpose of discussing and improving the Work, but | |
59 | excluding communication that is conspicuously marked or otherwise | |
60 | designated in writing by the copyright owner as "Not a Contribution." | |
61 | ||
62 | "Contributor" shall mean Licensor and any individual or Legal Entity | |
63 | on behalf of whom a Contribution has been received by Licensor and | |
64 | subsequently incorporated within the Work. | |
65 | ||
66 | 2. Grant of Copyright License. Subject to the terms and conditions of | |
67 | this License, each Contributor hereby grants to You a perpetual, | |
68 | worldwide, non-exclusive, no-charge, royalty-free, irrevocable | |
69 | copyright license to reproduce, prepare Derivative Works of, | |
70 | publicly display, publicly perform, sublicense, and distribute the | |
71 | Work and such Derivative Works in Source or Object form. | |
72 | ||
73 | 3. Grant of Patent License. Subject to the terms and conditions of | |
74 | this License, each Contributor hereby grants to You a perpetual, | |
75 | worldwide, non-exclusive, no-charge, royalty-free, irrevocable | |
76 | (except as stated in this section) patent license to make, have made, | |
77 | use, offer to sell, sell, import, and otherwise transfer the Work, | |
78 | where such license applies only to those patent claims licensable | |
79 | by such Contributor that are necessarily infringed by their | |
80 | Contribution(s) alone or by combination of their Contribution(s) | |
81 | with the Work to which such Contribution(s) was submitted. If You | |
82 | institute patent litigation against any entity (including a | |
83 | cross-claim or counterclaim in a lawsuit) alleging that the Work | |
84 | or a Contribution incorporated within the Work constitutes direct | |
85 | or contributory patent infringement, then any patent licenses | |
86 | granted to You under this License for that Work shall terminate | |
87 | as of the date such litigation is filed. | |
88 | ||
89 | 4. Redistribution. You may reproduce and distribute copies of the | |
90 | Work or Derivative Works thereof in any medium, with or without | |
91 | modifications, and in Source or Object form, provided that You | |
92 | meet the following conditions: | |
93 | ||
94 | (a) You must give any other recipients of the Work or | |
95 | Derivative Works a copy of this License; and | |
96 | ||
97 | (b) You must cause any modified files to carry prominent notices | |
98 | stating that You changed the files; and | |
99 | ||
100 | (c) You must retain, in the Source form of any Derivative Works | |
101 | that You distribute, all copyright, patent, trademark, and | |
102 | attribution notices from the Source form of the Work, | |
103 | excluding those notices that do not pertain to any part of | |
104 | the Derivative Works; and | |
105 | ||
106 | (d) If the Work includes a "NOTICE" text file as part of its | |
107 | distribution, then any Derivative Works that You distribute must | |
108 | include a readable copy of the attribution notices contained | |
109 | within such NOTICE file, excluding those notices that do not | |
110 | pertain to any part of the Derivative Works, in at least one | |
111 | of the following places: within a NOTICE text file distributed | |
112 | as part of the Derivative Works; within the Source form or | |
113 | documentation, if provided along with the Derivative Works; or, | |
114 | within a display generated by the Derivative Works, if and | |
115 | wherever such third-party notices normally appear. The contents | |
116 | of the NOTICE file are for informational purposes only and | |
117 | do not modify the License. You may add Your own attribution | |
118 | notices within Derivative Works that You distribute, alongside | |
119 | or as an addendum to the NOTICE text from the Work, provided | |
120 | that such additional attribution notices cannot be construed | |
121 | as modifying the License. | |
122 | ||
123 | You may add Your own copyright statement to Your modifications and | |
124 | may provide additional or different license terms and conditions | |
125 | for use, reproduction, or distribution of Your modifications, or | |
126 | for any such Derivative Works as a whole, provided Your use, | |
127 | reproduction, and distribution of the Work otherwise complies with | |
128 | the conditions stated in this License. | |
129 | ||
130 | 5. Submission of Contributions. Unless You explicitly state otherwise, | |
131 | any Contribution intentionally submitted for inclusion in the Work | |
132 | by You to the Licensor shall be under the terms and conditions of | |
133 | this License, without any additional terms or conditions. | |
134 | Notwithstanding the above, nothing herein shall supersede or modify | |
135 | the terms of any separate license agreement you may have executed | |
136 | with Licensor regarding such Contributions. | |
137 | ||
138 | 6. Trademarks. This License does not grant permission to use the trade | |
139 | names, trademarks, service marks, or product names of the Licensor, | |
140 | except as required for reasonable and customary use in describing the | |
141 | origin of the Work and reproducing the content of the NOTICE file. | |
142 | ||
143 | 7. Disclaimer of Warranty. Unless required by applicable law or | |
144 | agreed to in writing, Licensor provides the Work (and each | |
145 | Contributor provides its Contributions) on an "AS IS" BASIS, | |
146 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
147 | implied, including, without limitation, any warranties or conditions | |
148 | of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A | |
149 | PARTICULAR PURPOSE. You are solely responsible for determining the | |
150 | appropriateness of using or redistributing the Work and assume any | |
151 | risks associated with Your exercise of permissions under this License. | |
152 | ||
153 | 8. Limitation of Liability. In no event and under no legal theory, | |
154 | whether in tort (including negligence), contract, or otherwise, | |
155 | unless required by applicable law (such as deliberate and grossly | |
156 | negligent acts) or agreed to in writing, shall any Contributor be | |
157 | liable to You for damages, including any direct, indirect, special, | |
158 | incidental, or consequential damages of any character arising as a | |
159 | result of this License or out of the use or inability to use the | |
160 | Work (including but not limited to damages for loss of goodwill, | |
161 | work stoppage, computer failure or malfunction, or any and all | |
162 | other commercial damages or losses), even if such Contributor | |
163 | has been advised of the possibility of such damages. | |
164 | ||
165 | 9. Accepting Warranty or Additional Liability. While redistributing | |
166 | the Work or Derivative Works thereof, You may choose to offer, | |
167 | and charge a fee for, acceptance of support, warranty, indemnity, | |
168 | or other liability obligations and/or rights consistent with this | |
169 | License. However, in accepting such obligations, You may act only | |
170 | on Your own behalf and on Your sole responsibility, not on behalf | |
171 | of any other Contributor, and only if You agree to indemnify, | |
172 | defend, and hold each Contributor harmless for any liability | |
173 | incurred by, or claims asserted against, such Contributor by reason | |
174 | of your accepting any such warranty or additional liability. | |
175 | ||
176 | END OF TERMS AND CONDITIONS | |
177 | ||
178 | Copyright 2015 Docker, Inc. | |
179 | ||
180 | Licensed under the Apache License, Version 2.0 (the "License"); | |
181 | you may not use this file except in compliance with the License. | |
182 | You may obtain a copy of the License at | |
183 | ||
184 | https://www.apache.org/licenses/LICENSE-2.0 | |
185 | ||
186 | Unless required by applicable law or agreed to in writing, software | |
187 | distributed under the License is distributed on an "AS IS" BASIS, | |
188 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
189 | See the License for the specific language governing permissions and | |
190 | limitations under the License. |
0 | # leadership maintainers file | |
1 | # | |
2 | # This file describes who runs the docker/leadership project and how. | |
3 | # This is a living document - if you see something out of date or missing, speak up! | |
4 | # | |
5 | # It is structured to be consumable by both humans and programs. | |
6 | # To extract its contents programmatically, use any TOML-compliant parser. | |
7 | # | |
8 | # This file is compiled into the MAINTAINERS file in docker/opensource. | |
9 | # | |
10 | [Org] | |
11 | [Org."Core maintainers"] | |
12 | people = [ | |
13 | "abronan", | |
14 | ] | |
15 | ||
16 | [people] | |
17 | ||
18 | # A reference list of all people associated with the project. | |
19 | # All other sections should refer to people by their canonical key | |
20 | # in the people section. | |
21 | ||
22 | # ADD YOURSELF HERE IN ALPHABETICAL ORDER | |
23 | [people.abronan] | |
24 | Name = "Alexandre Beslic" | |
25 | Email = "alexandre.beslic@gmail.com" | |
26 | GitHub = "abronan" |
0 | # Leadership: Distributed Leader Election for Clustered Environments. | |
1 | ||
2 | Leadership is a library for a cluster leader election on top of a distributed | |
3 | Key/Value store. | |
4 | ||
5 | It is built using the `docker/libkv` library and is designed to work across multiple | |
6 | storage backends. | |
7 | ||
8 | You can use `leadership` with `Consul`, `etcd` and `Zookeeper`. | |
9 | ||
10 | ```go | |
11 | // Create a store using pkg/store. | |
12 | client, err := store.NewStore("consul", []string{"127.0.0.1:8500"}, &store.Config{}) | |
13 | if err != nil { | |
14 | panic(err) | |
15 | } | |
16 | ||
17 | underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second) | |
18 | electedCh, _, err := underwood.RunForElection() | |
19 | if err != nil { | |
20 | log.Fatal("Cannot run for election, store is probably down") | |
21 | } | |
22 | ||
23 | for isElected := range electedCh { | |
24 | // This loop will run every time there is a change in our leadership | |
25 | // status. | |
26 | ||
27 | if isElected { | |
28 | // We won the election - we are now the leader. | |
29 | // Let's do leader stuff, for example, sleep for a while. | |
30 | log.Printf("I won the election! I'm now the leader") | |
31 | time.Sleep(10 * time.Second) | |
32 | ||
33 | // Tired of being a leader? You can resign anytime. | |
34 | candidate.Resign() | |
35 | } else { | |
36 | // We lost the election but are still running for leadership. | |
37 | // `elected == false` is the default state and is the first event | |
38 | // we'll receive from the channel. After a successful election, | |
39 | // this event can get triggered if someone else steals the | |
40 | // leadership or if we resign. | |
41 | ||
42 | log.Printf("Lost the election, let's try another time") | |
43 | } | |
44 | } | |
45 | ``` | |
46 | ||
47 | It is possible to follow an election in real-time and get notified whenever | |
48 | there is a change in leadership: | |
49 | ```go | |
50 | follower := leadership.NewFollower(client, "service/swarm/leader") | |
51 | leaderCh, _, err := follower.FollowElection() | |
52 | if err != nil { | |
53 | log.Fatal("Cannot follow the election, store is probably down") | |
54 | } | |
55 | for leader := range leaderCh { | |
56 | // Leader is a string containing the value passed to `NewCandidate`. | |
57 | log.Printf("%s is now the leader", leader) | |
58 | } | |
59 | ``` | |
60 | ||
61 | A typical use case for this is to be able to always send requests to the current | |
62 | leader. | |
63 | ||
64 | ## Fault tolerance | |
65 | ||
66 | Leadership returns an error channel for Candidates and Followers that you can use | |
67 | to be resilient to failures. For example, if the watch on the leader key fails | |
68 | because the store becomes unavailable, you can retry the process later. | |
69 | ||
70 | ```go | |
71 | func participate() { | |
72 | // Create a store using pkg/store. | |
73 | client, err := store.NewStore("consul", []string{"127.0.0.1:8500"}, &store.Config{}) | |
74 | if err != nil { | |
75 | panic(err) | |
76 | } | |
77 | ||
78 | waitTime := 10 * time.Second | |
79 | underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second) | |
80 | ||
81 | go func() { | |
82 | for { | |
83 | run(underwood) | |
84 | time.Sleep(waitTime) | |
85 | // retry | |
86 | } | |
87 | } | |
88 | } | |
89 | ||
90 | func run(candidate *leadership.Candidate) { | |
91 | electedCh, errCh, err := candidate.RunForElection() | |
92 | if err != nil { | |
93 | return | |
94 | } | |
95 | for { | |
96 | select { | |
97 | case elected := <-electedCh: | |
98 | if isElected { | |
99 | // Do something | |
100 | } else { | |
101 | // Do something else | |
102 | } | |
103 | ||
104 | case err := <-errCh: | |
105 | log.Error(err) | |
106 | return | |
107 | } | |
108 | } | |
109 | ``` | |
110 | ||
111 | ## License | |
112 | ||
113 | leadership is licensed under the Apache License, Version 2.0. See [LICENSE](LICENSE) for the full license text. |
0 | package leadership | |
1 | ||
2 | import ( | |
3 | "sync" | |
4 | "time" | |
5 | ||
6 | "github.com/docker/libkv/store" | |
7 | ) | |
8 | ||
9 | const ( | |
10 | defaultLockTTL = 15 * time.Second | |
11 | ) | |
12 | ||
13 | // Candidate runs the leader election algorithm asynchronously | |
14 | type Candidate struct { | |
15 | client store.Store | |
16 | key string | |
17 | node string | |
18 | ||
19 | electedCh chan bool | |
20 | lock sync.Mutex | |
21 | lockTTL time.Duration | |
22 | leader bool | |
23 | stopCh chan struct{} | |
24 | resignCh chan bool | |
25 | errCh chan error | |
26 | } | |
27 | ||
28 | // NewCandidate creates a new Candidate | |
29 | func NewCandidate(client store.Store, key, node string, ttl time.Duration) *Candidate { | |
30 | return &Candidate{ | |
31 | client: client, | |
32 | key: key, | |
33 | node: node, | |
34 | ||
35 | leader: false, | |
36 | lockTTL: ttl, | |
37 | resignCh: make(chan bool), | |
38 | stopCh: make(chan struct{}), | |
39 | } | |
40 | } | |
41 | ||
42 | // IsLeader returns true if the candidate is currently a leader. | |
43 | func (c *Candidate) IsLeader() bool { | |
44 | return c.leader | |
45 | } | |
46 | ||
47 | // RunForElection starts the leader election algorithm. Updates in status are | |
48 | // pushed through the ElectedCh channel. | |
49 | // | |
50 | // ElectedCh is used to get a channel which delivers signals on | |
51 | // acquiring or losing leadership. It sends true if we become | |
52 | // the leader, and false if we lose it. | |
53 | func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) { | |
54 | c.electedCh = make(chan bool) | |
55 | c.errCh = make(chan error) | |
56 | ||
57 | lockOpts := &store.LockOptions{ | |
58 | Value: []byte(c.node), | |
59 | } | |
60 | ||
61 | if c.lockTTL != defaultLockTTL { | |
62 | lockOpts.TTL = c.lockTTL | |
63 | lockOpts.RenewLock = make(chan struct{}) | |
64 | } | |
65 | ||
66 | lock, err := c.client.NewLock(c.key, lockOpts) | |
67 | ||
68 | if err != nil { | |
69 | return nil, nil, err | |
70 | } | |
71 | ||
72 | go c.campaign(lock) | |
73 | ||
74 | return c.electedCh, c.errCh, nil | |
75 | } | |
76 | ||
77 | // Stop running for election. | |
78 | func (c *Candidate) Stop() { | |
79 | close(c.stopCh) | |
80 | } | |
81 | ||
82 | // Resign forces the candidate to step-down and try again. | |
83 | // If the candidate is not a leader, it doesn't have any effect. | |
84 | // Candidate will retry immediately to acquire the leadership. If no-one else | |
85 | // took it, then the Candidate will end up being a leader again. | |
86 | func (c *Candidate) Resign() { | |
87 | c.lock.Lock() | |
88 | defer c.lock.Unlock() | |
89 | ||
90 | if c.leader { | |
91 | c.resignCh <- true | |
92 | } | |
93 | } | |
94 | ||
95 | func (c *Candidate) update(status bool) { | |
96 | c.lock.Lock() | |
97 | defer c.lock.Unlock() | |
98 | ||
99 | c.leader = status | |
100 | c.electedCh <- status | |
101 | } | |
102 | ||
103 | func (c *Candidate) campaign(lock store.Locker) { | |
104 | defer close(c.electedCh) | |
105 | defer close(c.errCh) | |
106 | ||
107 | for { | |
108 | // Start as a follower. | |
109 | c.update(false) | |
110 | ||
111 | lostCh, err := lock.Lock(nil) | |
112 | if err != nil { | |
113 | c.errCh <- err | |
114 | return | |
115 | } | |
116 | ||
117 | // Hooray! We acquired the lock therefore we are the new leader. | |
118 | c.update(true) | |
119 | ||
120 | select { | |
121 | case <-c.resignCh: | |
122 | // We were asked to resign, give up the lock and go back | |
123 | // campaigning. | |
124 | lock.Unlock() | |
125 | case <-c.stopCh: | |
126 | // Give up the leadership and quit. | |
127 | if c.leader { | |
128 | lock.Unlock() | |
129 | } | |
130 | return | |
131 | case <-lostCh: | |
132 | // We lost the lock. Someone else is the leader, try again. | |
133 | } | |
134 | } | |
135 | } |
0 | package leadership | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | "time" | |
5 | ||
6 | libkvmock "github.com/docker/libkv/store/mock" | |
7 | "github.com/stretchr/testify/assert" | |
8 | "github.com/stretchr/testify/mock" | |
9 | ) | |
10 | ||
11 | func TestCandidate(t *testing.T) { | |
12 | kv, err := libkvmock.New([]string{}, nil) | |
13 | assert.NoError(t, err) | |
14 | assert.NotNil(t, kv) | |
15 | ||
16 | mockStore := kv.(*libkvmock.Mock) | |
17 | mockLock := &libkvmock.Lock{} | |
18 | mockStore.On("NewLock", "test_key", mock.Anything).Return(mockLock, nil) | |
19 | ||
20 | // Lock and unlock always succeeds. | |
21 | lostCh := make(chan struct{}) | |
22 | var mockLostCh <-chan struct{} = lostCh | |
23 | mockLock.On("Lock", mock.Anything).Return(mockLostCh, nil) | |
24 | mockLock.On("Unlock").Return(nil) | |
25 | ||
26 | candidate := NewCandidate(kv, "test_key", "test_node", 0) | |
27 | electedCh, _, err := candidate.RunForElection() | |
28 | assert.Nil(t, err) | |
29 | ||
30 | // Should issue a false upon start, no matter what. | |
31 | assert.False(t, <-electedCh) | |
32 | ||
33 | // Since the lock always succeeeds, we should get elected. | |
34 | assert.True(t, <-electedCh) | |
35 | assert.True(t, candidate.IsLeader()) | |
36 | ||
37 | // Signaling a lost lock should get us de-elected... | |
38 | close(lostCh) | |
39 | assert.False(t, <-electedCh) | |
40 | ||
41 | // And we should attempt to get re-elected again. | |
42 | assert.True(t, <-electedCh) | |
43 | ||
44 | // When we resign, unlock will get called, we'll be notified of the | |
45 | // de-election and we'll try to get the lock again. | |
46 | go candidate.Resign() | |
47 | assert.False(t, <-electedCh) | |
48 | assert.True(t, <-electedCh) | |
49 | ||
50 | candidate.Stop() | |
51 | ||
52 | // Ensure that the chan closes after some time | |
53 | for { | |
54 | select { | |
55 | case _, open := <-electedCh: | |
56 | if !open { | |
57 | mockStore.AssertExpectations(t) | |
58 | return | |
59 | } | |
60 | case <-time.After(1 * time.Second): | |
61 | t.Fatalf("electedCh not closed correctly") | |
62 | } | |
63 | } | |
64 | } |
0 | package leadership | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | ||
5 | "github.com/docker/libkv/store" | |
6 | ) | |
7 | ||
8 | // Follower can follow an election in real-time and push notifications whenever | |
9 | // there is a change in leadership. | |
10 | type Follower struct { | |
11 | client store.Store | |
12 | key string | |
13 | ||
14 | leader string | |
15 | leaderCh chan string | |
16 | stopCh chan struct{} | |
17 | errCh chan error | |
18 | } | |
19 | ||
20 | // NewFollower creates a new follower. | |
21 | func NewFollower(client store.Store, key string) *Follower { | |
22 | return &Follower{ | |
23 | client: client, | |
24 | key: key, | |
25 | stopCh: make(chan struct{}), | |
26 | } | |
27 | } | |
28 | ||
29 | // Leader returns the current leader. | |
30 | func (f *Follower) Leader() string { | |
31 | return f.leader | |
32 | } | |
33 | ||
34 | // FollowElection starts monitoring the election. | |
35 | func (f *Follower) FollowElection() (<-chan string, <-chan error, error) { | |
36 | f.leaderCh = make(chan string) | |
37 | f.errCh = make(chan error) | |
38 | ||
39 | ch, err := f.client.Watch(f.key, f.stopCh) | |
40 | if err != nil { | |
41 | return nil, nil, err | |
42 | } | |
43 | ||
44 | go f.follow(ch) | |
45 | ||
46 | return f.leaderCh, f.errCh, nil | |
47 | } | |
48 | ||
49 | // Stop stops monitoring an election. | |
50 | func (f *Follower) Stop() { | |
51 | close(f.stopCh) | |
52 | } | |
53 | ||
54 | func (f *Follower) follow(ch <-chan *store.KVPair) { | |
55 | defer close(f.leaderCh) | |
56 | defer close(f.errCh) | |
57 | ||
58 | f.leader = "" | |
59 | for kv := range ch { | |
60 | if kv == nil { | |
61 | continue | |
62 | } | |
63 | curr := string(kv.Value) | |
64 | if curr == f.leader { | |
65 | continue | |
66 | } | |
67 | f.leader = curr | |
68 | f.leaderCh <- f.leader | |
69 | } | |
70 | ||
71 | // Channel closed, we return an error | |
72 | f.errCh <- errors.New("Leader Election: watch leader channel closed, the store may be unavailable...") | |
73 | } |
0 | package leadership | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | ||
5 | "github.com/docker/libkv/store" | |
6 | libkvmock "github.com/docker/libkv/store/mock" | |
7 | "github.com/stretchr/testify/assert" | |
8 | "github.com/stretchr/testify/mock" | |
9 | ) | |
10 | ||
11 | func TestFollower(t *testing.T) { | |
12 | kv, err := libkvmock.New([]string{}, nil) | |
13 | assert.NoError(t, err) | |
14 | assert.NotNil(t, kv) | |
15 | ||
16 | mockStore := kv.(*libkvmock.Mock) | |
17 | ||
18 | kvCh := make(chan *store.KVPair) | |
19 | var mockKVCh <-chan *store.KVPair = kvCh | |
20 | mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) | |
21 | ||
22 | follower := NewFollower(kv, "test_key") | |
23 | leaderCh, errCh, err := follower.FollowElection() | |
24 | assert.Nil(t, err) | |
25 | ||
26 | // Simulate leader updates | |
27 | go func() { | |
28 | kvCh <- &store.KVPair{Key: "test_key", Value: []byte("leader1")} | |
29 | kvCh <- &store.KVPair{Key: "test_key", Value: []byte("leader1")} | |
30 | kvCh <- &store.KVPair{Key: "test_key", Value: []byte("leader2")} | |
31 | kvCh <- &store.KVPair{Key: "test_key", Value: []byte("leader1")} | |
32 | }() | |
33 | ||
34 | // We shouldn't see duplicate events. | |
35 | assert.Equal(t, <-leaderCh, "leader1") | |
36 | assert.Equal(t, <-leaderCh, "leader2") | |
37 | assert.Equal(t, <-leaderCh, "leader1") | |
38 | assert.Equal(t, follower.Leader(), "leader1") | |
39 | ||
40 | // Once stopped, iteration over the leader channel should stop. | |
41 | follower.Stop() | |
42 | close(kvCh) | |
43 | ||
44 | // Assert that we receive an error from the error chan to deal with the failover | |
45 | err, open := <-errCh | |
46 | assert.True(t, open) | |
47 | assert.NotNil(t, err) | |
48 | ||
49 | // Ensure that the chan is closed | |
50 | _, open = <-leaderCh | |
51 | assert.False(t, open) | |
52 | ||
53 | mockStore.AssertExpectations(t) | |
54 | } |