kafkacat / af881b0
Avro consumer: print value_schema_id and key_schema_id (#311) Magnus Edenhill 3 years ago
9 changed file(s) with 118 addition(s) and 9 deletion(s).
22 * Add support for multibyte delimiters to `-D` and `-K` (#140, #280)
33 * Add support for `-X partition.assignment.strategy=cooperative-sticky` incremental rebalancing.
44 * High-level consumer `-G` now supports exit-on-eof `-e` option (#86)
5 * Avro consumer with `-J` will now emit `key_schema_id` and `value_schema_id` (#311).
56
67
78 # kafkacat v1.6.0
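
(For illustration only: an invocation along the following lines should surface the new fields in the `-J` envelope. The broker, registry URL, topic name and the id value shown are placeholders, not output taken from this commit.)

    # Consume Avro-encoded values and wrap each message in the JSON envelope
    kafkacat -b localhost:9092 -C -J -t mytopic -o beginning -e \
             -r http://localhost:8081 -s value=avro
    # Example envelope line (abbreviated, illustrative values):
    # {"topic":"mytopic","partition":0,"offset":0,"payload":{"number":63,"name":"TestName"},"value_schema_id":1}
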
117117 *
118118 * @returns newly allocated JSON string, or NULL on error.
119119 */
120 char *kc_avro_to_json (const void *data, size_t data_len,
120 char *kc_avro_to_json (const void *data, size_t data_len, int *schema_idp,
121121 char *errstr, size_t errstr_size) {
122122 avro_value_t avro;
123123 serdes_schema_t *schema;
147147 return NULL;
148148 }
149149
150 if (schema && schema_idp)
151 *schema_idp = serdes_schema_id(schema);
152
150153 avro_value_decref(&avro);
151154
152155 return json;
399399 char *json = kc_avro_to_json(
400400 rkmessage->key,
401401 rkmessage->key_len,
402 NULL,
402403 errstr, sizeof(errstr));
403404
404405 if (!json) {
445446 char *json = kc_avro_to_json(
446447 rkmessage->payload,
447448 rkmessage->len,
448 errstr,
449 sizeof(errstr));
449 NULL,
450 errstr, sizeof(errstr));
450451
451452 if (!json) {
452453 what_failed =
3333 const char *_s = (STR); \
3434 yajl_gen_string(G, (const unsigned char *)_s, strlen(_s)); \
3535 } while (0)
36 #define JS_INT(G, INT) yajl_gen_integer(G, INT)
3637
3738 void fmt_msg_output_json (FILE *fp, const rd_kafka_message_t *rkmessage) {
3839 yajl_gen g;
116117 #if ENABLE_AVRO && YAJL_HAS_GEN_VERBATIM
117118 if (conf.flags & CONF_F_FMT_AVRO_KEY) {
118119 char errstr[256];
120 int schema_id = -1;
119121 char *json = kc_avro_to_json(
120122 rkmessage->key,
121123 rkmessage->key_len,
124 &schema_id,
122125 errstr, sizeof(errstr));
123126
124127 if (!json) {
131134 yajl_gen_null(g);
132135 JS_STR(g, "key_error");
133136 JS_STR(g, errstr);
134 } else
137 } else {
135138 yajl_gen_verbatim(g, json, strlen(json));
139 JS_STR(g, "key_schema_id");
140 JS_INT(g, schema_id);
141 }
136142 free(json);
137143 } else
138144 #endif
147153 #if ENABLE_AVRO && YAJL_HAS_GEN_VERBATIM
148154 if (conf.flags & CONF_F_FMT_AVRO_VALUE) {
149155 char errstr[256];
156 int schema_id = -1;
150157 char *json = kc_avro_to_json(
151158 rkmessage->payload,
152159 rkmessage->len,
160 &schema_id,
153161 errstr, sizeof(errstr));
154162
155163 if (!json) {
162170 yajl_gen_null(g);
163171 JS_STR(g, "payload_error");
164172 JS_STR(g, errstr);
165 } else
173 } else {
166174 yajl_gen_verbatim(g, json, strlen(json));
175 JS_STR(g, "value_schema_id");
176 JS_INT(g, schema_id);
177 }
178
167179 free(json);
168180 } else
169181 #endif
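
(A rough sketch of how the two branches above look from the consuming side: on deserialization failure the envelope carries `payload_error` and a null payload; on success it carries the decoded value plus `value_schema_id`. The invocation is illustrative; the field names and jq filters follow the code above.)

    # Envelopes whose value failed Avro deserialization (payload is null, payload_error is set)
    kafkacat -b localhost:9092 -C -J -t mytopic -o beginning -e \
             -r http://localhost:8081 -s value=avro |
        jq -c 'select(.payload_error != null)'

    # Schema ids of values that deserialized successfully
    kafkacat -b localhost:9092 -C -J -t mytopic -o beginning -e \
             -r http://localhost:8081 -s value=avro |
        jq -r 'select(.value_schema_id != null) | .value_schema_id'
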
14431443 " \"broker\": int,\n"
14441444 " \"headers\": { \"<name>\": str, .. }, // optional\n"
14451445 " \"key\": str|json, \"payload\": str|json,\n"
1446 " \"key_error\": str, \"payload_error\": str } //optional\n"
1447 " (note: key_error and payload_error are only included if "
1448 "deserialization failed)\n"
1446 " \"key_error\": str, \"payload_error\": str, //optional\n"
1447 " \"key_schema_id\": int, "
1448 "\"value_schema_id\": int //optional\n"
1449 " }\n"
1450 " notes:\n"
1451 " - key_error and payload_error are only included if "
1452 "deserialization fails.\n"
1453 " - key_schema_id and value_schema_id are included for "
1454 "successfully deserialized Avro messages.\n"
14491455 "\n"
14501456 #endif
14511457 "Consumer mode (writes messages to stdout):\n"
209209 /*
210210 * avro.c
211211 */
212 char *kc_avro_to_json (const void *data, size_t data_len,
212 char *kc_avro_to_json (const void *data, size_t data_len, int *schema_idp,
213213 char *errstr, size_t errstr_size);
214214
215215 void kc_avro_init (const char *key_schema_name,
0 #!/bin/bash
1 #
2
3 set -e
4 source helpers.sh
5
6
7 #
8 # Verify Avro consumer, requires docker and a running trivup cluster with
9 # Kafka and Schema-registry.
10 #
11
12
13 if [[ -z $SR_URL ]]; then
14 SKIP "No schema-registry available (SR_URL env not set)"
15 fi
16
17 if ! $KAFKACAT -V | grep -q ^Version.*Avro.*builtin\.features; then
18 SKIP "Kafkacat not built with Avro support"
19 fi
20
21 topic=$(make_topic_name)
22
23 create_topic $topic 1
24
25 info "Producing Avro message to $topic"
26
27 echo '{"number": 63, "name": "TestName"}' |
28 docker run --network=host -i \
29 confluentinc/cp-schema-registry:6.0.0 \
30 kafka-avro-console-producer \
31 --bootstrap-server $BROKERS \
32 --topic $topic \
33 --property schema.registry.url="$SR_URL" \
34 --property value.schema="$(< basic_schema.avsc)"
35
36 info "Reading Avro messages"
37 output=$($KAFKACAT -C -r $SR_URL -t $topic -o beginning -e -s value=avro | \
38 jq -r '(.name + "=" + (.number | tostring))')
39
40 exp="TestName=63"
41
42 if [[ $output != $exp ]]; then
43 echo "FAIL: Expected '$exp', not '$output'"
44 exit 1
45 fi
46
47
48 PASS "Expected output seen: $output"
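
(Not part of this commit: a possible follow-up check, sketched in the style of the test above, that the `-J` envelope carries a numeric `value_schema_id` for the message just produced. `$KAFKACAT`, `$SR_URL`, `$topic` and jq are used exactly as in the test; the check itself is hypothetical.)

    info "Verifying value_schema_id in -J envelope"
    schema_id=$($KAFKACAT -C -J -r $SR_URL -t $topic -o beginning -e -s value=avro | \
                    jq -r '.value_schema_id')

    if ! [[ $schema_id =~ ^[0-9]+$ ]]; then
        echo "FAIL: Expected numeric value_schema_id, not '$schema_id'"
        exit 1
    fi
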
0 {
1 "name": "basic",
2 "type": "record",
3 "doc": "basic schema for tests",
4 "namespace": "python.test",
5 "fields": [
6 {
7 "name": "number",
8 "doc": "age",
9 "type": "long"
10 },
11 {
12 "name": "name",
13 "doc": "a name",
14 "type": "string"
15 }
16 ]
17 }
11
22 CLR_BGRED="\033[37;41m"
33 CLR_BGGREEN="\033[37;42m"
4 CLR_YELLOW="\033[33m"
45 CLR_INFO="\033[34m"
56 CLR="\033[0m"
67
1718 echo "kafkacat_test_$$_${RANDOM}_${TEST_NAME}name"
1819 }
1920
21 function create_topic {
22 local topic=$1
23 local partitions=$2
24 info "Creating topic $topic with $partitions partition(s)"
25 $KAFKA_PATH/bin/kafka-topics.sh \
26 --bootstrap-server $BROKERS \
27 --create \
28 --topic "$topic" \
29 --partitions $partitions \
30 --replication-factor 1
31 }
2032
2133
2234 function info {
3446 local str=$1
3547 echo -e "${CLR_BGGREEN}${TEST_NAME} | TEST PASSED: $str${CLR}"
3648 }
49
50
51 function SKIP {
52 local str=$1
53 echo -e "${CLR_YELLOW}${TEST_NAME} | TEST SKIPPED: $str${CLR}"
54 exit 0
55 }