Codebase list golang-github-weaveworks-mesh / 256deb49-a507-48b9-8e53-ef75ae2505c4/main gossip.go
256deb49-a507-48b9-8e53-ef75ae2505c4/main

Tree @256deb49-a507-48b9-8e53-ef75ae2505c4/main (Download .tar.gz)

gossip.go @256deb49-a507-48b9-8e53-ef75ae2505c4/mainraw · history · blame

package mesh

import "sync"

// Gossip is the sending interface: it offers one method to emit a message
// to a single peer and one to emit a message to every peer in the mesh.
//
// TODO(pb): rename to e.g. Sender
type Gossip interface {
	// GossipUnicast emits a single message to a peer in the mesh.
	// dst names the destination peer and msg is the raw payload.
	//
	// TODO(pb): rename to Unicast?
	//
	// Unicast takes []byte instead of GossipData because "to date there has
	// been no compelling reason [in practice] to do merging on unicast."
	// But there may be some motivation to have unicast Mergeable; see
	// https://github.com/weaveworks/weave/issues/1764
	//
	// TODO(pb): for uniformity of interface, rather take GossipData?
	GossipUnicast(dst PeerName, msg []byte) error

	// GossipBroadcast emits a message to all peers in the mesh.
	// Unlike GossipUnicast it takes a GossipData and returns no error.
	//
	// TODO(pb): rename to Broadcast?
	GossipBroadcast(update GossipData)
}

// Gossiper is the receiving interface: its methods are the callbacks through
// which received gossip is merged into the implementer's state.
//
// TODO(pb): rename to e.g. Receiver
type Gossiper interface {
	// OnGossipUnicast merges received data into state.
	// src names the peer associated with the message and msg is the raw
	// payload.
	//
	// TODO(pb): rename to e.g. OnUnicast
	OnGossipUnicast(src PeerName, msg []byte) error

	// OnGossipBroadcast merges received data into state and returns a
	// representation of the received data (typically a delta) for further
	// propagation.
	//
	// TODO(pb): rename to e.g. OnBroadcast
	OnGossipBroadcast(src PeerName, update []byte) (received GossipData, err error)

	// Gossip returns the state of everything we know; gets called periodically.
	Gossip() (complete GossipData)

	// OnGossip merges received data into state and returns "everything new
	// I've just learnt", or nil if nothing in the received data was new.
	OnGossip(msg []byte) (delta GossipData, err error)
}

// GossipData is a merge-able dataset.
// Think: log-structured data.
type GossipData interface {
	// Encode encodes the data into multiple byte-slices.
	// gossipSender.deliver sends each returned slice as its own protocol
	// message.
	Encode() [][]byte

	// Merge combines another GossipData into this one and returns the result.
	// Callers in this file always replace their stored value with the
	// returned one.
	//
	// TODO(pb): does it need to leave the original unmodified?
	Merge(GossipData) GossipData
}

// gossipSender accumulates GossipData that needs to be sent to one
// destination, and sends it when possible. gossipSender is one-to-one with a
// channel.
//
// Fields:
//   - the embedded sync.Mutex is held by Send, Broadcast and pick while they
//     read or modify gossip and broadcasts.
//   - makeMsg wraps an encoded slice taken from the gossip bucket in a
//     protocolMsg.
//   - makeBroadcastMsg does the same for a broadcast, additionally passing
//     the srcName the data was accumulated under.
//   - sender is what deliver hands the finished protocol messages to.
//   - gossip is the pending data accumulated by Send; nil when nothing is
//     pending.
//   - broadcasts is the pending data accumulated by Broadcast, keyed by
//     srcName.
//   - more is prodded, without blocking, when data is added to an empty
//     sender; the run loop receives from the other end.
//   - flush carries the reply channels of Flush requests to the run loop.
type gossipSender struct {
	sync.Mutex
	makeMsg          func(msg []byte) protocolMsg
	makeBroadcastMsg func(srcName PeerName, msg []byte) protocolMsg
	sender           protocolSender
	gossip           GossipData
	broadcasts       map[PeerName]GossipData
	more             chan<- struct{}
	flush            chan<- chan<- bool // for testing
}

// newGossipSender builds a ready-to-use gossipSender around the given
// message constructors and protocolSender, and starts its run loop in a new
// goroutine. That loop ends once stop becomes readable or a delivery fails.
func newGossipSender(
	makeMsg func(msg []byte) protocolMsg,
	makeBroadcastMsg func(srcName PeerName, msg []byte) protocolMsg,
	sender protocolSender,
	stop <-chan struct{},
) *gossipSender {
	var (
		// Capacity 1 lets prod leave a single wake-up pending without blocking.
		wake    = make(chan struct{}, 1)
		flushes = make(chan chan<- bool)
	)

	gs := new(gossipSender)
	gs.makeMsg = makeMsg
	gs.makeBroadcastMsg = makeBroadcastMsg
	gs.sender = sender
	gs.broadcasts = map[PeerName]GossipData{}
	// The struct keeps the send-only ends; run gets the receive-only ends.
	gs.more = wake
	gs.flush = flushes

	go gs.run(stop, wake, flushes)
	return gs
}

// run is the sender's event loop; newGossipSender starts it in its own
// goroutine. It returns when stop becomes readable or when deliver reports
// an error.
//
//   - more signals that data is pending; each signal triggers a deliver.
//   - flush (testing only) carries a reply channel: run first delivers
//     anything already signalled on more, then replies with whether anything
//     was sent since the previous flush.
//
// A flush request is always answered, even when the delivery it triggered
// failed. Returning without replying would leave the requester blocked on
// its reply channel with nobody left to answer it.
//
// NOTE(review): once run has returned, nothing visible in this file receives
// from flush any more, so a later Flush call would presumably still block on
// its send; fixing that needs a change outside this method.
func (s *gossipSender) run(stop <-chan struct{}, more <-chan struct{}, flush <-chan chan<- bool) {
	sent := false // whether anything was sent since the previous flush
	for {
		select {
		case <-stop:
			return
		case <-more:
			sentSomething, err := s.deliver(stop)
			if err != nil {
				return
			}
			sent = sent || sentSomething
		case ch := <-flush: // for testing
			// send anything pending, then reply back whether we sent
			// anything since previous flush
			var err error
			select {
			case <-more:
				var sentSomething bool
				sentSomething, err = s.deliver(stop)
				sent = sent || sentSomething
			default:
			}
			// Reply before acting on the error so the requester is never
			// left waiting on a goroutine that has exited.
			ch <- sent
			if err != nil {
				return
			}
			sent = false
		}
	}
}

// deliver drains the sender: it repeatedly picks one pending piece of data,
// encodes it and pushes every encoded slice through s.sender. It stops when
// nothing is left to pick, when stop becomes readable, or at the first send
// error.
//
// The returned bool reports whether at least one picked piece was sent in
// full; the error is the first failure from SendProtocolMsg, if any.
func (s *gossipSender) deliver(stop <-chan struct{}) (bool, error) {
	delivered := false
	for {
		// Poll stop between pieces so a shutdown is noticed promptly.
		select {
		case <-stop:
			return delivered, nil
		default:
		}

		// The lock is only held inside pick, never while sending: holding
		// it during a send would block callers of Send/Broadcast for as
		// long as the network is congested.
		pending, wrap := s.pick()
		if pending == nil {
			break
		}
		for _, chunk := range pending.Encode() {
			err := s.sender.SendProtocolMsg(wrap(chunk))
			if err != nil {
				return delivered, err
			}
		}
		delivered = true
	}
	return delivered, nil
}

// pick removes one pending piece of data from the sender, under the lock,
// and returns it together with the function that wraps its encoded form in
// a protocolMsg. The gossip bucket is served before any broadcast; among
// broadcasts an arbitrary entry is taken. Both results are nil when nothing
// is pending.
func (s *gossipSender) pick() (data GossipData, makeProtocolMsg func(msg []byte) protocolMsg) {
	s.Lock()
	defer s.Unlock()

	// Gossip is usually more important than broadcasts, so it goes first.
	if pending := s.gossip; pending != nil {
		s.gossip = nil
		return pending, s.makeMsg
	}

	// Take whichever broadcast the map iteration yields first.
	for origin, pending := range s.broadcasts {
		delete(s.broadcasts, origin)
		wrap := func(msg []byte) protocolMsg { return s.makeBroadcastMsg(origin, msg) }
		return pending, wrap
	}

	return nil, nil
}

// Send queues data in the gossip bucket, merging it into whatever is already
// waiting there; the run loop sends it later. If the sender held nothing at
// all beforehand, the run loop is prodded. Send and Broadcast feed separate
// buckets.
func (s *gossipSender) Send(data GossipData) {
	s.Lock()
	defer s.Unlock()

	// Only the transition from empty to non-empty needs a wake-up.
	if s.empty() {
		defer s.prod()
	}

	pending := data
	if s.gossip != nil {
		pending = s.gossip.Merge(data)
	}
	s.gossip = pending
}

// Broadcast queues data in the broadcast bucket for srcName, merging it into
// whatever is already waiting under that name; the run loop sends it later.
// If the sender held nothing at all beforehand, the run loop is prodded.
// Send and Broadcast feed separate buckets.
func (s *gossipSender) Broadcast(srcName PeerName, data GossipData) {
	s.Lock()
	defer s.Unlock()

	// Only the transition from empty to non-empty needs a wake-up.
	if s.empty() {
		defer s.prod()
	}

	if queued, ok := s.broadcasts[srcName]; ok {
		data = queued.Merge(data)
	}
	s.broadcasts[srcName] = data
}

// empty reports whether neither the gossip bucket nor any broadcast bucket
// holds pending data. It does no locking of its own; Send and Broadcast call
// it with the lock held.
func (s *gossipSender) empty() bool {
	if s.gossip != nil {
		return false
	}
	return len(s.broadcasts) == 0
}

// prod signals on s.more that data is pending, without ever blocking: if the
// send cannot proceed immediately the signal is dropped.
func (s *gossipSender) prod() {
	var wake struct{}
	select {
	case s.more <- wake:
		// Signal handed over.
	default:
		// The channel cannot take another signal right now; skip it.
	}
}

// Flush asks the run loop to send everything pending and blocks until it
// answers. The result is true if anything was sent since the previous flush.
// For testing.
func (s *gossipSender) Flush() bool {
	reply := make(chan bool)
	s.flush <- reply
	sentSinceLast := <-reply
	return sentSinceLast
}

// gossipSenders wraps a protocolSender (e.g. a LocalConnection) and yields
// per-channel gossipSenders.
//
// The embedded mutex is held by Sender and Flush while they use the senders
// map, which is keyed by channel name. sender and stop are handed to the
// factory function whenever Sender has to create a new gossipSender.
//
// TODO(pb): may be able to remove this and use makeGossipSender directly
type gossipSenders struct {
	sync.Mutex
	sender  protocolSender
	stop    <-chan struct{}
	senders map[string]*gossipSender
}

// newGossipSenders returns an empty, ready-to-use gossipSenders that will
// pass sender and stop to every gossipSender it creates later.
// TODO(pb): is stop chan the best way to do that?
func newGossipSenders(sender protocolSender, stop <-chan struct{}) *gossipSenders {
	gs := new(gossipSenders)
	gs.sender = sender
	gs.stop = stop
	gs.senders = map[string]*gossipSender{}
	return gs
}

// Sender returns the gossipSender registered for channelName. When there is
// none yet, it calls makeGossipSender with the wrapped protocolSender and
// stop channel, registers the result under channelName and returns it.
func (gs *gossipSenders) Sender(channelName string, makeGossipSender func(sender protocolSender, stop <-chan struct{}) *gossipSender) *gossipSender {
	gs.Lock()
	defer gs.Unlock()

	if existing, ok := gs.senders[channelName]; ok {
		return existing
	}

	created := makeGossipSender(gs.sender, gs.stop)
	gs.senders[channelName] = created
	return created
}

// Flush flushes every managed sender, holding the lock throughout, and
// reports whether at least one of them sent anything. Used for testing.
func (gs *gossipSenders) Flush() bool {
	gs.Lock()
	defer gs.Unlock()

	anySent := false
	for _, s := range gs.senders {
		// Every sender is flushed, whatever the earlier ones reported.
		if s.Flush() {
			anySent = true
		}
	}
	return anySent
}

// gossipChannels is an index of channel name to gossip channel.
type gossipChannels map[string]*gossipChannel

// gossipConnection is satisfied by any type that can hand out a
// gossipSenders.
type gossipConnection interface {
	// gossipSenders returns the gossipSenders belonging to the implementer.
	// NOTE(review): presumably one per connection — confirm against the
	// implementing types.
	gossipSenders() *gossipSenders
}