Codebase list golang-github-go-kit-kit / 75666f7 tracing / zipkin / collector_test.go
75666f7

Tree @75666f7 (Download .tar.gz)

collector_test.go @75666f7raw · history · blame

package zipkin_test

import (
	"encoding/base64"
	"fmt"
	"math/rand"
	"net"
	"sync"
	"testing"
	"time"

	"github.com/apache/thrift/lib/go/thrift"

	"github.com/go-kit/kit/tracing/zipkin"
	"github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe"
	"github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore"
)

func TestScribeCollector(t *testing.T) {
	server := newScribeServer(t)

	timeout := time.Second
	batchInterval := time.Millisecond
	c, err := zipkin.NewScribeCollector(server.addr(), timeout, 0, batchInterval)
	if err != nil {
		t.Fatal(err)
	}

	var (
		name         = "span-name"
		traceID      = int64(123)
		spanID       = int64(456)
		parentSpanID = int64(0)
		value        = "foo"
		duration     = 42 * time.Millisecond
	)

	span := zipkin.NewSpan("some-host", name, traceID, spanID, parentSpanID)
	span.AnnotateDuration("foo", 42*time.Millisecond)
	if err := c.Collect(span); err != nil {
		t.Errorf("error during collection: %v", err)
	}

	// Need to yield to the select loop to accept the send request, and then
	// yield again to the send operation to write to the socket. I think the
	// best way to do that is just give it some time.

	deadline := time.Now().Add(1 * time.Second)
	for {
		if time.Now().After(deadline) {
			t.Fatalf("never received a span")
		}
		if want, have := 1, len(server.spans()); want != have {
			time.Sleep(time.Millisecond)
			continue
		}
		break
	}

	gotSpan := server.spans()[0]
	if want, have := name, gotSpan.GetName(); want != have {
		t.Errorf("want %q, have %q", want, have)
	}
	if want, have := traceID, gotSpan.GetTraceId(); want != have {
		t.Errorf("want %d, have %d", want, have)
	}
	if want, have := spanID, gotSpan.GetId(); want != have {
		t.Errorf("want %d, have %d", want, have)
	}
	if want, have := parentSpanID, gotSpan.GetParentId(); want != have {
		t.Errorf("want %d, have %d", want, have)
	}

	if want, have := 1, len(gotSpan.GetAnnotations()); want != have {
		t.Fatalf("want %d, have %d", want, have)
	}

	gotAnnotation := gotSpan.GetAnnotations()[0]
	if want, have := value, gotAnnotation.GetValue(); want != have {
		t.Errorf("want %q, have %q", want, have)
	}
	if want, have := duration, time.Duration(gotAnnotation.GetDuration())*time.Microsecond; want != have {
		t.Errorf("want %s, have %s", want, have)
	}
}

type scribeServer struct {
	t         *testing.T
	transport *thrift.TServerSocket
	address   string
	server    *thrift.TSimpleServer
	handler   *scribeHandler
}

func newScribeServer(t *testing.T) *scribeServer {
	protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
	transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())

	var port int
	var transport *thrift.TServerSocket
	var err error
	for i := 0; i < 10; i++ {
		port = 10000 + rand.Intn(10000)
		transport, err = thrift.NewTServerSocket(fmt.Sprintf(":%d", port))
		if err != nil {
			t.Logf("port %d: %v", port, err)
			continue
		}
		break
	}
	if err != nil {
		t.Fatal(err)
	}

	handler := newScribeHandler(t)
	server := thrift.NewTSimpleServer4(
		scribe.NewScribeProcessor(handler),
		transport,
		transportFactory,
		protocolFactory,
	)

	go server.Serve()

	deadline := time.Now().Add(time.Second)
	for !canConnect(port) {
		if time.Now().After(deadline) {
			t.Fatal("server never started")
		}
		time.Sleep(time.Millisecond)
	}

	return &scribeServer{
		transport: transport,
		address:   fmt.Sprintf("127.0.0.1:%d", port),
		handler:   handler,
	}
}

func (s *scribeServer) addr() string {
	return s.address
}

func (s *scribeServer) spans() []*zipkincore.Span {
	return s.handler.spans()
}

type scribeHandler struct {
	t *testing.T
	sync.RWMutex
	entries []*scribe.LogEntry
}

func newScribeHandler(t *testing.T) *scribeHandler {
	return &scribeHandler{t: t}
}

func (h *scribeHandler) Log(messages []*scribe.LogEntry) (scribe.ResultCode, error) {
	h.Lock()
	defer h.Unlock()
	for _, m := range messages {
		h.entries = append(h.entries, m)
	}
	return scribe.ResultCode_OK, nil
}

func (h *scribeHandler) spans() []*zipkincore.Span {
	h.RLock()
	defer h.RUnlock()
	spans := []*zipkincore.Span{}
	for _, m := range h.entries {
		decoded, err := base64.StdEncoding.DecodeString(m.GetMessage())
		if err != nil {
			h.t.Error(err)
			continue
		}
		buffer := thrift.NewTMemoryBuffer()
		if _, err := buffer.Write(decoded); err != nil {
			h.t.Error(err)
			continue
		}
		transport := thrift.NewTBinaryProtocolTransport(buffer)
		zs := &zipkincore.Span{}
		if err := zs.Read(transport); err != nil {
			h.t.Error(err)
			continue
		}
		spans = append(spans, zs)
	}
	return spans
}

func canConnect(port int) bool {
	c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
	if err != nil {
		return false
	}
	c.Close()
	return true
}