Codebase list golang-github-segmentio-kafka-go / HEAD joingroup_test.go
HEAD

Tree @HEAD (Download .tar.gz)

joingroup_test.go @HEADraw · history · blame

package kafka

import (
	"bufio"
	"bytes"
	"reflect"
	"testing"
)

func TestSaramaCompatibility(t *testing.T) {
	var (
		// sample data from github.com/Shopify/sarama
		//
		// See consumer_group_members_test.go
		//
		groupMemberMetadata = []byte{
			0, 1, // Version
			0, 0, 0, 2, // Topic array length
			0, 3, 'o', 'n', 'e', // Topic one
			0, 3, 't', 'w', 'o', // Topic two
			0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
		}
		groupMemberAssignment = []byte{
			0, 1, // Version
			0, 0, 0, 1, // Topic array length
			0, 3, 'o', 'n', 'e', // Topic one
			0, 0, 0, 3, // Topic one, partition array length
			0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
			0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
		}
	)

	t.Run("verify metadata", func(t *testing.T) {
		var item groupMetadata
		remain, err := (&item).readFrom(bufio.NewReader(bytes.NewReader(groupMemberMetadata)), len(groupMemberMetadata))
		if err != nil {
			t.Fatalf("bad err: %v", err)
		}
		if remain != 0 {
			t.Fatalf("expected 0; got %v", remain)
		}

		if v := item.Version; v != 1 {
			t.Errorf("expected Version 1; got %v", v)
		}
		if v := item.Topics; !reflect.DeepEqual([]string{"one", "two"}, v) {
			t.Errorf(`expected {"one", "two"}; got %v`, v)
		}
		if v := item.UserData; !reflect.DeepEqual([]byte{0x01, 0x02, 0x03}, v) {
			t.Errorf("expected []byte{0x01, 0x02, 0x03}; got %v", v)
		}
	})

	t.Run("verify assignments", func(t *testing.T) {
		var item groupAssignment
		remain, err := (&item).readFrom(bufio.NewReader(bytes.NewReader(groupMemberAssignment)), len(groupMemberAssignment))
		if err != nil {
			t.Fatalf("bad err: %v", err)
		}
		if remain != 0 {
			t.Fatalf("expected 0; got %v", remain)
		}

		if v := item.Version; v != 1 {
			t.Errorf("expected Version 1; got %v", v)
		}
		if v := item.Topics; !reflect.DeepEqual(map[string][]int32{"one": {0, 2, 4}}, v) {
			t.Errorf(`expected map[string][]int32{"one": {0, 2, 4}}; got %v`, v)
		}
		if v := item.UserData; !reflect.DeepEqual([]byte{0x01, 0x02, 0x03}, v) {
			t.Errorf("expected []byte{0x01, 0x02, 0x03}; got %v", v)
		}
	})
}

func TestMemberMetadata(t *testing.T) {
	item := groupMetadata{
		Version:  1,
		Topics:   []string{"a", "b"},
		UserData: []byte(`blah`),
	}

	buf := bytes.NewBuffer(nil)
	w := bufio.NewWriter(buf)
	item.writeTo(w)
	w.Flush()

	var found groupMetadata
	remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
	if err != nil {
		t.Error(err)
		t.FailNow()
	}
	if remain != 0 {
		t.Errorf("expected 0 remain, got %v", remain)
		t.FailNow()
	}
	if !reflect.DeepEqual(item, found) {
		t.Error("expected item and found to be the same")
		t.FailNow()
	}
}

func TestJoinGroupResponseV1(t *testing.T) {
	item := joinGroupResponseV1{
		ErrorCode:     2,
		GenerationID:  3,
		GroupProtocol: "a",
		LeaderID:      "b",
		MemberID:      "c",
		Members: []joinGroupResponseMemberV1{
			{
				MemberID:       "d",
				MemberMetadata: []byte("blah"),
			},
		},
	}

	buf := bytes.NewBuffer(nil)
	w := bufio.NewWriter(buf)
	item.writeTo(w)
	w.Flush()

	var found joinGroupResponseV1
	remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
	if err != nil {
		t.Error(err)
		t.FailNow()
	}
	if remain != 0 {
		t.Errorf("expected 0 remain, got %v", remain)
		t.FailNow()
	}
	if !reflect.DeepEqual(item, found) {
		t.Error("expected item and found to be the same")
		t.FailNow()
	}
}