Codebase list golang-github-go-kit-kit / e75cc1a
add test that reproduces data race in sd.endpointCache.updateCache() (#865) Jason Toffaletti authored 4 years ago Peter Bourgon committed 4 years ago
2 changed file(s) with 56 addition(s) and 4 deletion(s). Raw diff Collapse all Expand all
4040 // State returns the current state of discovery (instances or error) as sd.Event
4141 func (c *Cache) State() sd.Event {
4242 c.mtx.RLock()
43 defer c.mtx.RUnlock()
44 return c.state
43 event := c.state
44 c.mtx.RUnlock()
45 eventCopy := copyEvent(event)
46 return eventCopy
4547 }
4648
4749 // Stop implements Instancer. Since the cache is just a plain-old store of data,
5355 c.mtx.Lock()
5456 defer c.mtx.Unlock()
5557 c.reg.register(ch)
58 event := c.state
59 eventCopy := copyEvent(event)
5660 // always push the current state to new channels
57 ch <- c.state
61 ch <- eventCopy
5862 }
5963
6064 // Deregister implements Instancer.
6973
7074 func (r registry) broadcast(event sd.Event) {
7175 for c := range r {
72 c <- event
76 eventCopy := copyEvent(event)
77 c <- eventCopy
7378 }
7479 }
7580
8085 func (r registry) deregister(c chan<- sd.Event) {
8186 delete(r, c)
8287 }
88
89 // copyEvent does a deep copy on sd.Event
90 func copyEvent(e sd.Event) sd.Event {
91 // observers all need their own copy of event
92 // because they can directly modify event.Instances
93 // for example, by calling sort.Strings
94 if e.Instances == nil {
95 return e
96 }
97 instances := make([]string, len(e.Instances))
98 copy(instances, e.Instances)
99 e.Instances = instances
100 return e
101 }
00 package instance
11
22 import (
3 "context"
4 "fmt"
5 "io"
36 "reflect"
47 "testing"
58 "time"
69
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/log"
712 "github.com/go-kit/kit/sd"
813 )
914
7479 // if deregister didn't work, broadcast would panic on closed channels
7580 reg.broadcast(sd.Event{Instances: []string{"x", "y"}})
7681 }
82
83 // This test is meant to be run with the race detector enabled: -race.
84 // It ensures that every registered observer receives a copy
85 // of sd.Event.Instances because observers can directly modify the field.
86 // For example, endpointCache calls sort.Strings() on sd.Event.Instances.
87 func TestDataRace(t *testing.T) {
88 instances := make([]string, 0)
89 // the number of iterations here maters because we need sort.Strings to
90 // perform a Swap in doPivot -> medianOfThree to cause a data race.
91 for i := 1; i < 1000; i++ {
92 instances = append(instances, fmt.Sprintf("%v", i))
93 }
94 e1 := sd.Event{Instances: instances}
95 cache := NewCache()
96 cache.Update(e1)
97 nullEndpoint := func(_ context.Context, _ interface{}) (interface{}, error) {
98 return nil, nil
99 }
100 nullFactory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
101 return nullEndpoint, nil, nil
102 }
103 logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {
104 return nil
105 }))
106
107 sd.NewEndpointer(cache, nullFactory, logger)
108 sd.NewEndpointer(cache, nullFactory, logger)
109 }