Codebase list golang-github-segmentio-kafka-go / HEAD commit.go
HEAD

Tree @HEAD (Download .tar.gz)

commit.go @HEADraw · history · blame

package kafka

// A commit represents the instruction of publishing an update of the last
// offset read by a program for a topic and partition.
type commit struct {
	topic     string
	partition int
	offset    int64
}

// makeCommit builds a commit value from a message, the resulting commit takes
// its topic, partition, and offset from the message.
func makeCommit(msg Message) commit {
	return commit{
		topic:     msg.Topic,
		partition: msg.Partition,
		offset:    msg.Offset + 1,
	}
}

// makeCommits generates a slice of commits from a list of messages, it extracts
// the topic, partition, and offset of each message and builds the corresponding
// commit slice.
func makeCommits(msgs ...Message) []commit {
	commits := make([]commit, len(msgs))

	for i, m := range msgs {
		commits[i] = makeCommit(m)
	}

	return commits
}

// commitRequest is the data type exchanged between the CommitMessages method
// and internals of the reader's implementation.
type commitRequest struct {
	commits []commit
	errch   chan<- error
}