New Upstream Snapshot - golang-github-weaveworks-mesh
Ready changes
Summary
Merged new upstream version: 0.4 (was: 0.1+git20180323.0c91e69).
Resulting package
Built on 2022-10-16T20:05 (took 3m40s)
The resulting binary package can be installed (if you have the apt repository enabled) by running:
apt install -t fresh-snapshots golang-github-weaveworks-mesh-dev
Lintian Result
Diff
diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..40ec684
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,17 @@
+version: 2.1
+
+jobs:
+ build:
+ docker:
+ - image: golang:1.13.1-stretch
+ working_directory: /go/src/github.com/weaveworks/mesh
+ steps:
+ - checkout
+ - run:
+ name: Lint
+ command: |
+ ./lint
+ - run:
+ name: Test
+ command: |
+ go test -v
diff --git a/README.md b/README.md
index 856768f..4b23a3f 100644
--- a/README.md
+++ b/README.md
@@ -77,3 +77,12 @@ All contributions should be made as pull requests that satisfy the guidelines, b
In addition, several mechanical checks are enforced.
See [the lint script](/lint) for details.
+## <a name="help"></a>Getting Help
+
+If you have any questions about, feedback for or problems with `mesh`:
+
+- Invite yourself to the <a href="https://slack.weave.works/" target="_blank">Weave Users Slack</a>.
+- Ask a question on the [#general](https://weave-community.slack.com/messages/general/) slack channel.
+- [File an issue](https://github.com/weaveworks/mesh/issues/new).
+
+Your feedback is always welcome!
diff --git a/circle.yml b/circle.yml
deleted file mode 100644
index 6f4df80..0000000
--- a/circle.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-test:
- pre:
- - ./lint
-
diff --git a/debian/changelog b/debian/changelog
index 5a389b2..2060867 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+golang-github-weaveworks-mesh (0.4-1) UNRELEASED; urgency=low
+
+ * New upstream release.
+
+ -- Debian Janitor <janitor@jelmer.uk> Sun, 16 Oct 2022 20:02:17 -0000
+
golang-github-weaveworks-mesh (0.1+git20180323.0c91e69-1) unstable; urgency=medium
[ Alexandre Viau ]
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..8ebf605
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module github.com/weaveworks/mesh
+
+go 1.12
+
+require (
+ github.com/stretchr/testify v1.4.0
+ golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..8ab1f0e
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,17 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc h1:c0o/qxkaO2LF5t6fQrT4b5hzyggAkLLlCUjqfRxd8Q4=
+golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/gossip.go b/gossip.go
index fe90d13..44bd6ed 100644
--- a/gossip.go
+++ b/gossip.go
@@ -22,6 +22,9 @@ type Gossip interface {
//
// TODO(pb): rename to Broadcast?
GossipBroadcast(update GossipData)
+
+ // GossipNeighbourSubset emits a message to subset of neighbour peers in the mesh.
+ GossipNeighbourSubset(update GossipData)
}
// Gossiper is the receiving interface.
diff --git a/gossip_channel.go b/gossip_channel.go
index 2ff512e..d95688b 100644
--- a/gossip_channel.go
+++ b/gossip_channel.go
@@ -83,6 +83,12 @@ func (c *gossipChannel) GossipBroadcast(update GossipData) {
c.relayBroadcast(c.ourself.Name, update)
}
+// GossipNeighbourSubset implements Gossip, relaying update to subset of members of the
+// channel.
+func (c *gossipChannel) GossipNeighbourSubset(update GossipData) {
+ c.relay(c.ourself.Name, update)
+}
+
// Send relays data into the channel topology via random neighbours.
func (c *gossipChannel) Send(data GossipData) {
c.relay(c.ourself.Name, data)
diff --git a/gossip_test.go b/gossip_test.go
index dd9ae70..d7ddd27 100644
--- a/gossip_test.go
+++ b/gossip_test.go
@@ -4,6 +4,7 @@ import (
"fmt"
"io/ioutil"
"log"
+ "math"
"sync"
"testing"
@@ -68,14 +69,25 @@ func sendPendingGossip(routers ...*Router) {
}
}
-func addTestGossipConnection(r1, r2 *Router) {
- c1 := r1.newTestGossipConnection(r2)
- c2 := r2.newTestGossipConnection(r1)
+func sendPendingTopologyUpdates(routers ...*Router) {
+ for _, router := range routers {
+ router.Ourself.Lock()
+ pendingUpdate := router.Ourself.pendingTopologyUpdate
+ router.Ourself.Unlock()
+ if pendingUpdate {
+ router.Ourself.broadcastPendingTopologyUpdates()
+ }
+ }
+}
+
+func addTestGossipConnection(t require.TestingT, r1, r2 *Router) {
+ c1 := r1.newTestGossipConnection(t, r2)
+ c2 := r2.newTestGossipConnection(t, r1)
c1.Start()
c2.Start()
}
-func (router *Router) newTestGossipConnection(r *Router) *mockGossipConnection {
+func (router *Router) newTestGossipConnection(t require.TestingT, r *Router) *mockGossipConnection {
to := r.Ourself.Peer
toPeer := newPeer(to.Name, to.NickName, to.UID, 0, to.ShortID)
toPeer = router.Peers.fetchWithDefault(toPeer) // Has side-effect of incrementing refcount
@@ -86,7 +98,7 @@ func (router *Router) newTestGossipConnection(r *Router) *mockGossipConnection {
start: make(chan struct{}),
}
conn.senders = newGossipSenders(conn, make(chan struct{}))
- router.Ourself.handleAddConnection(conn, false)
+ require.NoError(t, router.Ourself.handleAddConnection(conn, false))
router.Ourself.handleConnectionEstablished(conn)
return conn
}
@@ -121,6 +133,7 @@ func checkTopology(t *testing.T, router *Router, wantedPeers ...*Peer) {
}
func flushAndCheckTopology(t *testing.T, routers []*Router, wantedPeers ...*Peer) {
+ sendPendingTopologyUpdates(routers...)
sendPendingGossip(routers...)
for _, r := range routers {
checkTopology(t, r, wantedPeers...)
@@ -138,15 +151,15 @@ func TestGossipTopology(t *testing.T) {
checkTopology(t, r2, r2.tp())
// Now try adding some connections
- addTestGossipConnection(r1, r2)
+ addTestGossipConnection(t, r1, r2)
sendPendingGossip(r1, r2)
checkTopology(t, r1, r1.tp(r2), r2.tp(r1))
checkTopology(t, r2, r1.tp(r2), r2.tp(r1))
- addTestGossipConnection(r2, r3)
+ addTestGossipConnection(t, r2, r3)
flushAndCheckTopology(t, routers, r1.tp(r2), r2.tp(r1, r3), r3.tp(r2))
- addTestGossipConnection(r3, r1)
+ addTestGossipConnection(t, r3, r1)
flushAndCheckTopology(t, routers, r1.tp(r2, r3), r2.tp(r1, r3), r3.tp(r1, r2))
// Drop the connection from 2 to 3
@@ -155,7 +168,9 @@ func TestGossipTopology(t *testing.T) {
// Drop the connection from 1 to 3
r1.DeleteTestGossipConnection(r3)
+ sendPendingTopologyUpdates(routers...)
sendPendingGossip(r1, r2, r3)
+ forcePendingGC(r1, r2, r3)
checkTopology(t, r1, r1.tp(r2), r2.tp(r1))
checkTopology(t, r2, r1.tp(r2), r2.tp(r1))
// r3 still thinks r1 has a connection to it
@@ -168,8 +183,8 @@ func TestGossipSurrogate(t *testing.T) {
r2 := newTestRouter(t, "02:00:00:02:00:00")
r3 := newTestRouter(t, "03:00:00:03:00:00")
routers := []*Router{r1, r2, r3}
- addTestGossipConnection(r1, r2)
- addTestGossipConnection(r3, r2)
+ addTestGossipConnection(t, r1, r2)
+ addTestGossipConnection(t, r3, r2)
flushAndCheckTopology(t, routers, r1.tp(r2), r2.tp(r1, r3), r3.tp(r2))
// create a gossiper at either end, but not the middle
@@ -257,3 +272,40 @@ func (g *testGossiper) checkHas(t *testing.T, vs ...byte) {
func broadcast(s Gossip, v byte) {
s.GossipBroadcast(newSurrogateGossipData([]byte{v}))
}
+
+func TestRandomNeighbours(t *testing.T) {
+ const nTrials = 5000
+ ourself := PeerName(0) // aliased with UnknownPeerName, which is ok here
+ // Check fairness of selection across different-sized sets
+ for _, test := range []struct{ nPeers, nNeighbours int }{{1, 0}, {2, 1}, {3, 2}, {10, 2}, {10, 3}, {10, 9}, {100, 2}, {100, 99}} {
+ t.Run(fmt.Sprint(test.nPeers, "_peers_", test.nNeighbours, "_neighbours"), func(t *testing.T) {
+ // Create a test fixture with unicastAll set up
+ r := routes{
+ unicastAll: make(unicastRoutes, test.nPeers),
+ }
+ // The route to 'ourself' is always via 'unknown'
+ r.unicastAll[ourself] = UnknownPeerName
+ // Fully-connected: unicast route to X is via X
+ for i := 1; i < test.nPeers; i++ {
+ r.unicastAll[PeerName(i)] = PeerName(i%test.nNeighbours + 1)
+ }
+ total := 0
+ counts := make([]int, test.nNeighbours+1)
+ // Run randomNeighbours() several times, and count the distribution
+ for trial := 0; trial < nTrials; trial++ {
+ targets := r.randomNeighbours(ourself)
+ expected := int(math.Min(2*math.Log2(float64(test.nPeers)), float64(test.nNeighbours)))
+ require.Equal(t, expected, len(targets))
+ total += len(targets)
+ for _, p := range targets {
+ counts[p]++
+ }
+ }
+ require.Equal(t, 0, counts[ourself], "randomNeighbours should not select source peer")
+ // Check that each neighbour was picked within 20% of an average count
+ for i := 1; i < test.nNeighbours+1; i++ {
+ require.InEpsilon(t, float64(total)/float64(test.nNeighbours), counts[i], 0.2, "peer %d picked %d times out of %d; counts %v", i, counts[i], total, counts)
+ }
+ })
+ }
+}
diff --git a/lint b/lint
index ec403c5..5bbb9e8 100755
--- a/lint
+++ b/lint
@@ -4,19 +4,10 @@ set -o errexit
set -o nounset
set -o pipefail
-if [ ! $(command -v gometalinter) ]
+if [ ! $(command -v golangci-lint) ]
then
- go get github.com/alecthomas/gometalinter
- gometalinter --install --vendor
+ curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $(go env GOPATH)/bin v1.20.0
+ golangci-lint --version
fi
-gometalinter \
- --exclude='error return value not checked.*(Close|Log|Print).*\(errcheck\)$' \
- --exclude='.*_test\.go:.*error return value not checked.*\(errcheck\)$' \
- --exclude='duplicate of.*_test.go.*\(dupl\)$' \
- --disable=aligncheck \
- --disable=gotype \
- --disable=gas \
- --cyclo-over=20 \
- --tests \
- --deadline=20s
+golangci-lint run
diff --git a/local_peer.go b/local_peer.go
index 7e633c9..7b36da1 100644
--- a/local_peer.go
+++ b/local_peer.go
@@ -8,13 +8,20 @@ import (
"time"
)
+const (
+ deferTopologyUpdateDuration = 1 * time.Second
+)
+
// localPeer is the only "active" peer in the mesh. It extends Peer with
// additional behaviors, mostly to retrieve and manage connection state.
type localPeer struct {
sync.RWMutex
*Peer
- router *Router
- actionChan chan<- localPeerAction
+ router *Router
+ actionChan chan<- localPeerAction
+ topologyUpdates peerNameSet
+ timer *time.Timer
+ pendingTopologyUpdate bool
}
// The actor closure used by localPeer.
@@ -23,11 +30,15 @@ type localPeerAction func()
// newLocalPeer returns a usable LocalPeer.
func newLocalPeer(name PeerName, nickName string, router *Router) *localPeer {
actionChan := make(chan localPeerAction, ChannelSize)
+ topologyUpdates := make(peerNameSet)
peer := &localPeer{
- Peer: newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
- router: router,
- actionChan: actionChan,
+ Peer: newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
+ router: router,
+ actionChan: actionChan,
+ topologyUpdates: topologyUpdates,
+ timer: time.NewTimer(deferTopologyUpdateDuration),
}
+ peer.timer.Stop()
go peer.actorLoop(actionChan)
return peer
}
@@ -136,6 +147,10 @@ func (peer *localPeer) encode(enc *gob.Encoder) {
// ACTOR server
func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
+ gossipInterval := defaultGossipInterval
+ if peer.router != nil {
+ gossipInterval = peer.router.gossipInterval()
+ }
gossipTimer := time.Tick(gossipInterval)
for {
select {
@@ -143,10 +158,22 @@ func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
action()
case <-gossipTimer:
peer.router.sendAllGossip()
+ case <-peer.timer.C:
+ peer.broadcastPendingTopologyUpdates()
}
}
}
+func (peer *localPeer) broadcastPendingTopologyUpdates() {
+ peer.Lock()
+ gossipData := peer.topologyUpdates
+ peer.topologyUpdates = make(peerNameSet)
+ peer.pendingTopologyUpdate = false
+ peer.Unlock()
+ gossipData[peer.Peer.Name] = struct{}{}
+ peer.router.broadcastTopologyUpdate(gossipData)
+}
+
func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer bool) error {
if peer.Peer != conn.getLocal() {
panic("Attempt made to add connection to peer where peer is not the source of connection")
@@ -190,7 +217,6 @@ func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer b
conn.logf("connection added (new peer)")
peer.router.sendAllGossipDown(conn)
}
-
peer.router.Routes.recalculate()
peer.broadcastPeerUpdate(conn.Remote())
@@ -240,7 +266,15 @@ func (peer *localPeer) broadcastPeerUpdate(peers ...*Peer) {
// context of a test, but that will involve significant
// reworking of tests.
if peer.router != nil {
- peer.router.broadcastTopologyUpdate(append(peers, peer.Peer))
+ peer.Lock()
+ defer peer.Unlock()
+ if !peer.pendingTopologyUpdate {
+ peer.timer.Reset(deferTopologyUpdateDuration)
+ peer.pendingTopologyUpdate = true
+ }
+ for _, p := range peers {
+ peer.topologyUpdates[p.Name] = struct{}{}
+ }
}
}
diff --git a/peer_test.go b/peer_test.go
index 3950731..deb9ea0 100644
--- a/peer_test.go
+++ b/peer_test.go
@@ -13,3 +13,16 @@ func TestPeerRoutes(t *testing.T) {
func TestPeerForEachConnectedPeer(t *testing.T) {
t.Skip("TODO")
}
+
+func forcePendingGC(routers ...*Router) {
+ for _, router := range routers {
+ router.Peers.Lock()
+ if router.Peers.pendingGC {
+ var pending peersPendingNotifications
+ router.Peers.garbageCollect(&pending)
+ router.Peers.unlockAndNotify(&pending)
+ } else {
+ router.Peers.Unlock()
+ }
+ }
+}
diff --git a/peers.go b/peers.go
index 1190450..c10b031 100644
--- a/peers.go
+++ b/peers.go
@@ -6,6 +6,11 @@ import (
"io"
"math/rand"
"sync"
+ "time"
+)
+
+const (
+ gcInterval = 1 * time.Second
)
// Peers collects all of the known peers in the mesh, including ourself.
@@ -18,6 +23,8 @@ type Peers struct {
// Called when the mapping from short IDs to peers changes
onInvalidateShortIDs []func()
+ timer *time.Timer
+ pendingGC bool
}
type shortIDPeers struct {
@@ -60,8 +67,11 @@ func newPeers(ourself *localPeer) *Peers {
ourself: ourself,
byName: make(map[PeerName]*Peer),
byShortID: make(map[PeerShortID]shortIDPeers),
+ timer: time.NewTimer(gcInterval),
}
peers.fetchWithDefault(ourself.Peer)
+ peers.timer.Stop()
+ go peers.actorLoop()
return peers
}
@@ -339,6 +349,15 @@ func (peers *Peers) forEach(fun func(*Peer)) {
}
}
+func (peers *Peers) actorLoop() {
+ for range peers.timer.C {
+ peers.GarbageCollect()
+ peers.Lock()
+ peers.pendingGC = false
+ peers.Unlock()
+ }
+}
+
// Merge an incoming update with our own topology.
//
// We add peers hitherto unknown to us, and update peers for which the
@@ -363,7 +382,6 @@ func (peers *Peers) applyUpdate(update []byte) (peerNameSet, peerNameSet, error)
// Now apply the updates
newUpdate := peers.applyDecodedUpdate(decodedUpdate, decodedConns, &pending)
- peers.garbageCollect(&pending)
for _, peerRemoved := range pending.removed {
delete(newUpdate, peerRemoved.Name)
}
@@ -373,6 +391,13 @@ func (peers *Peers) applyUpdate(update []byte) (peerNameSet, peerNameSet, error)
updateNames[peer.Name] = struct{}{}
}
+ if !peers.pendingGC {
+ // schedule a GarbageCollect() to run after gcInterval time period
+ // corresponding to all topology updates received during the period
+ peers.timer.Reset(gcInterval)
+ peers.pendingGC = true
+ }
+
return updateNames, newUpdate, nil
}
diff --git a/peers_test.go b/peers_test.go
index c9bec85..61de2bd 100644
--- a/peers_test.go
+++ b/peers_test.go
@@ -31,7 +31,8 @@ func checkApplyUpdate(t *testing.T, peers *Peers) {
// into it.
_, testBedPeers := newNode(dummyName)
testBedPeers.AddTestConnection(peers.ourself.Peer)
- testBedPeers.applyUpdate(peers.encodePeers(peers.names()))
+ _, _, err := testBedPeers.applyUpdate(peers.encodePeers(peers.names()))
+ require.NoError(t, err)
checkTopologyPeers(t, true, testBedPeers.allPeersExcept(dummyName), peers.allPeers()...)
}
@@ -296,13 +297,15 @@ func TestShortIDPropagation(t *testing.T) {
_, peers2 := newNode(PeerName(2))
peers1.AddTestConnection(peers2.ourself.Peer)
- peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+ _, _, err := peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+ require.NoError(t, err)
peers12 := peers1.Fetch(PeerName(2))
old := peers12.peerSummary
require.True(t,
peers2.reassignLocalShortID(&peersPendingNotifications{}))
- peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+ _, _, err = peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+ require.NoError(t, err)
require.NotEqual(t, old.Version, peers12.Version)
require.NotEqual(t, old.ShortID, peers12.ShortID)
}
@@ -322,15 +325,18 @@ func TestShortIDCollision(t *testing.T) {
peers3.AddTestConnection(peers2.ourself.Peer)
// Propogate from 1 to 2 to 3
- peers2.applyUpdate(peers1.encodePeers(peers1.names()))
- peers3.applyUpdate(peers2.encodePeers(peers2.names()))
+ _, _, err := peers2.applyUpdate(peers1.encodePeers(peers1.names()))
+ require.NoError(t, err)
+ _, _, err = peers3.applyUpdate(peers2.encodePeers(peers2.names()))
+ require.NoError(t, err)
// Force the short id of peer 1 to collide with peer 2. Peer
// 1 has the lowest name, so it gets to keep the short id
peers1.setLocalShortID(2, &pending)
oldShortID := peers2.ourself.ShortID
- _, updated, _ := peers2.applyUpdate(peers1.encodePeers(peers1.names()))
+ _, updated, err := peers2.applyUpdate(peers1.encodePeers(peers1.names()))
+ require.NoError(t, err)
// peer 2 should have noticed the collision and resolved it
require.NotEqual(t, oldShortID, peers2.ourself.ShortID)
@@ -341,7 +347,8 @@ func TestShortIDCollision(t *testing.T) {
updated[PeerName(2)] = struct{}{}
// the update from peer 2 should include its short id change
- peers3.applyUpdate(peers2.encodePeers(updated))
+ _, _, err = peers3.applyUpdate(peers2.encodePeers(updated))
+ require.NoError(t, err)
require.Equal(t, peers2.ourself.ShortID,
peers3.Fetch(PeerName(2)).ShortID)
}
diff --git a/router.go b/router.go
index 27be495..9af5418 100644
--- a/router.go
+++ b/router.go
@@ -17,14 +17,15 @@ var (
// ChannelSize is the buffer size used by so-called actor goroutines
// throughout mesh.
ChannelSize = 16
+
+ defaultGossipInterval = 30 * time.Second
)
const (
tcpHeartbeat = 30 * time.Second
- gossipInterval = 30 * time.Second
maxDuration = time.Duration(math.MaxInt64)
- acceptMaxTokens = 100
- acceptTokenDelay = 100 * time.Millisecond // [2]
+ acceptMaxTokens = 20
+ acceptTokenDelay = 50 * time.Millisecond
)
// Config defines dimensions of configuration for the router.
@@ -37,6 +38,7 @@ type Config struct {
ProtocolMinVersion byte
PeerDiscovery bool
TrustedSubnets []*net.IPNet
+ GossipInterval *time.Duration
}
// Router manages communication between this peer and the rest of the mesh.
@@ -154,7 +156,7 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel {
if channel, found = router.gossipChannels[channelName]; found {
return channel
}
- channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{}, router.logger)
+ channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{router: router}, router.logger)
channel.logf("created surrogate channel")
router.gossipChannels[channelName] = channel
return channel
@@ -170,6 +172,14 @@ func (router *Router) gossipChannelSet() map[*gossipChannel]struct{} {
return channels
}
+func (router *Router) gossipInterval() time.Duration {
+ if router.Config.GossipInterval != nil {
+ return *router.Config.GossipInterval
+ } else {
+ return defaultGossipInterval
+ }
+}
+
func (router *Router) handleGossip(tag protocolTag, payload []byte) error {
decoder := gob.NewDecoder(bytes.NewReader(payload))
var channelName string
@@ -221,12 +231,9 @@ func (router *Router) sendPendingGossip() bool {
// BroadcastTopologyUpdate is invoked whenever there is a change to the mesh
// topology, and broadcasts the new set of peers to the mesh.
-func (router *Router) broadcastTopologyUpdate(update []*Peer) {
- names := make(peerNameSet)
- for _, p := range update {
- names[p.Name] = struct{}{}
- }
- router.topologyGossip.GossipBroadcast(&topologyGossipData{peers: router.Peers, update: names})
+func (router *Router) broadcastTopologyUpdate(update peerNameSet) {
+ gossipData := &topologyGossipData{peers: router.Peers, update: update}
+ router.topologyGossip.GossipNeighbourSubset(gossipData)
}
// OnGossipUnicast implements Gossiper, but always returns an error, as a
diff --git a/routes.go b/routes.go
index 10ff315..0e72d0f 100644
--- a/routes.go
+++ b/routes.go
@@ -2,7 +2,9 @@ package mesh
import (
"math"
+ "math/rand"
"sync"
+ "time"
)
type unicastRoutes map[PeerName]PeerName
@@ -11,23 +13,29 @@ type broadcastRoutes map[PeerName][]PeerName
// routes aggregates unicast and broadcast routes for our peer.
type routes struct {
sync.RWMutex
- ourself *localPeer
- peers *Peers
- onChange []func()
- unicast unicastRoutes
- unicastAll unicastRoutes // [1]
- broadcast broadcastRoutes
- broadcastAll broadcastRoutes // [1]
- recalc chan<- *struct{}
- wait chan<- chan struct{}
- action chan<- func()
+ ourself *localPeer
+ peers *Peers
+ onChange []func()
+ unicast unicastRoutes
+ unicastAll unicastRoutes // [1]
+ broadcast broadcastRoutes
+ broadcastAll broadcastRoutes // [1]
+ recalcTimer *time.Timer
+ pendingRecalc bool
+ wait chan chan struct{}
+ action chan<- func()
// [1] based on *all* connections, not just established &
// symmetric ones
}
+const (
+ // We defer recalculation requests by up to 100ms, in order to
+ // coalesce multiple recalcs together.
+ recalcDeferTime = 100 * time.Millisecond
+)
+
// newRoutes returns a usable Routes based on the LocalPeer and existing Peers.
func newRoutes(ourself *localPeer, peers *Peers) *routes {
- recalculate := make(chan *struct{}, 1)
wait := make(chan chan struct{})
action := make(chan func())
r := &routes{
@@ -37,11 +45,12 @@ func newRoutes(ourself *localPeer, peers *Peers) *routes {
unicastAll: unicastRoutes{ourself.Name: UnknownPeerName},
broadcast: broadcastRoutes{ourself.Name: []PeerName{}},
broadcastAll: broadcastRoutes{ourself.Name: []PeerName{}},
- recalc: recalculate,
+ recalcTimer: time.NewTimer(time.Hour),
wait: wait,
action: action,
}
- go r.run(recalculate, wait, action)
+ r.recalcTimer.Stop()
+ go r.run(wait, action)
return r
}
@@ -119,7 +128,7 @@ func (r *routes) lookupOrCalculate(name PeerName, broadcast *broadcastRoutes, es
return <-res
}
-// RandomNeighbours chooses min(log2(n_peers), n_neighbouring_peers)
+// RandomNeighbours chooses min(2 log2(n_peers), n_neighbouring_peers)
// neighbours, with a random distribution that is topology-sensitive,
// favouring neighbours at the end of "bottleneck links". We determine the
// latter based on the unicast routing table. If a neighbour appears as the
@@ -127,61 +136,86 @@ func (r *routes) lookupOrCalculate(name PeerName, broadcast *broadcastRoutes, es
// proportion of peers via that neighbour than other neighbours - then it is
// chosen with a higher probability.
//
-// Note that we choose log2(n_peers) *neighbours*, not peers. Consequently, on
+// Note that we choose 2log2(n_peers) *neighbours*, not peers. Consequently, on
// sparsely connected peers this function returns a higher proportion of
// neighbours than elsewhere. In extremis, on peers with fewer than
// log2(n_peers) neighbours, all neighbours are returned.
func (r *routes) randomNeighbours(except PeerName) []PeerName {
- destinations := make(peerNameSet)
r.RLock()
defer r.RUnlock()
- count := int(math.Log2(float64(len(r.unicastAll))))
- // depends on go's random map iteration
+ var total int64 = 0
+ weights := make(map[PeerName]int64)
+ // First iterate the whole set, counting how often each neighbour appears
for _, dst := range r.unicastAll {
if dst != UnknownPeerName && dst != except {
- destinations[dst] = struct{}{}
- if len(destinations) >= count {
+ total++
+ weights[dst]++
+ }
+ }
+ needed := int(math.Min(2*math.Log2(float64(len(r.unicastAll))), float64(len(weights))))
+ destinations := make([]PeerName, 0, needed)
+ for len(destinations) < needed {
+ // Pick a random point on the distribution and linear search for it
+ rnd := rand.Int63n(total)
+ for dst, count := range weights {
+ if rnd < count {
+ destinations = append(destinations, dst)
+ // Remove the one we selected from consideration
+ delete(weights, dst)
+ total -= count
break
}
+ rnd -= count
}
}
- res := make([]PeerName, 0, len(destinations))
- for dst := range destinations {
- res = append(res, dst)
- }
- return res
+ return destinations
}
// Recalculate requests recalculation of the routing table. This is async but
// can effectively be made synchronous with a subsequent call to
// EnsureRecalculated.
func (r *routes) recalculate() {
- // The use of a 1-capacity channel in combination with the
- // non-blocking send is an optimisation that results in multiple
- // requests being coalesced.
- select {
- case r.recalc <- nil:
- default:
+ r.Lock()
+ if !r.pendingRecalc {
+ r.recalcTimer.Reset(recalcDeferTime)
+ r.pendingRecalc = true
}
+ r.Unlock()
+}
+
+func (r *routes) clearPendingRecalcFlag() {
+ r.Lock()
+ r.pendingRecalc = false
+ r.Unlock()
}
// EnsureRecalculated waits for any preceding Recalculate requests to finish.
func (r *routes) ensureRecalculated() {
- done := make(chan struct{})
+ var done chan struct{}
+ // If another call is already waiting, wait on the same chan, otherwise make a new one
+ select {
+ case done = <-r.wait:
+ default:
+ done = make(chan struct{})
+ }
r.wait <- done
<-done
}
-func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, action <-chan func()) {
+func (r *routes) run(wait <-chan chan struct{}, action <-chan func()) {
for {
select {
- case <-recalculate:
+ case <-r.recalcTimer.C:
+ r.clearPendingRecalcFlag()
r.calculate()
case done := <-wait:
- select {
- case <-recalculate:
+ r.Lock()
+ pending := r.pendingRecalc
+ r.Unlock()
+ if pending {
+ <-r.recalcTimer.C
+ r.clearPendingRecalcFlag()
r.calculate()
- default:
}
close(done)
case f := <-action:
@@ -190,6 +224,8 @@ func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, ac
}
}
+// Calculate unicast and broadcast routes from r.ourself, and reset
+// the broadcast route cache.
func (r *routes) calculate() {
r.peers.RLock()
r.ourself.RLock()
diff --git a/status.go b/status.go
index a78cac3..1c77039 100644
--- a/status.go
+++ b/status.go
@@ -30,7 +30,7 @@ type Status struct {
func NewStatus(router *Router) *Status {
return &Status{
Protocol: Protocol,
- ProtocolMinVersion: ProtocolMinVersion,
+ ProtocolMinVersion: int(router.ProtocolMinVersion),
ProtocolMaxVersion: ProtocolMaxVersion,
Encryption: router.usingPassword(),
PeerDiscovery: router.PeerDiscovery,
diff --git a/surrogate_gossiper.go b/surrogate_gossiper.go
index 89ea9e1..4ed9f28 100644
--- a/surrogate_gossiper.go
+++ b/surrogate_gossiper.go
@@ -11,6 +11,7 @@ import (
type surrogateGossiper struct {
sync.Mutex
prevUpdates []prevUpdate
+ router *Router
}
type prevUpdate struct {
@@ -56,6 +57,10 @@ func (s *surrogateGossiper) OnGossip(update []byte) (GossipData, error) {
// (this time limit is arbitrary; surrogateGossiper should pass on new gossip immediately
// so there should be no reason for a duplicate to show up after a long time)
updateTime := now()
+ gossipInterval := defaultGossipInterval
+ if s.router != nil {
+ gossipInterval = s.router.gossipInterval()
+ }
deleteBefore := updateTime.Add(-gossipInterval)
keepFrom := len(s.prevUpdates)
for i, p := range s.prevUpdates {
diff --git a/surrogate_gossiper_test.go b/surrogate_gossiper_test.go
index 61931b1..a4db566 100644
--- a/surrogate_gossiper_test.go
+++ b/surrogate_gossiper_test.go
@@ -37,11 +37,11 @@ func TestSurrogateGossiperOnGossip(t *testing.T) {
checkOnGossip(t, s, msg[1], msg[1])
checkOnGossip(t, s, msg[0], nil)
checkOnGossip(t, s, msg[1], nil)
- myTime = myTime.Add(gossipInterval / 2) // Should not trigger cleardown
- checkOnGossip(t, s, msg[2], msg[2]) // Only clears out old ones on new entry
+ myTime = myTime.Add(defaultGossipInterval / 2) // Should not trigger cleardown
+ checkOnGossip(t, s, msg[2], msg[2]) // Only clears out old ones on new entry
checkOnGossip(t, s, msg[0], nil)
checkOnGossip(t, s, msg[1], nil)
- myTime = myTime.Add(gossipInterval)
+ myTime = myTime.Add(defaultGossipInterval)
checkOnGossip(t, s, msg[0], nil)
checkOnGossip(t, s, msg[3], msg[3]) // Only clears out old ones on new entry
checkOnGossip(t, s, msg[0], msg[0])
Debdiff
[The following lists of changes regard files as different if they have different names, permissions or owners.]
Files in second set of .debs but not in first
-rw-r--r-- root/root /usr/share/gocode/src/github.com/weaveworks/mesh/go.mod
-rw-r--r-- root/root /usr/share/gocode/src/github.com/weaveworks/mesh/go.sum
No differences were encountered in the control files