Codebase list golang-github-segmentio-kafka-go / HEAD crc32.go
HEAD

Tree @HEAD (Download .tar.gz)

crc32.go @HEAD · raw · history · blame

package kafka

import (
	"bytes"
	"encoding/binary"
	"hash/crc32"
	"sync"
)

// crc32OfMessage returns the CRC-32 (IEEE) checksum of a message's fields,
// fed in this order: magic byte, attributes, timestamp, key, value.
//
// The timestamp takes part in the checksum only when magicByte is non-zero.
// key and value are each preceded by a 4-byte big-endian length, with -1
// standing in for a nil slice (see writeBytes).
func crc32OfMessage(magicByte int8, attributes int8, timestamp int64, key []byte, value []byte) uint32 {
	acc := acquireCrc32Buffer()
	// Hand the accumulator back to the pool once the result has been read.
	defer releaseCrc32Buffer(acc)

	acc.writeInt8(magicByte)
	acc.writeInt8(attributes)
	if magicByte != 0 {
		acc.writeInt64(timestamp)
	}
	acc.writeBytes(key)
	acc.writeBytes(value)

	return acc.sum
}

// crc32Buffer accumulates a running CRC-32 checksum across a sequence of
// writes. Instances are handed out by acquireCrc32Buffer and returned with
// releaseCrc32Buffer.
type crc32Buffer struct {
	// sum is the checksum of everything written so far.
	sum uint32
	// buf is scratch space: the fixed-width write methods truncate it, stage
	// the encoded value in it, and then fold its contents into sum.
	buf bytes.Buffer
}

// writeInt8 folds the single byte i into the running checksum.
func (c *crc32Buffer) writeInt8(i int8) {
	// The scratch buffer only ever holds the bytes of the current write.
	c.buf.Reset()
	c.buf.WriteByte(byte(i))
	c.update()
}

// writeInt32 folds i, encoded as 4 big-endian bytes, into the running
// checksum.
func (c *crc32Buffer) writeInt32(i int32) {
	var encoded [4]byte
	binary.BigEndian.PutUint32(encoded[:], uint32(i))

	// Stage the encoded value in the scratch buffer, replacing whatever the
	// previous write left there, then checksum it.
	c.buf.Reset()
	c.buf.Write(encoded[:])
	c.update()
}

// writeInt64 folds i, encoded as 8 big-endian bytes, into the running
// checksum.
func (c *crc32Buffer) writeInt64(i int64) {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(i))

	// Stage the encoded value in the scratch buffer, replacing whatever the
	// previous write left there, then checksum it.
	c.buf.Reset()
	c.buf.Write(encoded[:])
	c.update()
}

// writeBytes folds a length-prefixed byte slice into the running checksum:
// first a 4-byte big-endian length, then the bytes of b themselves.
//
// A nil slice is given the length -1, which keeps it distinct from an empty
// non-nil slice (length 0).
func (c *crc32Buffer) writeBytes(b []byte) {
	size := int32(-1)
	if b != nil {
		size = int32(len(b))
	}
	c.writeInt32(size)

	// The payload is checksummed directly, without going through c.buf.
	c.sum = crc32Update(c.sum, b)
}

// update folds the current contents of the scratch buffer into the running
// checksum. It leaves the buffer itself untouched.
func (c *crc32Buffer) update() {
	staged := c.buf.Bytes()
	c.sum = crc32Update(c.sum, staged)
}

// crc32Update returns the result of adding the bytes of b to the running
// CRC-32 checksum sum, using the IEEE polynomial table.
func crc32Update(sum uint32, b []byte) uint32 {
	table := crc32.IEEETable
	return crc32.Update(sum, table, b)
}

// crc32BufferPool recycles crc32Buffer values between checksum computations.
// When the pool has nothing to hand out it creates a zero-valued buffer.
var crc32BufferPool = sync.Pool{
	New: func() interface{} {
		return new(crc32Buffer)
	},
}

// acquireCrc32Buffer takes a crc32Buffer from crc32BufferPool and resets its
// running checksum to zero before returning it. The scratch buffer is not
// cleared here.
func acquireCrc32Buffer() *crc32Buffer {
	pooled := crc32BufferPool.Get()
	acc := pooled.(*crc32Buffer)
	acc.sum = 0
	return acc
}

// releaseCrc32Buffer puts b back into crc32BufferPool so that a later
// acquireCrc32Buffer call can reuse it. The buffer is returned as-is; its
// checksum is reset on the next acquire, not here.
func releaseCrc32Buffer(b *crc32Buffer) {
	crc32BufferPool.Put(b)
}