// Source: golang-github-go-kit-kit, tree @8cbddfe
// File: tracing/zipkin/kafka_test.go

package zipkin_test

import (
	"errors"
	"testing"
	"time"

	"github.com/apache/thrift/lib/go/thrift"
	"gopkg.in/Shopify/sarama.v1"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/tracing/zipkin"
	"github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore"
)

// stubProducer is an in-memory stand-in for a Kafka producer, handed to the
// collector under test via zipkin.KafkaProducer.
//
// NOTE: newStubProducer initialises this struct with a positional literal, so
// the field order below must not change without updating it.
type stubProducer struct {
	in     chan *sarama.ProducerMessage // returned (send-only) by Input; tests receive produced messages from it
	err    chan *sarama.ProducerError   // returned (receive-only) by Errors; tests push simulated failures into it
	kdown  bool                         // simulates an unavailable Kafka: Close fails and collectSpan reports a producer error
	closed bool                         // set to true by a successful Close
}

// AsyncClose does nothing; the stub has no background work to shut down.
func (p *stubProducer) AsyncClose() {
	// Intentionally empty.
}
// Close marks the stub as closed and returns nil, unless the stub was created
// with kdown set, in which case it returns an error and leaves closed untouched.
func (p *stubProducer) Close() error {
	if !p.kdown {
		p.closed = true
		return nil
	}
	return errors.New("Kafka is down")
}
// Input exposes the stub's message channel as send-only; whatever is sent on
// it can be received by the test from p.in.
func (p *stubProducer) Input() chan<- *sarama.ProducerMessage {
	return p.in
}

// Successes always returns a nil channel: the stub never reports successful
// deliveries.
func (p *stubProducer) Successes() <-chan *sarama.ProducerMessage {
	return nil
}

// Errors exposes the stub's error channel as receive-only; tests inject
// failures by sending on p.err.
func (p *stubProducer) Errors() <-chan *sarama.ProducerError {
	return p.err
}

// newStubProducer builds a stubProducer with fresh unbuffered message and
// error channels. kdown selects whether the stub simulates a Kafka outage.
// The closed flag starts out false (its zero value).
func newStubProducer(kdown bool) *stubProducer {
	return &stubProducer{
		in:    make(chan *sarama.ProducerMessage),
		err:   make(chan *sarama.ProducerError),
		kdown: kdown,
	}
}

// spans is the fixture set shared by the tests in this file; each test
// collects every entry in order.
// NOTE(review): the three trailing integers are presumably trace ID, span ID
// and parent span ID (testEqual compares TraceID/SpanID/ParentSpanID) —
// confirm against zipkin.NewSpan.
var spans = []*zipkin.Span{
	zipkin.NewSpan("203.0.113.10:1234", "service1", "avg", 123, 456, 0),
	zipkin.NewSpan("203.0.113.10:1234", "service2", "sum", 123, 789, 456),
	zipkin.NewSpan("203.0.113.10:1234", "service2", "div", 123, 101112, 456),
}

// TestKafkaProduce collects every fixture span through a Kafka collector
// backed by a healthy stub producer and verifies both the message metadata and
// that the serialized payload decodes back to the same IDs.
func TestKafkaProduce(t *testing.T) {
	producer := newStubProducer(false)
	brokers := []string{"192.0.2.10:9092"}
	collector, err := zipkin.NewKafkaCollector(brokers, zipkin.KafkaProducer(producer))
	if err != nil {
		t.Fatal(err)
	}

	for i := range spans {
		msg := collectSpan(t, collector, producer, spans[i])
		testMetadata(t, msg)
		decoded := deserializeSpan(t, msg.Value)
		testEqual(t, spans[i], decoded)
	}
}

// TestKafkaClose verifies that closing the collector succeeds and closes the
// underlying producer when Kafka is healthy.
func TestKafkaClose(t *testing.T) {
	producer := newStubProducer(false)
	brokers := []string{"192.0.2.10:9092"}
	collector, err := zipkin.NewKafkaCollector(brokers, zipkin.KafkaProducer(producer))
	if err != nil {
		t.Fatal(err)
	}

	closeErr := collector.Close()
	switch {
	case closeErr != nil:
		t.Fatal(closeErr)
	case !producer.closed:
		t.Fatal("producer not closed")
	}
}

// TestKafkaCloseError verifies that the collector's Close reports an error
// when the underlying producer fails to close.
func TestKafkaCloseError(t *testing.T) {
	producer := newStubProducer(true)
	brokers := []string{"192.0.2.10:9092"}
	collector, err := zipkin.NewKafkaCollector(brokers, zipkin.KafkaProducer(producer))
	if err != nil {
		t.Fatal(err)
	}

	if collector.Close() != nil {
		return
	}
	t.Error("no error on close")
}

// TestKafkaErrors verifies that a producer error is logged for every span
// when the stub producer simulates a Kafka outage.
//
// A custom logger captures each log event containing the key/value pair
// "result" / "failed to produce msg"; the test then expects one such event per
// fixture span, each arriving within 100ms.
func TestKafkaErrors(t *testing.T) {
	p := newStubProducer(true)
	// Buffered to the expected number of failures so the logger callback does
	// not block while the test is still collecting spans.
	errs := make(chan []interface{}, len(spans))
	lg := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {
		// Walk key/value pairs. The i+1 bound keeps a dangling key (odd-length
		// keyvals) from causing an index-out-of-range panic.
		for i := 0; i+1 < len(keyvals); i += 2 {
			if keyvals[i] == "result" && keyvals[i+1] == "failed to produce msg" {
				errs <- keyvals
			}
		}
		return nil
	}))
	c, err := zipkin.NewKafkaCollector(
		[]string{"192.0.2.10:9092"},
		zipkin.KafkaProducer(p),
		zipkin.KafkaLogger(lg),
	)
	if err != nil {
		t.Fatal(err)
	}
	for _, want := range spans {
		// Only the side effect matters here: collectSpan injects a producer
		// error for each message because p.kdown is set.
		collectSpan(t, c, p, want)
	}

	for i := 0; i < len(spans); i++ {
		select {
		case <-errs:
		case <-time.After(100 * time.Millisecond):
			t.Fatalf("errors not logged. got %d, wanted %d", i, len(spans))
		}
	}
}

// collectSpan submits s to collector c and returns the message that the
// collector handed to stub producer p.
//
// A helper goroutine waits up to 100ms for a message on p.in. If one arrives
// and p simulates an outage (p.kdown), the goroutine additionally sends a
// producer error for that message on p.err. The test fails fatally when no
// message is produced in time; an error returned by Collect is reported but
// does not stop the test.
func collectSpan(t *testing.T, c zipkin.Collector, p *stubProducer, s *zipkin.Span) *sarama.ProducerMessage {
	var m *sarama.ProducerMessage
	// Buffered so the goroutine can always report its outcome without
	// waiting for the receive below.
	rcvd := make(chan bool, 1)
	go func() {
		select {
		case m = <-p.in:
			// m is assigned before this send, so the caller's read of m
			// after receiving from rcvd is properly synchronized.
			rcvd <- true
			if p.kdown {
				// p.err is unbuffered: this blocks until something receives
				// from Errors() — presumably the collector; verify.
				p.err <- &sarama.ProducerError{Msg: m, Err: errors.New("kafka is down")}
			}
		case <-time.After(100 * time.Millisecond):
			rcvd <- false
		}
	}()

	if err := c.Collect(s); err != nil {
		t.Errorf("error during collection: %v", err)
	}
	if !<-rcvd {
		t.Fatal("span message was not produced")
	}
	return m
}

// testMetadata checks the envelope of a produced message: it must target the
// "zipkin" topic and carry no key.
func testMetadata(t *testing.T, m *sarama.ProducerMessage) {
	const wantTopic = "zipkin"
	if topic := m.Topic; topic != wantTopic {
		t.Errorf("produced to topic %q, want %q", topic, wantTopic)
	}
	if key := m.Key; key != nil {
		t.Errorf("produced with key %q, want nil", key)
	}
}

// deserializeSpan encodes e to bytes and decodes those bytes as a
// zipkincore.Span using the thrift binary protocol over an in-memory buffer.
// Encoding and decoding failures are reported via t.Errorf; the (possibly
// partially populated) span is returned either way.
func deserializeSpan(t *testing.T, e sarama.Encoder) *zipkincore.Span {
	payload, encErr := e.Encode()
	if encErr != nil {
		t.Errorf("error in encoding: %v", encErr)
	}

	// Stage the payload in a memory transport for the decoder to read from.
	buf := thrift.NewTMemoryBufferLen(len(payload))
	buf.Write(payload)
	buf.Flush()

	span := zipkincore.NewSpan()
	if decErr := span.Read(thrift.NewTBinaryProtocolTransport(buf)); decErr != nil {
		t.Errorf("error in decoding: %v", decErr)
	}
	return span
}

// testEqual compares the IDs of a decoded thrift span (got) against the span
// that was collected (want): trace ID, span ID and parent span ID.
//
// got.ParentId is a pointer; a nil pointer is accepted only when want reports
// a parent span ID of 0.
func testEqual(t *testing.T, want *zipkin.Span, got *zipkincore.Span) {
	if got.TraceId != want.TraceID() {
		t.Errorf("trace_id %d, want %d", got.TraceId, want.TraceID())
	}
	if got.Id != want.SpanID() {
		t.Errorf("id %d, want %d", got.Id, want.SpanID())
	}

	wantParent := want.ParentSpanID()
	switch {
	case got.ParentId == nil:
		if wantParent != 0 {
			// Spell out the nil; formatting a nil pointer with %d would
			// misleadingly print 0.
			t.Errorf("parent_id <nil>, want %d", wantParent)
		}
	case *got.ParentId != wantParent:
		// Dereference so the message shows the ID rather than the pointer's
		// address.
		t.Errorf("parent_id %d, want %d", *got.ParentId, wantParent)
	}
}