Codebase list kafkacat / HEAD
HEAD

Tree @HEAD (Download .tar.gz)

![logo by @dtrapezoid](./resources/kcat_small.png)

# kcat

**kcat is the project formerly known as as kafkacat**

kcat and kafkacat are Copyright (c) 2014-2021 Magnus Edenhill

[https://github.com/edenhill/kcat](https://github.com/edenhill/kcat)

*kcat logo by [@dtrapezoid](https://twitter.com/dtrapezoid)*


## What is kcat

**kcat** is a generic non-JVM producer and consumer for Apache Kafka >=0.8,
think of it as a netcat for Kafka.

In **producer** mode kcat reads messages from stdin, delimited with a
configurable delimiter (-D, defaults to newline), and produces them to the
provided Kafka cluster (-b), topic (-t) and partition (-p).

In **consumer** mode kcat reads messages from a topic and partition and
prints them to stdout using the configured message delimiter.

There's also support for the Kafka >=0.9 high-level balanced consumer, use
the `-G <group>` switch and provide a list of topics to join the group.

kcat also features a Metadata list (-L) mode to display the current
state of the Kafka cluster and its topics and partitions.

Supports Avro message deserialization using the Confluent Schema-Registry,
and generic primitive deserializers (see examples below).

kcat is fast and lightweight; statically linked it is no more than 150Kb.

## What happened to kafkacat?

**kcat is kafkacat**. The kafkacat project was renamed to kcat in August 2021
to adhere to the Apache Software Foundation's (ASF) trademark policies.
Apart from the name, nothing else was changed.


## Try it out with docker

```bash
# List brokers and topics in cluster
$ docker run -it --network=host edenhill/kcat:1.7.1 -b YOUR_BROKER -L
```

See [Examples](#examples) for usage options, and [Running in Docker](#running-in-docker) for more information on how to properly run docker-based clients with Kafka.


## Install

### On recent enough Debian systems:

````
apt-get install kafkacat
````

On recent openSUSE systems:

```
zypper addrepo https://download.opensuse.org/repositories/network:utilities/openSUSE_Factory/network:utilities.repo
zypper refresh
zypper install kafkacat
```
(see [this page](https://software.opensuse.org/download/package?package=kafkacat&project=network%3Autilities) for instructions to install with openSUSE LEAP)

### On Mac OS X with homebrew installed:

````
brew install kcat
````

### On Fedora

```
# dnf copr enable bvn13/kafkacat
# dnf update
# dnf install kafkacat
```

See [this blog](https://rmoff.net/2020/04/20/how-to-install-kafkacat-on-fedora/) for how to build from sources and install kafkacat/kcat on recent Fedora systems.


### Otherwise follow directions below


## Requirements

 * librdkafka - https://github.com/edenhill/librdkafka
 * libyajl (for JSON support, optional)
 * libavro-c and libserdes (for Avro support, optional. See https://github.com/confluentinc/libserdes)

On Ubuntu or Debian: `sudo apt-get install librdkafka-dev libyajl-dev`

## Build

    ./configure <usual-configure-options>
    make
    sudo make install

### Build for Windows

    cd win32
    nuget restore
    msbuild

## Quick build

The bootstrap.sh build script will download and build the required dependencies,
providing a quick and easy means of building kcat.
Internet connectivity and wget/curl is required by this script.
The resulting kcat binary will be linked statically to avoid runtime
dependencies.
**NOTE**: Requires `curl` and `cmake` (for yajl) to be installed.

    ./bootstrap.sh


## Configuration

Any librdkafka [configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
property can be set on the command line using `-X property=value`, or in
a configuration file specified by `-F <config-file>`.

If no configuration file was specified with `-F ..` on the command line,
kcat will try the `$KCAT_CONFIG` or (deprecated) `$KAFKACAT_CONFIG`
environment variable,
and then the default configuration file `~/.config/kcat.conf` or
the (deprecated) `~/.config/kafkacat.conf`.

Configuration files are optional.


## Examples

High-level balanced KafkaConsumer: subscribe to topic1 and topic2
(requires broker >=0.9.0 and librdkafka version >=0.9.1)

    $ kcat -b mybroker -G mygroup topic1 topic2


Read messages from stdin, produce to 'syslog' topic with snappy compression

    $ tail -f /var/log/syslog | kcat -b mybroker -t syslog -z snappy


Read messages from Kafka 'syslog' topic, print to stdout

    $ kcat -b mybroker -t syslog


Produce messages from file (one file is one message)

    $ kcat -P -b mybroker -t filedrop -p 0 myfile1.bin /etc/motd thirdfile.tgz


Produce messages transactionally (one single transaction for all messages):

    $ kcat -P -b mybroker -t mytopic -X transactional.id=myproducerapp


Read the last 2000 messages from 'syslog' topic, then exit

    $ kcat -C -b mybroker -t syslog -p 0 -o -2000 -e


Consume from all partitions from 'syslog' topic

    $ kcat -C -b mybroker -t syslog


Output consumed messages in JSON envelope:

    $ kcat -b mybroker -t syslog -J


Decode Avro key (`-s key=avro`), value (`-s value=avro`) or both (`-s avro`) to JSON using schema from the Schema-Registry:

    $ kcat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080


Decode Avro message value and extract Avro record's "age" field:

    $ kcat -b mybroker -t ledger -s value=avro -r http://schema-registry-url:8080 | jq .payload.age


Decode key as 32-bit signed integer and value as 16-bit signed integer followed by an unsigned byte followed by string:

    $ kcat -b mybroker -t mytopic -s key='i$' -s value='hB s'


*Hint: see `kcat -h` for all available deserializer options.*


Output consumed messages according to format string:

    $ kcat -b mybroker -t syslog -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'


Read the last 100 messages from topic 'syslog' with  librdkafka configuration parameter 'broker.version.fallback' set to '0.8.2.1' :

    $ kcat -C -b mybroker -X broker.version.fallback=0.8.2.1 -t syslog -p 0 -o -100 -e


Produce a tombstone (a "delete" for compacted topics) for key "abc" by providing an empty message value which `-Z` interpretes as NULL:

    $ echo "abc:" | kcat -b mybroker -t mytopic -Z -K:


Produce with headers:

    $ echo "hello there" | kcat -b mybroker -H "header1=header value" -H "nullheader" -H "emptyheader=" -H "header1=duplicateIsOk"


Print headers in consumer:

    $ kcat -b mybroker -C -t mytopic -f 'Headers: %h: Message value: %s\n'


Enable the idempotent producer, providing exactly-once and strict-ordering
**producer** guarantees:

    $ kcat -b mybroker -X enable.idempotence=true -P -t mytopic ....


Connect to cluster using SSL and SASL PLAIN authentication:

    $ kcat -b mybroker -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -X sasl.username=myapikey -X sasl.password=myapisecret ...


Metadata listing:

```
$ kcat -L -b mybroker
Metadata for all topics (from broker 1: mybroker:9092/1):
 3 brokers:
  broker 1 at mybroker:9092
  broker 2 at mybrokertoo:9092
  broker 3 at thirdbroker:9092
 16 topics:
  topic "syslog" with 3 partitions:
    partition 0, leader 3, replicas: 1,2,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,3, isrs: 1,2,3
    partition 2, leader 1, replicas: 1,2, isrs: 1,2
  topic "rdkafkatest1_auto_49f744a4327b1b1e" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  topic "rdkafkatest1_auto_e02f58f2c581cba" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  ....
```


JSON metadata listing

    $ kcat -b mybroker -L -J


Pretty-printed JSON metadata listing

    $ kcat -b mybroker -L -J | jq .


Query offset(s) by timestamp(s)

    $ kcat -b mybroker -Q -t mytopic:3:2389238523 -t mytopic2:0:18921841


Consume messages between two timestamps

    $ kcat -b mybroker -C -t mytopic -o s@1568276612443 -o e@1568276617901



## Running in Docker

The latest kcat docker image is `edenhill/kcat:1.7.1`, there's
also [Confluent's kafkacat docker images on Docker Hub](https://hub.docker.com/r/confluentinc/cp-kafkacat/).

If you are connecting to Kafka brokers also running on Docker you should specify the network name as part of the `docker run` command using the `--network` parameter. For more details of networking with Kafka and Docker [see this post](https://rmoff.net/2018/08/02/kafka-listeners-explained/).

Here are two short examples of using kcat from Docker. See the [Docker Hub listing](https://hub.docker.com/r/confluentinc/cp-kafkacat/) and [kafkacat docs](https://docs.confluent.io/current/app-development/kafkacat-usage.html) for more details:

**Send messages using [here doc](http://tldp.org/LDP/abs/html/here-docs.html):**

```
docker run -it --rm \
        edenhill/kcat \
                -b kafka-broker:9092 \
                -t test \
                -K: \
                -P <<EOF

1:{"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
2:{"order_id":2,"order_ts":1534772605276,"total_amount":3.32,"customer_name":"Sarah Black"}
3:{"order_id":3,"order_ts":1534772742276,"total_amount":21.00,"customer_name":"Emma Turner"}
EOF
```

**Consume messages:**

```
docker run -it --rm \
        edenhill/kcat \
           -b kafka-broker:9092 \
           -C \
           -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n' \
           -t test

Key (1 bytes): 1
Value (88 bytes): {"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
Partition: 0    Offset: 0
--

Key (1 bytes): 2
Value (89 bytes): {"order_id":2,"order_ts":1534772605276,"total_amount":3.32,"customer_name":"Sarah Black"}
Partition: 0    Offset: 1
--

Key (1 bytes): 3
Value (90 bytes): {"order_id":3,"order_ts":1534772742276,"total_amount":21.00,"customer_name":"Emma Turner"}
Partition: 0    Offset: 2
--
% Reached end of topic test [0] at offset 3
```