Codebase list golang-github-weaveworks-mesh / 256deb49-a507-48b9-8e53-ef75ae2505c4/main gossip_channel.go
256deb49-a507-48b9-8e53-ef75ae2505c4/main

Tree @256deb49-a507-48b9-8e53-ef75ae2505c4/main (Download .tar.gz)

gossip_channel.go @256deb49-a507-48b9-8e53-ef75ae2505c4/mainraw · history · blame

package mesh

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// gossipChannel is a logical communication channel within a physical mesh.
type gossipChannel struct {
	name     string
	ourself  *localPeer
	routes   *routes
	gossiper Gossiper
	logger   Logger
}

// newGossipChannel returns a named, usable channel.
// It delegates receiving duties to the passed Gossiper.
func newGossipChannel(channelName string, ourself *localPeer, r *routes, g Gossiper, logger Logger) *gossipChannel {
	return &gossipChannel{
		name:     channelName,
		ourself:  ourself,
		routes:   r,
		gossiper: g,
		logger:   logger,
	}
}

func (c *gossipChannel) deliverUnicast(srcName PeerName, origPayload []byte, dec *gob.Decoder) error {
	var destName PeerName
	if err := dec.Decode(&destName); err != nil {
		return err
	}
	if c.ourself.Name == destName {
		var payload []byte
		if err := dec.Decode(&payload); err != nil {
			return err
		}
		return c.gossiper.OnGossipUnicast(srcName, payload)
	}
	if err := c.relayUnicast(destName, origPayload); err != nil {
		c.logf("%v", err)
	}
	return nil
}

func (c *gossipChannel) deliverBroadcast(srcName PeerName, _ []byte, dec *gob.Decoder) error {
	var payload []byte
	if err := dec.Decode(&payload); err != nil {
		return err
	}
	data, err := c.gossiper.OnGossipBroadcast(srcName, payload)
	if err != nil || data == nil {
		return err
	}
	c.relayBroadcast(srcName, data)
	return nil
}

func (c *gossipChannel) deliver(srcName PeerName, _ []byte, dec *gob.Decoder) error {
	var payload []byte
	if err := dec.Decode(&payload); err != nil {
		return err
	}
	update, err := c.gossiper.OnGossip(payload)
	if err != nil || update == nil {
		return err
	}
	c.relay(srcName, update)
	return nil
}

// GossipUnicast implements Gossip, relaying msg to dst, which must be a
// member of the channel.
func (c *gossipChannel) GossipUnicast(dstPeerName PeerName, msg []byte) error {
	return c.relayUnicast(dstPeerName, gobEncode(c.name, c.ourself.Name, dstPeerName, msg))
}

// GossipBroadcast implements Gossip, relaying update to all members of the
// channel.
func (c *gossipChannel) GossipBroadcast(update GossipData) {
	c.relayBroadcast(c.ourself.Name, update)
}

// Send relays data into the channel topology via random neighbours.
func (c *gossipChannel) Send(data GossipData) {
	c.relay(c.ourself.Name, data)
}

// SendDown relays data into the channel topology via conn.
func (c *gossipChannel) SendDown(conn Connection, data GossipData) {
	c.senderFor(conn).Send(data)
}

func (c *gossipChannel) relayUnicast(dstPeerName PeerName, buf []byte) (err error) {
	if relayPeerName, found := c.routes.UnicastAll(dstPeerName); !found {
		err = fmt.Errorf("unknown relay destination: %s", dstPeerName)
	} else if conn, found := c.ourself.ConnectionTo(relayPeerName); !found {
		err = fmt.Errorf("unable to find connection to relay peer %s", relayPeerName)
	} else {
		err = conn.(protocolSender).SendProtocolMsg(protocolMsg{ProtocolGossipUnicast, buf})
	}
	return err
}

func (c *gossipChannel) relayBroadcast(srcName PeerName, update GossipData) {
	c.routes.ensureRecalculated()
	for _, conn := range c.ourself.ConnectionsTo(c.routes.BroadcastAll(srcName)) {
		c.senderFor(conn).Broadcast(srcName, update)
	}
}

func (c *gossipChannel) relay(srcName PeerName, data GossipData) {
	c.routes.ensureRecalculated()
	for _, conn := range c.ourself.ConnectionsTo(c.routes.randomNeighbours(srcName)) {
		c.senderFor(conn).Send(data)
	}
}

func (c *gossipChannel) senderFor(conn Connection) *gossipSender {
	return conn.(gossipConnection).gossipSenders().Sender(c.name, c.makeGossipSender)
}

func (c *gossipChannel) makeGossipSender(sender protocolSender, stop <-chan struct{}) *gossipSender {
	return newGossipSender(c.makeMsg, c.makeBroadcastMsg, sender, stop)
}

func (c *gossipChannel) makeMsg(msg []byte) protocolMsg {
	return protocolMsg{ProtocolGossip, gobEncode(c.name, c.ourself.Name, msg)}
}

func (c *gossipChannel) makeBroadcastMsg(srcName PeerName, msg []byte) protocolMsg {
	return protocolMsg{ProtocolGossipBroadcast, gobEncode(c.name, srcName, msg)}
}

func (c *gossipChannel) logf(format string, args ...interface{}) {
	format = "[gossip " + c.name + "]: " + format
	c.logger.Printf(format, args...)
}

// GobEncode gob-encodes each item and returns the resulting byte slice.
func gobEncode(items ...interface{}) []byte {
	buf := new(bytes.Buffer)
	enc := gob.NewEncoder(buf)
	for _, i := range items {
		if err := enc.Encode(i); err != nil {
			panic(err)
		}
	}
	return buf.Bytes()
}