Package list golang-github-go-kit-kit / 714eef7
Merge pull request #547 from seh/remove-eureka-test-race sd/eureka: remove data races in unit tests Peter Bourgon authored 4 years ago GitHub committed 4 years ago
4 changed file(s) with 156 addition(s) and 117 deletion(s). Raw diff Collapse all Expand all
3232 quitc: make(chan chan struct{}),
3333 }
3434
35 instances, err := s.getInstances()
36 if err == nil {
37 s.logger.Log("instances", len(instances))
38 } else {
39 s.logger.Log("during", "getInstances", "err", err)
40 }
41
42 s.cache.Update(sd.Event{Instances: instances, Err: err})
43 go s.loop()
35 done := make(chan struct{})
36 updates := conn.ScheduleAppUpdates(app, true, done)
37 s.consume(<-updates)
38 go s.loop(updates, done)
4439 return s
4540 }
4641
5247 s.quitc = nil
5348 }
5449
55 func (s *Instancer) loop() {
56 var (
57 await = false
58 done = make(chan struct{})
59 updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
60 )
50 func (s *Instancer) consume(update fargo.AppUpdate) {
51 if update.Err != nil {
52 s.logger.Log("during", "Update", "err", update.Err)
53 s.cache.Update(sd.Event{Err: update.Err})
54 return
55 }
56 instances := convertFargoAppToInstances(update.App)
57 s.logger.Log("instances", len(instances))
58 s.cache.Update(sd.Event{Instances: instances})
59 }
60
61 func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
6162 defer close(done)
6263
6364 for {
6465 select {
65 case update := <-updatec:
66 if update.Err != nil {
67 s.logger.Log("during", "Update", "err", update.Err)
68 s.cache.Update(sd.Event{Err: update.Err})
69 continue
70 }
71 instances := convertFargoAppToInstances(update.App)
72 s.logger.Log("instances", len(instances))
73 s.cache.Update(sd.Event{Instances: instances})
74
66 case update := <-updates:
67 s.consume(update)
7568 case q := <-s.quitc:
7669 close(q)
7770 return
1212
1313 func TestInstancer(t *testing.T) {
1414 connection := &testConnection{
15 instances: []*fargo.Instance{instanceTest1},
16 application: appUpdateTest,
15 instances: []*fargo.Instance{instanceTest1, instanceTest2},
1716 errApplication: nil,
1817 }
1918
2019 instancer := NewInstancer(connection, appNameTest, loggerTest)
2120 defer instancer.Stop()
2221
23 state := instancer.cache.State()
22 state := instancer.state()
2423 if state.Err != nil {
2524 t.Fatal(state.Err)
2625 }
2726
28 if want, have := 1, len(state.Instances); want != have {
27 if want, have := 2, len(state.Instances); want != have {
2928 t.Errorf("want %d, have %d", want, have)
3029 }
3130 }
3231
33 func TestInstancerScheduleUpdates(t *testing.T) {
32 func TestInstancerReceivesUpdates(t *testing.T) {
3433 connection := &testConnection{
3534 instances: []*fargo.Instance{instanceTest1},
36 application: appUpdateTest,
3735 errApplication: nil,
3836 }
3937
4038 instancer := NewInstancer(connection, appNameTest, loggerTest)
4139 defer instancer.Stop()
4240
43 state := instancer.cache.State()
44 if want, have := 1, len(state.Instances); want != have {
45 t.Errorf("want %d, have %d", want, have)
41 verifyCount := func(want int) (have int, converged bool) {
42 const maxPollAttempts = 5
43 const delayPerAttempt = 200 * time.Millisecond
44 for i := 1; ; i++ {
45 state := instancer.state()
46 if have := len(state.Instances); want == have {
47 return have, true
48 } else if i == maxPollAttempts {
49 return have, false
50 }
51 time.Sleep(delayPerAttempt)
52 }
4653 }
4754
48 time.Sleep(50 * time.Millisecond)
55 if have, converged := verifyCount(1); !converged {
56 t.Fatalf("initial: want %d, have %d", 1, have)
57 }
4958
50 state = instancer.cache.State()
51 if want, have := 2, len(state.Instances); want != have {
52 t.Errorf("want %v, have %v", want, have)
59 if err := connection.RegisterInstance(instanceTest2); err != nil {
60 t.Fatalf("failed to register an instance: %v", err)
61 }
62 if have, converged := verifyCount(2); !converged {
63 t.Fatalf("after registration: want %d, have %d", 2, have)
64 }
65
66 if err := connection.DeregisterInstance(instanceTest1); err != nil {
67 t.Fatalf("failed to unregister an instance: %v", err)
68 }
69 if have, converged := verifyCount(1); !converged {
70 t.Fatalf("after deregistration: want %d, have %d", 1, have)
5371 }
5472 }
5573
56 func TestBadInstancerInstances(t *testing.T) {
74 func TestBadInstancerScheduleUpdates(t *testing.T) {
5775 connection := &testConnection{
58 instances: []*fargo.Instance{},
59 errInstances: errTest,
60 application: appUpdateTest,
61 errApplication: nil,
76 instances: []*fargo.Instance{instanceTest1},
77 errApplication: errTest,
6278 }
6379
6480 instancer := NewInstancer(connection, appNameTest, loggerTest)
6581 defer instancer.Stop()
6682
67 state := instancer.cache.State()
83 state := instancer.state()
6884 if state.Err == nil {
6985 t.Fatal("expecting error")
7086 }
7389 t.Errorf("want %d, have %d", want, have)
7490 }
7591 }
76
77 func TestBadInstancerScheduleUpdates(t *testing.T) {
78 connection := &testConnection{
79 instances: []*fargo.Instance{instanceTest1},
80 application: appUpdateTest,
81 errApplication: errTest,
82 }
83
84 instancer := NewInstancer(connection, appNameTest, loggerTest)
85 defer instancer.Stop()
86
87 state := instancer.cache.State()
88 if state.Err != nil {
89 t.Error(state.Err)
90 }
91 if want, have := 1, len(state.Instances); want != have {
92 t.Errorf("want %d, have %d", want, have)
93 }
94
95 time.Sleep(50 * time.Millisecond)
96
97 state = instancer.cache.State()
98 if state.Err == nil {
99 t.Fatal("expecting error")
100 }
101
102 if want, have := 0, len(state.Instances); want != have {
103 t.Errorf("want %v, have %v", want, have)
104 }
105 }
2727 type fargoUnsuccessfulHTTPResponse struct {
2828 statusCode int
2929 messagePrefix string
30 }
31
32 func (u *fargoUnsuccessfulHTTPResponse) Error() string {
33 return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
3034 }
3135
3236 // Registrar maintains service instance liveness information in Eureka.
109113 }
110114 }
111115
116 func httpResponseStatusCode(err error) (code int, present bool) {
117 if code, ok := fargo.HTTPResponseStatusCode(err); ok {
118 return code, true
119 }
120 // Allow injection of errors for testing.
121 if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
122 return u.statusCode, true
123 }
124 return 0, false
125 }
126
127 func isNotFound(err error) bool {
128 code, ok := httpResponseStatusCode(err)
129 return ok && code == http.StatusNotFound
130 }
131
112132 func (r *Registrar) heartbeat() error {
113133 err := r.conn.HeartBeatInstance(r.instance)
114 if err != nil {
115 if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
116 // Instance expired (e.g. network partition). Re-register.
117 r.logger.Log("during", "heartbeat", err.Error())
118 return r.conn.ReregisterInstance(r.instance)
119 }
134 if err == nil {
135 return nil
136 }
137 if isNotFound(err) {
138 // Instance expired (e.g. network partition). Re-register.
139 return r.conn.ReregisterInstance(r.instance)
120140 }
121141 return err
122142 }
123
124 func (u *fargoUnsuccessfulHTTPResponse) Error() string {
125 return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
126 }
11
22 import (
33 "errors"
4 "fmt"
45 "reflect"
6 "sync"
7 "time"
58
69 "github.com/go-kit/kit/log"
710 "github.com/hudl/fargo"
811 )
912
1013 type testConnection struct {
11 instances []*fargo.Instance
12 application *fargo.Application
13 errInstances error
14 mu sync.RWMutex
15 instances []*fargo.Instance
16
1417 errApplication error
1518 errHeartbeat error
1619 errRegister error
2225 errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"}
2326 loggerTest = log.NewNopLogger()
2427 appNameTest = "go-kit"
25 appUpdateTest = &fargo.Application{
26 Name: appNameTest,
27 Instances: []*fargo.Instance{instanceTest1, instanceTest2},
28 }
2928 instanceTest1 = &fargo.Instance{
3029 HostName: "serveregistrar1.acme.org",
3130 Port: 8080,
5857 var _ fargoConnection = (*testConnection)(nil)
5958
6059 func (c *testConnection) RegisterInstance(i *fargo.Instance) error {
61 if c.errRegister == nil {
62 for _, instance := range c.instances {
63 if reflect.DeepEqual(*instance, *i) {
64 return errors.New("already registered")
65 }
60 if c.errRegister != nil {
61 return c.errRegister
62 }
63 c.mu.Lock()
64 defer c.mu.Unlock()
65 for _, instance := range c.instances {
66 if reflect.DeepEqual(*instance, *i) {
67 return errors.New("already registered")
6668 }
67
68 c.instances = append(c.instances, i)
6969 }
70 return c.errRegister
70 c.instances = append(c.instances, i)
71 return nil
7172 }
7273
7374 func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error {
7576 }
7677
7778 func (c *testConnection) DeregisterInstance(i *fargo.Instance) error {
78 if c.errDeregister == nil {
79 var newInstances []*fargo.Instance
80 for _, instance := range c.instances {
81 if reflect.DeepEqual(*instance, *i) {
82 continue
83 }
84 newInstances = append(newInstances, instance)
79 if c.errDeregister != nil {
80 return c.errDeregister
81 }
82 c.mu.Lock()
83 defer c.mu.Unlock()
84 remaining := make([]*fargo.Instance, 0, len(c.instances))
85 for _, instance := range c.instances {
86 if reflect.DeepEqual(*instance, *i) {
87 continue
8588 }
86 if len(newInstances) == len(c.instances) {
87 return errors.New("not registered")
88 }
89
90 c.instances = newInstances
89 remaining = append(remaining, instance)
9190 }
92 return c.errDeregister
91 if len(remaining) == len(c.instances) {
92 return errors.New("not registered")
93 }
94 c.instances = remaining
95 return nil
9396 }
9497
9598 func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error {
9699 return nil
97100 }
98101
99 func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
100 updatec := make(chan fargo.AppUpdate, 1)
101 updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication}
102 return updatec
102 func (c *testConnection) instancesForApplication(name string) []*fargo.Instance {
103 c.mu.RLock()
104 defer c.mu.RUnlock()
105 instances := make([]*fargo.Instance, 0, len(c.instances))
106 for _, i := range c.instances {
107 if i.App == name {
108 instances = append(instances, i)
109 }
110 }
111 return instances
103112 }
104113
105114 func (c *testConnection) GetApp(name string) (*fargo.Application, error) {
106 return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances
115 if err := c.errApplication; err != nil {
116 return nil, err
117 }
118 instances := c.instancesForApplication(name)
119 if len(instances) == 0 {
120 return nil, fmt.Errorf("Application not found for name=%s", name)
121 }
122 return &fargo.Application{Name: name, Instances: instances}, nil
107123 }
124
125 func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
126 updatec := make(chan fargo.AppUpdate, 1)
127 send := func() {
128 app, err := c.GetApp(name)
129 select {
130 case updatec <- fargo.AppUpdate{App: app, Err: err}:
131 default:
132 }
133 }
134
135 if await {
136 send()
137 }
138 go func() {
139 ticker := time.NewTicker(100 * time.Millisecond)
140 for {
141 select {
142 case <-ticker.C:
143 send()
144 case <-done:
145 ticker.Stop()
146 return
147 }
148 }
149 }()
150 return updatec
151 }