Codebase list golang-github-go-kit-kit / 4ede06e
Simplify Eureka Instancer's initial cache priming Rather than first fetching a Eureka application's instances synchronously and then scheduling subsequent background updates, instead schedule the updates, wait on the first update to arrive, an then proceed. Revise the tests accordingly to no longer rely on a set of instances known to the (fake) server as distinct from any applications. Since applications are an intensional view of instances, like the real Eureka server we derive the presence of applications from the application names mentioned by instances. That is, without at least one instance declaring that it's a member of a given application, no such application exists. Steven E. Harris 6 years ago
3 changed file(s) with 130 addition(s) and 107 deletion(s). Raw diff Collapse all Expand all
3232 quitc: make(chan chan struct{}),
3333 }
3434
35 instances, err := s.getInstances()
36 if err == nil {
37 s.logger.Log("instances", len(instances))
38 } else {
39 s.logger.Log("during", "getInstances", "err", err)
40 }
41
42 s.cache.Update(sd.Event{Instances: instances, Err: err})
43 go s.loop()
35 done := make(chan struct{})
36 updates := conn.ScheduleAppUpdates(app, true, done)
37 s.consume(<-updates)
38 go s.loop(updates, done)
4439 return s
4540 }
4641
5247 s.quitc = nil
5348 }
5449
55 func (s *Instancer) loop() {
56 var (
57 await = false
58 done = make(chan struct{})
59 updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
60 )
50 func (s *Instancer) consume(update fargo.AppUpdate) {
51 if update.Err != nil {
52 s.logger.Log("during", "Update", "err", update.Err)
53 s.cache.Update(sd.Event{Err: update.Err})
54 return
55 }
56 instances := convertFargoAppToInstances(update.App)
57 s.logger.Log("instances", len(instances))
58 s.cache.Update(sd.Event{Instances: instances})
59 }
60
61 func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
6162 defer close(done)
6263
6364 for {
6465 select {
65 case update := <-updatec:
66 if update.Err != nil {
67 s.logger.Log("during", "Update", "err", update.Err)
68 s.cache.Update(sd.Event{Err: update.Err})
69 continue
70 }
71 instances := convertFargoAppToInstances(update.App)
72 s.logger.Log("instances", len(instances))
73 s.cache.Update(sd.Event{Instances: instances})
74
66 case update := <-updates:
67 s.consume(update)
7568 case q := <-s.quitc:
7669 close(q)
7770 return
1212
1313 func TestInstancer(t *testing.T) {
1414 connection := &testConnection{
15 instances: []*fargo.Instance{instanceTest1},
16 application: appUpdateTest,
15 instances: []*fargo.Instance{instanceTest1, instanceTest2},
1716 errApplication: nil,
1817 }
1918
2019 instancer := NewInstancer(connection, appNameTest, loggerTest)
2120 defer instancer.Stop()
2221
23 state := instancer.cache.State()
22 state := instancer.state()
2423 if state.Err != nil {
2524 t.Fatal(state.Err)
2625 }
2726
28 if want, have := 1, len(state.Instances); want != have {
27 if want, have := 2, len(state.Instances); want != have {
2928 t.Errorf("want %d, have %d", want, have)
3029 }
3130 }
3231
33 func TestInstancerScheduleUpdates(t *testing.T) {
32 func TestInstancerReceivesUpdates(t *testing.T) {
3433 connection := &testConnection{
3534 instances: []*fargo.Instance{instanceTest1},
36 application: appUpdateTest,
3735 errApplication: nil,
3836 }
3937
4038 instancer := NewInstancer(connection, appNameTest, loggerTest)
4139 defer instancer.Stop()
4240
43 state := instancer.cache.State()
44 if want, have := 1, len(state.Instances); want != have {
45 t.Errorf("want %d, have %d", want, have)
41 verifyCount := func(want int) (have int, converged bool) {
42 const maxPollAttempts = 5
43 const delayPerAttempt = 200 * time.Millisecond
44 for i := 1; ; i++ {
45 state := instancer.state()
46 if have := len(state.Instances); want == have {
47 return have, true
48 } else if i == maxPollAttempts {
49 return have, false
50 }
51 time.Sleep(delayPerAttempt)
52 }
4653 }
4754
48 time.Sleep(50 * time.Millisecond)
55 if have, converged := verifyCount(1); !converged {
56 t.Fatalf("initial: want %d, have %d", 1, have)
57 }
4958
50 state = instancer.cache.State()
51 if want, have := 2, len(state.Instances); want != have {
52 t.Errorf("want %v, have %v", want, have)
59 if err := connection.RegisterInstance(instanceTest2); err != nil {
60 t.Fatalf("failed to register an instance: %v", err)
61 }
62 if have, converged := verifyCount(2); !converged {
63 t.Fatalf("after registration: want %d, have %d", 2, have)
64 }
65
66 if err := connection.DeregisterInstance(instanceTest1); err != nil {
67 t.Fatalf("failed to unregister an instance: %v", err)
68 }
69 if have, converged := verifyCount(1); !converged {
70 t.Fatalf("after deregistration: want %d, have %d", 1, have)
5371 }
5472 }
5573
56 func TestBadInstancerInstances(t *testing.T) {
74 func TestBadInstancerScheduleUpdates(t *testing.T) {
5775 connection := &testConnection{
58 instances: []*fargo.Instance{},
59 errInstances: errTest,
60 application: appUpdateTest,
61 errApplication: nil,
76 instances: []*fargo.Instance{instanceTest1},
77 errApplication: errTest,
6278 }
6379
6480 instancer := NewInstancer(connection, appNameTest, loggerTest)
6581 defer instancer.Stop()
6682
67 state := instancer.cache.State()
83 state := instancer.state()
6884 if state.Err == nil {
6985 t.Fatal("expecting error")
7086 }
7389 t.Errorf("want %d, have %d", want, have)
7490 }
7591 }
76
77 func TestBadInstancerScheduleUpdates(t *testing.T) {
78 connection := &testConnection{
79 instances: []*fargo.Instance{instanceTest1},
80 application: appUpdateTest,
81 errApplication: errTest,
82 }
83
84 instancer := NewInstancer(connection, appNameTest, loggerTest)
85 defer instancer.Stop()
86
87 state := instancer.cache.State()
88 if state.Err != nil {
89 t.Error(state.Err)
90 }
91 if want, have := 1, len(state.Instances); want != have {
92 t.Errorf("want %d, have %d", want, have)
93 }
94
95 time.Sleep(50 * time.Millisecond)
96
97 state = instancer.cache.State()
98 if state.Err == nil {
99 t.Fatal("expecting error")
100 }
101
102 if want, have := 0, len(state.Instances); want != have {
103 t.Errorf("want %v, have %v", want, have)
104 }
105 }
11
22 import (
33 "errors"
4 "fmt"
45 "reflect"
6 "sync"
7 "time"
58
69 "github.com/go-kit/kit/log"
710 "github.com/hudl/fargo"
811 )
912
1013 type testConnection struct {
11 instances []*fargo.Instance
12 application *fargo.Application
13 errInstances error
14 mu sync.RWMutex
15 instances []*fargo.Instance
16
1417 errApplication error
1518 errHeartbeat error
1619 errRegister error
2225 errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"}
2326 loggerTest = log.NewNopLogger()
2427 appNameTest = "go-kit"
25 appUpdateTest = &fargo.Application{
26 Name: appNameTest,
27 Instances: []*fargo.Instance{instanceTest1, instanceTest2},
28 }
2928 instanceTest1 = &fargo.Instance{
3029 HostName: "serveregistrar1.acme.org",
3130 Port: 8080,
5857 var _ fargoConnection = (*testConnection)(nil)
5958
6059 func (c *testConnection) RegisterInstance(i *fargo.Instance) error {
61 if c.errRegister == nil {
62 for _, instance := range c.instances {
63 if reflect.DeepEqual(*instance, *i) {
64 return errors.New("already registered")
65 }
60 if c.errRegister != nil {
61 return c.errRegister
62 }
63 c.mu.Lock()
64 defer c.mu.Unlock()
65 for _, instance := range c.instances {
66 if reflect.DeepEqual(*instance, *i) {
67 return errors.New("already registered")
6668 }
67
68 c.instances = append(c.instances, i)
6969 }
70 return c.errRegister
70 c.instances = append(c.instances, i)
71 return nil
7172 }
7273
7374 func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error {
7576 }
7677
7778 func (c *testConnection) DeregisterInstance(i *fargo.Instance) error {
78 if c.errDeregister == nil {
79 var newInstances []*fargo.Instance
80 for _, instance := range c.instances {
81 if reflect.DeepEqual(*instance, *i) {
82 continue
83 }
84 newInstances = append(newInstances, instance)
79 if c.errDeregister != nil {
80 return c.errDeregister
81 }
82 c.mu.Lock()
83 defer c.mu.Unlock()
84 remaining := make([]*fargo.Instance, 0, len(c.instances))
85 for _, instance := range c.instances {
86 if reflect.DeepEqual(*instance, *i) {
87 continue
8588 }
86 if len(newInstances) == len(c.instances) {
87 return errors.New("not registered")
88 }
89
90 c.instances = newInstances
89 remaining = append(remaining, instance)
9190 }
92 return c.errDeregister
91 if len(remaining) == len(c.instances) {
92 return errors.New("not registered")
93 }
94 c.instances = remaining
95 return nil
9396 }
9497
9598 func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error {
9699 return nil
97100 }
98101
99 func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
100 updatec := make(chan fargo.AppUpdate, 1)
101 updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication}
102 return updatec
102 func (c *testConnection) instancesForApplication(name string) []*fargo.Instance {
103 c.mu.RLock()
104 defer c.mu.RUnlock()
105 instances := make([]*fargo.Instance, 0, len(c.instances))
106 for _, i := range c.instances {
107 if i.App == name {
108 instances = append(instances, i)
109 }
110 }
111 return instances
103112 }
104113
105114 func (c *testConnection) GetApp(name string) (*fargo.Application, error) {
106 return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances
115 if err := c.errApplication; err != nil {
116 return nil, err
117 }
118 instances := c.instancesForApplication(name)
119 if len(instances) == 0 {
120 return nil, fmt.Errorf("Application not found for name=%s", name)
121 }
122 return &fargo.Application{Name: name, Instances: instances}, nil
107123 }
124
125 func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
126 updatec := make(chan fargo.AppUpdate, 1)
127 send := func() {
128 app, err := c.GetApp(name)
129 select {
130 case updatec <- fargo.AppUpdate{App: app, Err: err}:
131 default:
132 }
133 }
134
135 if await {
136 send()
137 }
138 go func() {
139 ticker := time.NewTicker(100 * time.Millisecond)
140 for {
141 select {
142 case <-ticker.C:
143 send()
144 case <-done:
145 ticker.Stop()
146 return
147 }
148 }
149 }()
150 return updatec
151 }