New Upstream Snapshot - golang-github-weaveworks-mesh

Ready changes

Summary

Merged new upstream version: 0.4 (was: 0.1+git20180323.0c91e69).

Resulting package

Built on 2022-10-16T20:05 (took 3m40s)

The resulting binary packages can be installed (if you have the apt repository enabled) by running one of:

apt install -t fresh-snapshots golang-github-weaveworks-mesh-dev

Lintian Result

Diff

diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..40ec684
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,17 @@
+version: 2.1
+
+jobs:
+  build:
+    docker:
+      - image: golang:1.13.1-stretch
+    working_directory: /go/src/github.com/weaveworks/mesh
+    steps:
+    - checkout
+    - run:
+        name: Lint
+        command: |
+          ./lint
+    - run:
+        name: Test
+        command: |
+          go test -v
diff --git a/README.md b/README.md
index 856768f..4b23a3f 100644
--- a/README.md
+++ b/README.md
@@ -77,3 +77,12 @@ All contributions should be made as pull requests that satisfy the guidelines, b
 In addition, several mechanical checks are enforced.
 See [the lint script](/lint) for details.
 
+## <a name="help"></a>Getting Help
+
+If you have any questions about, feedback for or problems with `mesh`:
+
+- Invite yourself to the <a href="https://slack.weave.works/" target="_blank">Weave Users Slack</a>.
+- Ask a question on the [#general](https://weave-community.slack.com/messages/general/) slack channel.
+- [File an issue](https://github.com/weaveworks/mesh/issues/new).
+
+Your feedback is always welcome!
diff --git a/circle.yml b/circle.yml
deleted file mode 100644
index 6f4df80..0000000
--- a/circle.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-test:
-  pre:
-    - ./lint
-
diff --git a/debian/changelog b/debian/changelog
index 5a389b2..2060867 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+golang-github-weaveworks-mesh (0.4-1) UNRELEASED; urgency=low
+
+  * New upstream release.
+
+ -- Debian Janitor <janitor@jelmer.uk>  Sun, 16 Oct 2022 20:02:17 -0000
+
 golang-github-weaveworks-mesh (0.1+git20180323.0c91e69-1) unstable; urgency=medium
 
   [ Alexandre Viau ]
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..8ebf605
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module github.com/weaveworks/mesh
+
+go 1.12
+
+require (
+	github.com/stretchr/testify v1.4.0
+	golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..8ab1f0e
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,17 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc h1:c0o/qxkaO2LF5t6fQrT4b5hzyggAkLLlCUjqfRxd8Q4=
+golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/gossip.go b/gossip.go
index fe90d13..44bd6ed 100644
--- a/gossip.go
+++ b/gossip.go
@@ -22,6 +22,9 @@ type Gossip interface {
 	//
 	// TODO(pb): rename to Broadcast?
 	GossipBroadcast(update GossipData)
+
+	// GossipNeighbourSubset emits a message to a subset of neighbour peers in the mesh.
+	GossipNeighbourSubset(update GossipData)
 }
 
 // Gossiper is the receiving interface.
diff --git a/gossip_channel.go b/gossip_channel.go
index 2ff512e..d95688b 100644
--- a/gossip_channel.go
+++ b/gossip_channel.go
@@ -83,6 +83,12 @@ func (c *gossipChannel) GossipBroadcast(update GossipData) {
 	c.relayBroadcast(c.ourself.Name, update)
 }
 
+// GossipNeighbourSubset implements Gossip, relaying update to a subset of members of
+// the channel.
+func (c *gossipChannel) GossipNeighbourSubset(update GossipData) {
+	c.relay(c.ourself.Name, update)
+}
+
 // Send relays data into the channel topology via random neighbours.
 func (c *gossipChannel) Send(data GossipData) {
 	c.relay(c.ourself.Name, data)
diff --git a/gossip_test.go b/gossip_test.go
index dd9ae70..d7ddd27 100644
--- a/gossip_test.go
+++ b/gossip_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"math"
 	"sync"
 	"testing"
 
@@ -68,14 +69,25 @@ func sendPendingGossip(routers ...*Router) {
 	}
 }
 
-func addTestGossipConnection(r1, r2 *Router) {
-	c1 := r1.newTestGossipConnection(r2)
-	c2 := r2.newTestGossipConnection(r1)
+func sendPendingTopologyUpdates(routers ...*Router) {
+	for _, router := range routers {
+		router.Ourself.Lock()
+		pendingUpdate := router.Ourself.pendingTopologyUpdate
+		router.Ourself.Unlock()
+		if pendingUpdate {
+			router.Ourself.broadcastPendingTopologyUpdates()
+		}
+	}
+}
+
+func addTestGossipConnection(t require.TestingT, r1, r2 *Router) {
+	c1 := r1.newTestGossipConnection(t, r2)
+	c2 := r2.newTestGossipConnection(t, r1)
 	c1.Start()
 	c2.Start()
 }
 
-func (router *Router) newTestGossipConnection(r *Router) *mockGossipConnection {
+func (router *Router) newTestGossipConnection(t require.TestingT, r *Router) *mockGossipConnection {
 	to := r.Ourself.Peer
 	toPeer := newPeer(to.Name, to.NickName, to.UID, 0, to.ShortID)
 	toPeer = router.Peers.fetchWithDefault(toPeer) // Has side-effect of incrementing refcount
@@ -86,7 +98,7 @@ func (router *Router) newTestGossipConnection(r *Router) *mockGossipConnection {
 		start:            make(chan struct{}),
 	}
 	conn.senders = newGossipSenders(conn, make(chan struct{}))
-	router.Ourself.handleAddConnection(conn, false)
+	require.NoError(t, router.Ourself.handleAddConnection(conn, false))
 	router.Ourself.handleConnectionEstablished(conn)
 	return conn
 }
@@ -121,6 +133,7 @@ func checkTopology(t *testing.T, router *Router, wantedPeers ...*Peer) {
 }
 
 func flushAndCheckTopology(t *testing.T, routers []*Router, wantedPeers ...*Peer) {
+	sendPendingTopologyUpdates(routers...)
 	sendPendingGossip(routers...)
 	for _, r := range routers {
 		checkTopology(t, r, wantedPeers...)
@@ -138,15 +151,15 @@ func TestGossipTopology(t *testing.T) {
 	checkTopology(t, r2, r2.tp())
 
 	// Now try adding some connections
-	addTestGossipConnection(r1, r2)
+	addTestGossipConnection(t, r1, r2)
 	sendPendingGossip(r1, r2)
 	checkTopology(t, r1, r1.tp(r2), r2.tp(r1))
 	checkTopology(t, r2, r1.tp(r2), r2.tp(r1))
 
-	addTestGossipConnection(r2, r3)
+	addTestGossipConnection(t, r2, r3)
 	flushAndCheckTopology(t, routers, r1.tp(r2), r2.tp(r1, r3), r3.tp(r2))
 
-	addTestGossipConnection(r3, r1)
+	addTestGossipConnection(t, r3, r1)
 	flushAndCheckTopology(t, routers, r1.tp(r2, r3), r2.tp(r1, r3), r3.tp(r1, r2))
 
 	// Drop the connection from 2 to 3
@@ -155,7 +168,9 @@ func TestGossipTopology(t *testing.T) {
 
 	// Drop the connection from 1 to 3
 	r1.DeleteTestGossipConnection(r3)
+	sendPendingTopologyUpdates(routers...)
 	sendPendingGossip(r1, r2, r3)
+	forcePendingGC(r1, r2, r3)
 	checkTopology(t, r1, r1.tp(r2), r2.tp(r1))
 	checkTopology(t, r2, r1.tp(r2), r2.tp(r1))
 	// r3 still thinks r1 has a connection to it
@@ -168,8 +183,8 @@ func TestGossipSurrogate(t *testing.T) {
 	r2 := newTestRouter(t, "02:00:00:02:00:00")
 	r3 := newTestRouter(t, "03:00:00:03:00:00")
 	routers := []*Router{r1, r2, r3}
-	addTestGossipConnection(r1, r2)
-	addTestGossipConnection(r3, r2)
+	addTestGossipConnection(t, r1, r2)
+	addTestGossipConnection(t, r3, r2)
 	flushAndCheckTopology(t, routers, r1.tp(r2), r2.tp(r1, r3), r3.tp(r2))
 
 	// create a gossiper at either end, but not the middle
@@ -257,3 +272,40 @@ func (g *testGossiper) checkHas(t *testing.T, vs ...byte) {
 func broadcast(s Gossip, v byte) {
 	s.GossipBroadcast(newSurrogateGossipData([]byte{v}))
 }
+
+func TestRandomNeighbours(t *testing.T) {
+	const nTrials = 5000
+	ourself := PeerName(0) // aliased with UnknownPeerName, which is ok here
+	// Check fairness of selection across different-sized sets
+	for _, test := range []struct{ nPeers, nNeighbours int }{{1, 0}, {2, 1}, {3, 2}, {10, 2}, {10, 3}, {10, 9}, {100, 2}, {100, 99}} {
+		t.Run(fmt.Sprint(test.nPeers, "_peers_", test.nNeighbours, "_neighbours"), func(t *testing.T) {
+			// Create a test fixture with unicastAll set up
+			r := routes{
+				unicastAll: make(unicastRoutes, test.nPeers),
+			}
+			// The route to 'ourself' is always via 'unknown'
+			r.unicastAll[ourself] = UnknownPeerName
+			// Spread the other peers round-robin over the neighbours: the unicast route to peer i is via neighbour i%nNeighbours+1
+			for i := 1; i < test.nPeers; i++ {
+				r.unicastAll[PeerName(i)] = PeerName(i%test.nNeighbours + 1)
+			}
+			total := 0
+			counts := make([]int, test.nNeighbours+1)
+			// Run randomNeighbours() several times, and count the distribution
+			for trial := 0; trial < nTrials; trial++ {
+				targets := r.randomNeighbours(ourself)
+				expected := int(math.Min(2*math.Log2(float64(test.nPeers)), float64(test.nNeighbours)))
+				require.Equal(t, expected, len(targets))
+				total += len(targets)
+				for _, p := range targets {
+					counts[p]++
+				}
+			}
+			require.Equal(t, 0, counts[ourself], "randomNeighbours should not select source peer")
+			// Check that each neighbour was picked within 20% of an average count
+			for i := 1; i < test.nNeighbours+1; i++ {
+				require.InEpsilon(t, float64(total)/float64(test.nNeighbours), counts[i], 0.2, "peer %d picked %d times out of %d; counts %v", i, counts[i], total, counts)
+			}
+		})
+	}
+}
diff --git a/lint b/lint
index ec403c5..5bbb9e8 100755
--- a/lint
+++ b/lint
@@ -4,19 +4,10 @@ set -o errexit
 set -o nounset
 set -o pipefail
 
-if [ ! $(command -v gometalinter) ]
+if [ ! $(command -v golangci-lint) ]
 then
-	go get github.com/alecthomas/gometalinter
-	gometalinter --install --vendor
+    curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $(go env GOPATH)/bin v1.20.0
+    golangci-lint --version
 fi
 
-gometalinter \
-	--exclude='error return value not checked.*(Close|Log|Print).*\(errcheck\)$' \
-	--exclude='.*_test\.go:.*error return value not checked.*\(errcheck\)$' \
-	--exclude='duplicate of.*_test.go.*\(dupl\)$' \
-	--disable=aligncheck \
-	--disable=gotype \
-	--disable=gas \
-	--cyclo-over=20 \
-	--tests \
-	--deadline=20s
+golangci-lint run
diff --git a/local_peer.go b/local_peer.go
index 7e633c9..7b36da1 100644
--- a/local_peer.go
+++ b/local_peer.go
@@ -8,13 +8,20 @@ import (
 	"time"
 )
 
+const (
+	deferTopologyUpdateDuration = 1 * time.Second
+)
+
 // localPeer is the only "active" peer in the mesh. It extends Peer with
 // additional behaviors, mostly to retrieve and manage connection state.
 type localPeer struct {
 	sync.RWMutex
 	*Peer
-	router     *Router
-	actionChan chan<- localPeerAction
+	router                *Router
+	actionChan            chan<- localPeerAction
+	topologyUpdates       peerNameSet
+	timer                 *time.Timer
+	pendingTopologyUpdate bool
 }
 
 // The actor closure used by localPeer.
@@ -23,11 +30,15 @@ type localPeerAction func()
 // newLocalPeer returns a usable LocalPeer.
 func newLocalPeer(name PeerName, nickName string, router *Router) *localPeer {
 	actionChan := make(chan localPeerAction, ChannelSize)
+	topologyUpdates := make(peerNameSet)
 	peer := &localPeer{
-		Peer:       newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
-		router:     router,
-		actionChan: actionChan,
+		Peer:            newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
+		router:          router,
+		actionChan:      actionChan,
+		topologyUpdates: topologyUpdates,
+		timer:           time.NewTimer(deferTopologyUpdateDuration),
 	}
+	peer.timer.Stop()
 	go peer.actorLoop(actionChan)
 	return peer
 }
@@ -136,6 +147,10 @@ func (peer *localPeer) encode(enc *gob.Encoder) {
 // ACTOR server
 
 func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
+	gossipInterval := defaultGossipInterval
+	if peer.router != nil {
+		gossipInterval = peer.router.gossipInterval()
+	}
 	gossipTimer := time.Tick(gossipInterval)
 	for {
 		select {
@@ -143,10 +158,22 @@ func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
 			action()
 		case <-gossipTimer:
 			peer.router.sendAllGossip()
+		case <-peer.timer.C:
+			peer.broadcastPendingTopologyUpdates()
 		}
 	}
 }
 
+func (peer *localPeer) broadcastPendingTopologyUpdates() {
+	peer.Lock()
+	gossipData := peer.topologyUpdates
+	peer.topologyUpdates = make(peerNameSet)
+	peer.pendingTopologyUpdate = false
+	peer.Unlock()
+	gossipData[peer.Peer.Name] = struct{}{}
+	peer.router.broadcastTopologyUpdate(gossipData)
+}
+
 func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer bool) error {
 	if peer.Peer != conn.getLocal() {
 		panic("Attempt made to add connection to peer where peer is not the source of connection")
@@ -190,7 +217,6 @@ func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer b
 		conn.logf("connection added (new peer)")
 		peer.router.sendAllGossipDown(conn)
 	}
-
 	peer.router.Routes.recalculate()
 	peer.broadcastPeerUpdate(conn.Remote())
 
@@ -240,7 +266,15 @@ func (peer *localPeer) broadcastPeerUpdate(peers ...*Peer) {
 	// context of a test, but that will involve significant
 	// reworking of tests.
 	if peer.router != nil {
-		peer.router.broadcastTopologyUpdate(append(peers, peer.Peer))
+		peer.Lock()
+		defer peer.Unlock()
+		if !peer.pendingTopologyUpdate {
+			peer.timer.Reset(deferTopologyUpdateDuration)
+			peer.pendingTopologyUpdate = true
+		}
+		for _, p := range peers {
+			peer.topologyUpdates[p.Name] = struct{}{}
+		}
 	}
 }
 
diff --git a/peer_test.go b/peer_test.go
index 3950731..deb9ea0 100644
--- a/peer_test.go
+++ b/peer_test.go
@@ -13,3 +13,16 @@ func TestPeerRoutes(t *testing.T) {
 func TestPeerForEachConnectedPeer(t *testing.T) {
 	t.Skip("TODO")
 }
+
+func forcePendingGC(routers ...*Router) {
+	for _, router := range routers {
+		router.Peers.Lock()
+		if router.Peers.pendingGC {
+			var pending peersPendingNotifications
+			router.Peers.garbageCollect(&pending)
+			router.Peers.unlockAndNotify(&pending)
+		} else {
+			router.Peers.Unlock()
+		}
+	}
+}
diff --git a/peers.go b/peers.go
index 1190450..c10b031 100644
--- a/peers.go
+++ b/peers.go
@@ -6,6 +6,11 @@ import (
 	"io"
 	"math/rand"
 	"sync"
+	"time"
+)
+
+const (
+	gcInterval = 1 * time.Second
 )
 
 // Peers collects all of the known peers in the mesh, including ourself.
@@ -18,6 +23,8 @@ type Peers struct {
 
 	// Called when the mapping from short IDs to peers changes
 	onInvalidateShortIDs []func()
+	timer                *time.Timer
+	pendingGC            bool
 }
 
 type shortIDPeers struct {
@@ -60,8 +67,11 @@ func newPeers(ourself *localPeer) *Peers {
 		ourself:   ourself,
 		byName:    make(map[PeerName]*Peer),
 		byShortID: make(map[PeerShortID]shortIDPeers),
+		timer:     time.NewTimer(gcInterval),
 	}
 	peers.fetchWithDefault(ourself.Peer)
+	peers.timer.Stop()
+	go peers.actorLoop()
 	return peers
 }
 
@@ -339,6 +349,15 @@ func (peers *Peers) forEach(fun func(*Peer)) {
 	}
 }
 
+func (peers *Peers) actorLoop() {
+	for range peers.timer.C {
+		peers.GarbageCollect()
+		peers.Lock()
+		peers.pendingGC = false
+		peers.Unlock()
+	}
+}
+
 // Merge an incoming update with our own topology.
 //
 // We add peers hitherto unknown to us, and update peers for which the
@@ -363,7 +382,6 @@ func (peers *Peers) applyUpdate(update []byte) (peerNameSet, peerNameSet, error)
 
 	// Now apply the updates
 	newUpdate := peers.applyDecodedUpdate(decodedUpdate, decodedConns, &pending)
-	peers.garbageCollect(&pending)
 	for _, peerRemoved := range pending.removed {
 		delete(newUpdate, peerRemoved.Name)
 	}
@@ -373,6 +391,13 @@ func (peers *Peers) applyUpdate(update []byte) (peerNameSet, peerNameSet, error)
 		updateNames[peer.Name] = struct{}{}
 	}
 
+	if !peers.pendingGC {
+		// Schedule a single GarbageCollect() to run once gcInterval has elapsed;
+		// it covers every topology update received during that period.
+		peers.timer.Reset(gcInterval)
+		peers.pendingGC = true
+	}
+
 	return updateNames, newUpdate, nil
 }
 
diff --git a/peers_test.go b/peers_test.go
index c9bec85..61de2bd 100644
--- a/peers_test.go
+++ b/peers_test.go
@@ -31,7 +31,8 @@ func checkApplyUpdate(t *testing.T, peers *Peers) {
 	// into it.
 	_, testBedPeers := newNode(dummyName)
 	testBedPeers.AddTestConnection(peers.ourself.Peer)
-	testBedPeers.applyUpdate(peers.encodePeers(peers.names()))
+	_, _, err := testBedPeers.applyUpdate(peers.encodePeers(peers.names()))
+	require.NoError(t, err)
 
 	checkTopologyPeers(t, true, testBedPeers.allPeersExcept(dummyName), peers.allPeers()...)
 }
@@ -296,13 +297,15 @@ func TestShortIDPropagation(t *testing.T) {
 	_, peers2 := newNode(PeerName(2))
 
 	peers1.AddTestConnection(peers2.ourself.Peer)
-	peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+	_, _, err := peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+	require.NoError(t, err)
 	peers12 := peers1.Fetch(PeerName(2))
 	old := peers12.peerSummary
 
 	require.True(t,
 		peers2.reassignLocalShortID(&peersPendingNotifications{}))
-	peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+	_, _, err = peers1.applyUpdate(peers2.encodePeers(peers2.names()))
+	require.NoError(t, err)
 	require.NotEqual(t, old.Version, peers12.Version)
 	require.NotEqual(t, old.ShortID, peers12.ShortID)
 }
@@ -322,15 +325,18 @@ func TestShortIDCollision(t *testing.T) {
 	peers3.AddTestConnection(peers2.ourself.Peer)
 
 	// Propogate from 1 to 2 to 3
-	peers2.applyUpdate(peers1.encodePeers(peers1.names()))
-	peers3.applyUpdate(peers2.encodePeers(peers2.names()))
+	_, _, err := peers2.applyUpdate(peers1.encodePeers(peers1.names()))
+	require.NoError(t, err)
+	_, _, err = peers3.applyUpdate(peers2.encodePeers(peers2.names()))
+	require.NoError(t, err)
 
 	// Force the short id of peer 1 to collide with peer 2.  Peer
 	// 1 has the lowest name, so it gets to keep the short id
 	peers1.setLocalShortID(2, &pending)
 
 	oldShortID := peers2.ourself.ShortID
-	_, updated, _ := peers2.applyUpdate(peers1.encodePeers(peers1.names()))
+	_, updated, err := peers2.applyUpdate(peers1.encodePeers(peers1.names()))
+	require.NoError(t, err)
 
 	// peer 2 should have noticed the collision and resolved it
 	require.NotEqual(t, oldShortID, peers2.ourself.ShortID)
@@ -341,7 +347,8 @@ func TestShortIDCollision(t *testing.T) {
 	updated[PeerName(2)] = struct{}{}
 
 	// the update from peer 2 should include its short id change
-	peers3.applyUpdate(peers2.encodePeers(updated))
+	_, _, err = peers3.applyUpdate(peers2.encodePeers(updated))
+	require.NoError(t, err)
 	require.Equal(t, peers2.ourself.ShortID,
 		peers3.Fetch(PeerName(2)).ShortID)
 }
diff --git a/router.go b/router.go
index 27be495..9af5418 100644
--- a/router.go
+++ b/router.go
@@ -17,14 +17,15 @@ var (
 	// ChannelSize is the buffer size used by so-called actor goroutines
 	// throughout mesh.
 	ChannelSize = 16
+
+	defaultGossipInterval = 30 * time.Second
 )
 
 const (
 	tcpHeartbeat     = 30 * time.Second
-	gossipInterval   = 30 * time.Second
 	maxDuration      = time.Duration(math.MaxInt64)
-	acceptMaxTokens  = 100
-	acceptTokenDelay = 100 * time.Millisecond // [2]
+	acceptMaxTokens  = 20
+	acceptTokenDelay = 50 * time.Millisecond
 )
 
 // Config defines dimensions of configuration for the router.
@@ -37,6 +38,7 @@ type Config struct {
 	ProtocolMinVersion byte
 	PeerDiscovery      bool
 	TrustedSubnets     []*net.IPNet
+	GossipInterval     *time.Duration
 }
 
 // Router manages communication between this peer and the rest of the mesh.
@@ -154,7 +156,7 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel {
 	if channel, found = router.gossipChannels[channelName]; found {
 		return channel
 	}
-	channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{}, router.logger)
+	channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{router: router}, router.logger)
 	channel.logf("created surrogate channel")
 	router.gossipChannels[channelName] = channel
 	return channel
@@ -170,6 +172,14 @@ func (router *Router) gossipChannelSet() map[*gossipChannel]struct{} {
 	return channels
 }
 
+func (router *Router) gossipInterval() time.Duration {
+	if router.Config.GossipInterval != nil {
+		return *router.Config.GossipInterval
+	} else {
+		return defaultGossipInterval
+	}
+}
+
 func (router *Router) handleGossip(tag protocolTag, payload []byte) error {
 	decoder := gob.NewDecoder(bytes.NewReader(payload))
 	var channelName string
@@ -221,12 +231,9 @@ func (router *Router) sendPendingGossip() bool {
 
 // BroadcastTopologyUpdate is invoked whenever there is a change to the mesh
 // topology, and broadcasts the new set of peers to the mesh.
-func (router *Router) broadcastTopologyUpdate(update []*Peer) {
-	names := make(peerNameSet)
-	for _, p := range update {
-		names[p.Name] = struct{}{}
-	}
-	router.topologyGossip.GossipBroadcast(&topologyGossipData{peers: router.Peers, update: names})
+func (router *Router) broadcastTopologyUpdate(update peerNameSet) {
+	gossipData := &topologyGossipData{peers: router.Peers, update: update}
+	router.topologyGossip.GossipNeighbourSubset(gossipData)
 }
 
 // OnGossipUnicast implements Gossiper, but always returns an error, as a
diff --git a/routes.go b/routes.go
index 10ff315..0e72d0f 100644
--- a/routes.go
+++ b/routes.go
@@ -2,7 +2,9 @@ package mesh
 
 import (
 	"math"
+	"math/rand"
 	"sync"
+	"time"
 )
 
 type unicastRoutes map[PeerName]PeerName
@@ -11,23 +13,29 @@ type broadcastRoutes map[PeerName][]PeerName
 // routes aggregates unicast and broadcast routes for our peer.
 type routes struct {
 	sync.RWMutex
-	ourself      *localPeer
-	peers        *Peers
-	onChange     []func()
-	unicast      unicastRoutes
-	unicastAll   unicastRoutes // [1]
-	broadcast    broadcastRoutes
-	broadcastAll broadcastRoutes // [1]
-	recalc       chan<- *struct{}
-	wait         chan<- chan struct{}
-	action       chan<- func()
+	ourself       *localPeer
+	peers         *Peers
+	onChange      []func()
+	unicast       unicastRoutes
+	unicastAll    unicastRoutes // [1]
+	broadcast     broadcastRoutes
+	broadcastAll  broadcastRoutes // [1]
+	recalcTimer   *time.Timer
+	pendingRecalc bool
+	wait          chan chan struct{}
+	action        chan<- func()
 	// [1] based on *all* connections, not just established &
 	// symmetric ones
 }
 
+const (
+	// We defer recalculation requests by up to 100ms, in order to
+	// coalesce multiple recalcs together.
+	recalcDeferTime = 100 * time.Millisecond
+)
+
 // newRoutes returns a usable Routes based on the LocalPeer and existing Peers.
 func newRoutes(ourself *localPeer, peers *Peers) *routes {
-	recalculate := make(chan *struct{}, 1)
 	wait := make(chan chan struct{})
 	action := make(chan func())
 	r := &routes{
@@ -37,11 +45,12 @@ func newRoutes(ourself *localPeer, peers *Peers) *routes {
 		unicastAll:   unicastRoutes{ourself.Name: UnknownPeerName},
 		broadcast:    broadcastRoutes{ourself.Name: []PeerName{}},
 		broadcastAll: broadcastRoutes{ourself.Name: []PeerName{}},
-		recalc:       recalculate,
+		recalcTimer:  time.NewTimer(time.Hour),
 		wait:         wait,
 		action:       action,
 	}
-	go r.run(recalculate, wait, action)
+	r.recalcTimer.Stop()
+	go r.run(wait, action)
 	return r
 }
 
@@ -119,7 +128,7 @@ func (r *routes) lookupOrCalculate(name PeerName, broadcast *broadcastRoutes, es
 	return <-res
 }
 
-// RandomNeighbours chooses min(log2(n_peers), n_neighbouring_peers)
+// RandomNeighbours chooses min(2 log2(n_peers), n_neighbouring_peers)
 // neighbours, with a random distribution that is topology-sensitive,
 // favouring neighbours at the end of "bottleneck links". We determine the
 // latter based on the unicast routing table. If a neighbour appears as the
@@ -127,61 +136,86 @@ func (r *routes) lookupOrCalculate(name PeerName, broadcast *broadcastRoutes, es
 // proportion of peers via that neighbour than other neighbours - then it is
 // chosen with a higher probability.
 //
-// Note that we choose log2(n_peers) *neighbours*, not peers. Consequently, on
+// Note that we choose 2 log2(n_peers) *neighbours*, not peers. Consequently, on
 // sparsely connected peers this function returns a higher proportion of
 // neighbours than elsewhere. In extremis, on peers with fewer than
 // log2(n_peers) neighbours, all neighbours are returned.
 func (r *routes) randomNeighbours(except PeerName) []PeerName {
-	destinations := make(peerNameSet)
 	r.RLock()
 	defer r.RUnlock()
-	count := int(math.Log2(float64(len(r.unicastAll))))
-	// depends on go's random map iteration
+	var total int64 = 0
+	weights := make(map[PeerName]int64)
+	// First iterate the whole set, counting how often each neighbour appears
 	for _, dst := range r.unicastAll {
 		if dst != UnknownPeerName && dst != except {
-			destinations[dst] = struct{}{}
-			if len(destinations) >= count {
+			total++
+			weights[dst]++
+		}
+	}
+	needed := int(math.Min(2*math.Log2(float64(len(r.unicastAll))), float64(len(weights))))
+	destinations := make([]PeerName, 0, needed)
+	for len(destinations) < needed {
+		// Pick a random point on the distribution and linear search for it
+		rnd := rand.Int63n(total)
+		for dst, count := range weights {
+			if rnd < count {
+				destinations = append(destinations, dst)
+				// Remove the one we selected from consideration
+				delete(weights, dst)
+				total -= count
 				break
 			}
+			rnd -= count
 		}
 	}
-	res := make([]PeerName, 0, len(destinations))
-	for dst := range destinations {
-		res = append(res, dst)
-	}
-	return res
+	return destinations
 }
 
 // Recalculate requests recalculation of the routing table. This is async but
 // can effectively be made synchronous with a subsequent call to
 // EnsureRecalculated.
 func (r *routes) recalculate() {
-	// The use of a 1-capacity channel in combination with the
-	// non-blocking send is an optimisation that results in multiple
-	// requests being coalesced.
-	select {
-	case r.recalc <- nil:
-	default:
+	r.Lock()
+	if !r.pendingRecalc {
+		r.recalcTimer.Reset(recalcDeferTime)
+		r.pendingRecalc = true
 	}
+	r.Unlock()
+}
+
+func (r *routes) clearPendingRecalcFlag() {
+	r.Lock()
+	r.pendingRecalc = false
+	r.Unlock()
 }
 
 // EnsureRecalculated waits for any preceding Recalculate requests to finish.
 func (r *routes) ensureRecalculated() {
-	done := make(chan struct{})
+	var done chan struct{}
+	// If another call is already waiting, wait on the same chan, otherwise make a new one
+	select {
+	case done = <-r.wait:
+	default:
+		done = make(chan struct{})
+	}
 	r.wait <- done
 	<-done
 }
 
-func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, action <-chan func()) {
+func (r *routes) run(wait <-chan chan struct{}, action <-chan func()) {
 	for {
 		select {
-		case <-recalculate:
+		case <-r.recalcTimer.C:
+			r.clearPendingRecalcFlag()
 			r.calculate()
 		case done := <-wait:
-			select {
-			case <-recalculate:
+			r.Lock()
+			pending := r.pendingRecalc
+			r.Unlock()
+			if pending {
+				<-r.recalcTimer.C
+				r.clearPendingRecalcFlag()
 				r.calculate()
-			default:
 			}
 			close(done)
 		case f := <-action:
@@ -190,6 +224,8 @@ func (r *routes) run(recalculate <-chan *struct{}, wait <-chan chan struct{}, ac
 	}
 }
 
+// Calculate unicast and broadcast routes from r.ourself, and reset
+// the broadcast route cache.
 func (r *routes) calculate() {
 	r.peers.RLock()
 	r.ourself.RLock()
diff --git a/status.go b/status.go
index a78cac3..1c77039 100644
--- a/status.go
+++ b/status.go
@@ -30,7 +30,7 @@ type Status struct {
 func NewStatus(router *Router) *Status {
 	return &Status{
 		Protocol:           Protocol,
-		ProtocolMinVersion: ProtocolMinVersion,
+		ProtocolMinVersion: int(router.ProtocolMinVersion),
 		ProtocolMaxVersion: ProtocolMaxVersion,
 		Encryption:         router.usingPassword(),
 		PeerDiscovery:      router.PeerDiscovery,
diff --git a/surrogate_gossiper.go b/surrogate_gossiper.go
index 89ea9e1..4ed9f28 100644
--- a/surrogate_gossiper.go
+++ b/surrogate_gossiper.go
@@ -11,6 +11,7 @@ import (
 type surrogateGossiper struct {
 	sync.Mutex
 	prevUpdates []prevUpdate
+	router      *Router
 }
 
 type prevUpdate struct {
@@ -56,6 +57,10 @@ func (s *surrogateGossiper) OnGossip(update []byte) (GossipData, error) {
 	// (this time limit is arbitrary; surrogateGossiper should pass on new gossip immediately
 	// so there should be no reason for a duplicate to show up after a long time)
 	updateTime := now()
+	gossipInterval := defaultGossipInterval
+	if s.router != nil {
+		gossipInterval = s.router.gossipInterval()
+	}
 	deleteBefore := updateTime.Add(-gossipInterval)
 	keepFrom := len(s.prevUpdates)
 	for i, p := range s.prevUpdates {
diff --git a/surrogate_gossiper_test.go b/surrogate_gossiper_test.go
index 61931b1..a4db566 100644
--- a/surrogate_gossiper_test.go
+++ b/surrogate_gossiper_test.go
@@ -37,11 +37,11 @@ func TestSurrogateGossiperOnGossip(t *testing.T) {
 	checkOnGossip(t, s, msg[1], msg[1])
 	checkOnGossip(t, s, msg[0], nil)
 	checkOnGossip(t, s, msg[1], nil)
-	myTime = myTime.Add(gossipInterval / 2) // Should not trigger cleardown
-	checkOnGossip(t, s, msg[2], msg[2])     // Only clears out old ones on new entry
+	myTime = myTime.Add(defaultGossipInterval / 2) // Should not trigger cleardown
+	checkOnGossip(t, s, msg[2], msg[2])            // Only clears out old ones on new entry
 	checkOnGossip(t, s, msg[0], nil)
 	checkOnGossip(t, s, msg[1], nil)
-	myTime = myTime.Add(gossipInterval)
+	myTime = myTime.Add(defaultGossipInterval)
 	checkOnGossip(t, s, msg[0], nil)
 	checkOnGossip(t, s, msg[3], msg[3]) // Only clears out old ones on new entry
 	checkOnGossip(t, s, msg[0], msg[0])

Debdiff

[The following lists of changes regard files as different if they have different names, permissions or owners.]

Files in second set of .debs but not in first

-rw-r--r--  root/root   /usr/share/gocode/src/github.com/weaveworks/mesh/go.mod
-rw-r--r--  root/root   /usr/share/gocode/src/github.com/weaveworks/mesh/go.sum

No differences were encountered in the control files

More details

Full run details