Codebase list golang-github-hashicorp-serf / f6be1b4
Adds adjustment factor back in and fixes random phantom with non-zero mean. James Phillips 8 years ago
6 changed file(s) with 96 addition(s) and 38 deletion(s). Raw diff Collapse all Expand all
1818 // the algorithm.
1919 config *Config
2020
21 // adjustmentIndex is the current index into the adjustmentSamples slice.
22 adjustmentIndex uint
23
24 // adjustmentSamples is used to store samples for the adjustment calculation.
25 adjustmentSamples []float64
26
2127 // mutex enables safe concurrent access to the client.
2228 mutex *sync.RWMutex
2329 }
3642 return &Client{
3743 coord: NewCoordinate(config),
3844 config: config,
45 adjustmentIndex: 0,
46 adjustmentSamples: make([]float64, config.AdjustmentWindowSize),
3947 mutex: &sync.RWMutex{},
4048 }, nil
4149 }
4856 return c.coord.Clone()
4957 }
5058
51 // Update takes other, a coordinate for another node, and rtt, a round trip
52 // time observation for a ping to that node, and updates the estimated position of
53 // the client's coordinate.
54 func (c *Client) Update(other *Coordinate, rtt time.Duration) {
55 c.mutex.Lock()
56 defer c.mutex.Unlock()
57
59 // updateVivaldi updates the Vivaldi portion of the client's coordinate. This
60 // assumes that the mutex has been locked already.
61 func (c *Client) updateVivaldi(other *Coordinate, rttSeconds float64) {
5862 const zeroThreshold = 1.0e-6
5963
6064 dist := c.coord.DistanceTo(other)
61 rttSeconds := rtt.Seconds()
6265 if rttSeconds < zeroThreshold {
6366 rttSeconds = zeroThreshold
6467 }
7073 }
7174 weight := c.coord.Error / totalError
7275
73 c.coord.Error = c.config.VivaldiCE*weight*wrongness + c.coord.Error*(1-c.config.VivaldiCE*weight)
76 c.coord.Error = c.config.VivaldiCE*weight*wrongness + c.coord.Error*(1.0-c.config.VivaldiCE*weight)
7477 if c.coord.Error > c.config.VivaldiErrorMax {
7578 c.coord.Error = c.config.VivaldiErrorMax
7679 }
8083 c.coord = c.coord.ApplyForce(force, other)
8184 }
8285
86 // updateAdjustment updates the adjustment portion of the client's coordinate, if
87 // the feature is enabled. This assumes that the mutex has been locked already.
88 func (c *Client) updateAdjustment(other *Coordinate, rttSeconds float64) {
89 if c.config.AdjustmentWindowSize == 0 {
90 return
91 }
92
93 dist := c.coord.DistanceTo(other)
94 c.adjustmentSamples[c.adjustmentIndex] = rttSeconds - dist
95 c.adjustmentIndex = (c.adjustmentIndex + 1) % c.config.AdjustmentWindowSize
96
97 sum := 0.0
98 for _, sample := range c.adjustmentSamples {
99 sum += sample
100 }
101 c.coord.Adjustment = sum / (2.0*float64(c.config.AdjustmentWindowSize))
102 }
103
104 // Update takes other, a coordinate for another node, and rtt, a round trip
105 // time observation for a ping to that node, and updates the estimated position of
106 // the client's coordinate.
107 func (c *Client) Update(other *Coordinate, rtt time.Duration) {
108 c.mutex.Lock()
109 defer c.mutex.Unlock()
110
111 rttSeconds := rtt.Seconds()
112 c.updateVivaldi(other, rttSeconds)
113 c.updateAdjustment(other, rttSeconds)
114 }
115
83116 // DistanceTo returns the estimated RTT from the client's coordinate to other, the
84117 // coordinate for another node.
85118 func (c *Client) DistanceTo(other *Coordinate) time.Duration {
86119 c.mutex.RLock()
87120 defer c.mutex.RUnlock()
88121
89 dist := c.coord.DistanceTo(other) * secondsToNanoseconds
90 return time.Duration(dist)
122 // It's important that the adjustment values are summed here, and not down
123 // in the coordinate's DistanceTo() function, because the calculation of
124 // the adjustment is based only on the current Vivaldi distance, and not
125 // the current adjustment factors.
126 dist := c.coord.DistanceTo(other)
127 adjustedDist := dist + c.coord.Adjustment + other.Adjustment
128 if adjustedDist > 0.0 {
129 dist = adjustedDist
130 }
131 return time.Duration(dist*secondsToNanoseconds)
91132 }
4444 // client expects, given its distance.
4545 other := NewCoordinate(config)
4646 other.Vec[2] = 0.001
47 rtt := time.Duration(2.0 * other.Vec[2] * secondsToNanoseconds)
47 rtt := time.Duration(2.0*other.Vec[2]*secondsToNanoseconds)
4848 client.Update(other, rtt)
4949
5050 // The client should have scooted down to get away from it.
6666 // Fiddle a raw coordinate to put it a specific number of seconds away.
6767 other := NewCoordinate(config)
6868 other.Vec[2] = 12.345
69 expected := time.Duration(other.Vec[2] * secondsToNanoseconds)
69 expected := time.Duration(other.Vec[2]*secondsToNanoseconds)
7070 dist := client.DistanceTo(other)
7171 if dist != expected {
7272 t.Fatalf("distance doesn't match %9.6f != %9.6f", dist.Seconds(), expected.Seconds())
7373 }
74
75 // Make sure negative adjustment factors are ignored.
76 client.coord.Adjustment = -(other.Vec[2] + 0.1)
77 dist = client.DistanceTo(other)
78 if dist != expected {
79 t.Fatalf("distance doesn't match %9.6f != %9.6f", dist.Seconds(), expected.Seconds())
80 }
81
82 // Make sure positive adjustment factors affect the distance.
83 client.coord.Adjustment = 0.1
84 expected = time.Duration((other.Vec[2] + 0.1)*secondsToNanoseconds)
85 dist = client.DistanceTo(other)
86 if dist != expected {
87 t.Fatalf("distance doesn't match %9.6f != %9.6f", dist.Seconds(), expected.Seconds())
88 }
7489 }
3030 // VivaldiCC is a tuning factor that controls the maximum impact an
3131 // observation can have on a node's coordinate. See [1] for more details.
3232 VivaldiCC float64
33
34 // AdjustmentWindowSize is a tuning factor that determines how many samples
35 // we retain to calculate the adjustment factor as discussed in [3]. Setting
36 // this to zero disables this feature.
37 AdjustmentWindowSize uint
3338 }
3439
3540 // DefaultConfig returns a Config that has some default values suitable for
3641 // basic testing of the algorithm, but not tuned to any particular type of cluster.
3742 func DefaultConfig() *Config {
3843 return &Config{
39 Dimensionality: 8,
40 VivaldiErrorMax: 1.5,
41 VivaldiCE: 0.25,
42 VivaldiCC: 0.25,
44 Dimensionality: 8,
45 VivaldiErrorMax: 1.5,
46 VivaldiCE: 0.25,
47 VivaldiCC: 0.25,
48 AdjustmentWindowSize: 20,
4349 }
4450 }
6969 panic(ErrDimensionalityConflict)
7070 }
7171
72 euclidianPart := magnitude(diff(c.Vec, other.Vec))
73 adjustmentPart := c.Adjustment + other.Adjustment
74 return euclidianPart + adjustmentPart
72 return magnitude(diff(c.Vec, other.Vec))
7573 }
7674
7775 // add returns the sum of vec1 and vec2. This assumes the dimensions have
1515 truth := GenerateLine(nodes, spacing)
1616 Simulate(clients, truth, cycles, nil)
1717 stats := Evaluate(clients, truth)
18 if stats.ErrorAvg > 0.005 {
19 t.Fatalf("average error is too large, %9.6f", stats.ErrorAvg)
18 if stats.ErrorAvg > 0.004 || stats.ErrorMax > 0.015 {
19 t.Fatalf("performance stats are out of spec: %v", stats)
2020 }
2121 }
2222
3131 truth := GenerateGrid(nodes, spacing)
3232 Simulate(clients, truth, cycles, nil)
3333 stats := Evaluate(clients, truth)
34 if stats.ErrorAvg > 0.006 {
35 t.Fatalf("average error is too large, %9.6f", stats.ErrorAvg)
34 if stats.ErrorAvg > 0.005 || stats.ErrorMax > 0.051 {
35 t.Fatalf("performance stats are out of spec: %v", stats)
3636 }
3737 }
3838
4747 truth := GenerateSplit(nodes, lan, wan)
4848 Simulate(clients, truth, cycles, nil)
4949 stats := Evaluate(clients, truth)
50 if stats.ErrorAvg > 0.045 {
51 t.Fatalf("average error is too large, %9.6f", stats.ErrorAvg)
50 if stats.ErrorAvg > 0.044 || stats.ErrorMax > 0.343 {
51 t.Fatalf("performance stats are out of spec: %v", stats)
5252 }
5353 }
5454
5555 func TestPerformance_Random(t *testing.T) {
56 const max = 10*time.Millisecond
56 const mean, deviation = 100*time.Millisecond, 10*time.Millisecond
5757 const nodes, cycles = 25, 1000
5858 config := DefaultConfig()
5959 clients, err := GenerateClients(nodes, config)
6060 if err != nil {
6161 t.Fatal(err)
6262 }
63 truth := GenerateRandom(nodes, max)
63 truth := GenerateRandom(nodes, mean, deviation)
6464 Simulate(clients, truth, cycles, nil)
6565 stats := Evaluate(clients, truth)
66
67 // TODO - Currently horrible! Height and the adjustment factor should
68 // help here, so revisit once those are in.
69 if stats.ErrorAvg > 4.8 {
70 t.Fatalf("average error is too large, %9.6f", stats.ErrorAvg)
66 if stats.ErrorAvg > 0.079 || stats.ErrorMax > 0.363 {
67 t.Fatalf("performance stats are out of spec: %v", stats)
7168 }
7269 }
8383 return truth
8484 }
8585
86 // GenerateRandom returns a truth matrix for a set of nodes with random delays, up
87 // to the given max. The RNG is re-seeded so you always get the same matrix for a
88 // given size.
89 func GenerateRandom(nodes int, max time.Duration) [][]time.Duration {
86 // GenerateRandom returns a truth matrix for a set of nodes with normally
87 // distributed delays, with the given mean and deviation. The RNG is re-seeded
88 // so you always get the same matrix for a given size.
89 func GenerateRandom(nodes int, mean time.Duration, deviation time.Duration) [][]time.Duration {
9090 rand.Seed(1)
9191
9292 truth := make([][]time.Duration, nodes)
9696
9797 for i := 0; i < nodes; i++ {
9898 for j := i + 1; j < nodes; j++ {
99 rtt := time.Duration(rand.Float64() * float64(max))
99 rttSeconds := rand.NormFloat64() * deviation.Seconds() + mean.Seconds()
100 rtt := time.Duration(rttSeconds * secondsToNanoseconds)
100101 truth[i][j], truth[j][i] = rtt, rtt
101102 }
102103 }