83 | 83 |
* Fatal error: print error and exit
|
84 | 84 |
*/
|
85 | 85 |
void RD_NORETURN fatal0 (const char *func, int line,
|
86 | |
const char *fmt, ...) {
|
|
86 |
const char *fmt, ...) {
|
87 | 87 |
va_list ap;
|
88 | 88 |
char buf[1024];
|
89 | 89 |
|
|
129 | 129 |
|
130 | 130 |
if (rkmessage->err) {
|
131 | 131 |
KC_INFO(1, "Delivery failed for message: %s\n",
|
132 | |
rd_kafka_err2str(rkmessage->err));
|
|
132 |
rd_kafka_err2str(rkmessage->err));
|
133 | 133 |
stats.tx_err_dr++;
|
134 | 134 |
return;
|
135 | 135 |
}
|
136 | 136 |
|
137 | 137 |
KC_INFO(3, "Message delivered to partition %"PRId32" (offset %"PRId64")\n",
|
138 | |
rkmessage->partition, rkmessage->offset);
|
|
138 |
rkmessage->partition, rkmessage->offset);
|
139 | 139 |
|
140 | 140 |
if (rkmessage->offset == 0 && say_once) {
|
141 | 141 |
KC_INFO(3, "Enable message offset reporting "
|
142 | |
"with '-X topic.produce.offset.report=true'\n");
|
|
142 |
"with '-X topic.produce.offset.report=true'\n");
|
143 | 143 |
say_once = 0;
|
144 | 144 |
}
|
145 | 145 |
stats.tx_delivered++;
|
|
164 | 164 |
|
165 | 165 |
if (!conf.run)
|
166 | 166 |
KC_FATAL("Program terminated while "
|
167 | |
"producing message of %zd bytes", len);
|
|
167 |
"producing message of %zd bytes", len);
|
168 | 168 |
|
169 | 169 |
err = rd_kafka_producev(
|
170 | 170 |
conf.rk,
|
|
183 | 183 |
|
184 | 184 |
if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL)
|
185 | 185 |
KC_FATAL("Failed to produce message (%zd bytes): %s",
|
186 | |
len, rd_kafka_err2str(err));
|
|
186 |
len, rd_kafka_err2str(err));
|
187 | 187 |
|
188 | 188 |
stats.tx_err_q++;
|
189 | 189 |
|
|
206 | 206 |
int fd;
|
207 | 207 |
void *ptr;
|
208 | 208 |
struct stat st;
|
209 | |
ssize_t sz;
|
210 | |
int msgflags = 0;
|
|
209 |
ssize_t sz;
|
|
210 |
int msgflags = 0;
|
211 | 211 |
|
212 | 212 |
if ((fd = _COMPAT(open)(path, O_RDONLY)) == -1) {
|
213 | 213 |
KC_INFO(1, "Failed to open %s: %s\n", path, strerror(errno));
|
|
233 | 233 |
_COMPAT(close)(fd);
|
234 | 234 |
return -1;
|
235 | 235 |
}
|
236 | |
sz = st.st_size;
|
237 | |
msgflags = RD_KAFKA_MSG_F_COPY;
|
|
236 |
sz = st.st_size;
|
|
237 |
msgflags = RD_KAFKA_MSG_F_COPY;
|
238 | 238 |
#else
|
239 | |
ptr = malloc(st.st_size);
|
240 | |
if (!ptr) {
|
241 | |
KC_INFO(1, "Failed to allocate message for %s: %s\n",
|
242 | |
path, strerror(errno));
|
243 | |
_COMPAT(close)(fd);
|
244 | |
return -1;
|
245 | |
}
|
246 | |
|
247 | |
sz = _read(fd, ptr, st.st_size);
|
248 | |
if (sz < st.st_size) {
|
249 | |
KC_INFO(1, "Read failed for %s (%zd/%zd): %s\n",
|
250 | |
path, sz, (size_t)st.st_size, sz == -1 ? strerror(errno) :
|
251 | |
"incomplete read");
|
252 | |
free(ptr);
|
253 | |
close(fd);
|
254 | |
return -1;
|
255 | |
}
|
256 | |
msgflags = RD_KAFKA_MSG_F_FREE;
|
|
239 |
ptr = malloc(st.st_size);
|
|
240 |
if (!ptr) {
|
|
241 |
KC_INFO(1, "Failed to allocate message for %s: %s\n",
|
|
242 |
path, strerror(errno));
|
|
243 |
_COMPAT(close)(fd);
|
|
244 |
return -1;
|
|
245 |
}
|
|
246 |
|
|
247 |
sz = _read(fd, ptr, st.st_size);
|
|
248 |
if (sz < st.st_size) {
|
|
249 |
KC_INFO(1, "Read failed for %s (%zd/%zd): %s\n",
|
|
250 |
path, sz, (size_t)st.st_size, sz == -1 ? strerror(errno) :
|
|
251 |
"incomplete read");
|
|
252 |
free(ptr);
|
|
253 |
close(fd);
|
|
254 |
return -1;
|
|
255 |
}
|
|
256 |
msgflags = RD_KAFKA_MSG_F_FREE;
|
257 | 257 |
#endif
|
258 | 258 |
|
259 | 259 |
KC_INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n",
|
260 | |
path, (intmax_t)st.st_size);
|
261 | |
produce(ptr, sz, NULL, 0, msgflags);
|
262 | |
|
263 | |
_COMPAT(close)(fd);
|
264 | |
|
265 | |
if (!(msgflags & RD_KAFKA_MSG_F_FREE)) {
|
|
260 |
path, (intmax_t)st.st_size);
|
|
261 |
produce(ptr, sz, NULL, 0, msgflags);
|
|
262 |
|
|
263 |
_COMPAT(close)(fd);
|
|
264 |
|
|
265 |
if (!(msgflags & RD_KAFKA_MSG_F_FREE)) {
|
266 | 266 |
#ifndef _MSC_VER
|
267 | |
munmap(ptr, st.st_size);
|
|
267 |
munmap(ptr, st.st_size);
|
268 | 268 |
#else
|
269 | |
free(ptr);
|
270 | |
#endif
|
271 | |
}
|
272 | |
return sz;
|
|
269 |
free(ptr);
|
|
270 |
#endif
|
|
271 |
}
|
|
272 |
return sz;
|
273 | 273 |
}
|
274 | 274 |
|
275 | 275 |
|
|
317 | 317 |
conf.exitcode = 1;
|
318 | 318 |
else if (good < pathcnt)
|
319 | 319 |
KC_INFO(1, "Failed to produce from %i/%i files\n",
|
320 | |
pathcnt - good, pathcnt);
|
|
320 |
pathcnt - good, pathcnt);
|
321 | 321 |
|
322 | 322 |
} else {
|
323 | 323 |
/* Read messages from input, delimited by conf.delim */
|
|
392 | 392 |
if (conf.flags & CONF_F_TEE &&
|
393 | 393 |
fwrite(sbuf, orig_len, 1, stdout) != 1)
|
394 | 394 |
KC_FATAL("Tee write error for message of %zd bytes: %s",
|
395 | |
orig_len, strerror(errno));
|
|
395 |
orig_len, strerror(errno));
|
396 | 396 |
|
397 | 397 |
if (msgflags & RD_KAFKA_MSG_F_FREE) {
|
398 | 398 |
/* rdkafka owns the allocated buffer
|
|
409 | 409 |
if (conf.run) {
|
410 | 410 |
if (!feof(fp))
|
411 | 411 |
KC_FATAL("Unable to read message: %s",
|
412 | |
strerror(errno));
|
|
412 |
strerror(errno));
|
413 | 413 |
}
|
414 | 414 |
}
|
415 | 415 |
|
|
460 | 460 |
}
|
461 | 461 |
|
462 | 462 |
KC_INFO(1, "Reached end of topic %s [%"PRId32"] "
|
463 | |
"at offset %"PRId64"%s\n",
|
464 | |
rd_kafka_topic_name(rkmessage->rkt),
|
465 | |
rkmessage->partition,
|
466 | |
rkmessage->offset,
|
467 | |
!conf.run ? ": exiting" : "");
|
|
463 |
"at offset %"PRId64"%s\n",
|
|
464 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
465 |
rkmessage->partition,
|
|
466 |
rkmessage->offset,
|
|
467 |
!conf.run ? ": exiting" : "");
|
468 | 468 |
}
|
469 | 469 |
|
470 | 470 |
|
|
485 | 485 |
|
486 | 486 |
if (rkmessage->rkt)
|
487 | 487 |
KC_FATAL("Topic %s [%"PRId32"] error: %s",
|
488 | |
rd_kafka_topic_name(rkmessage->rkt),
|
489 | |
rkmessage->partition,
|
490 | |
rd_kafka_message_errstr(rkmessage));
|
|
488 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
489 |
rkmessage->partition,
|
|
490 |
rd_kafka_message_errstr(rkmessage));
|
491 | 491 |
else
|
492 | 492 |
KC_FATAL("Consumer error: %s",
|
493 | |
rd_kafka_message_errstr(rkmessage));
|
|
493 |
rd_kafka_message_errstr(rkmessage));
|
494 | 494 |
|
495 | 495 |
}
|
496 | 496 |
|
|
499 | 499 |
if (ts >= conf.stopts) {
|
500 | 500 |
stop_partition(rkmessage);
|
501 | 501 |
KC_INFO(1, "Reached stop timestamp for topic %s [%"PRId32"] "
|
502 | |
"at offset %"PRId64"%s\n",
|
503 | |
rd_kafka_topic_name(rkmessage->rkt),
|
504 | |
rkmessage->partition,
|
505 | |
rkmessage->offset,
|
506 | |
!conf.run ? ": exiting" : "");
|
|
502 |
"at offset %"PRId64"%s\n",
|
|
503 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
504 |
rkmessage->partition,
|
|
505 |
rkmessage->offset,
|
|
506 |
!conf.run ? ": exiting" : "");
|
507 | 507 |
return;
|
508 | 508 |
}
|
509 | 509 |
}
|
|
528 | 528 |
static void throttle_cb (rd_kafka_t *rk, const char *broker_name,
|
529 | 529 |
int32_t broker_id, int throttle_time_ms, void *opaque){
|
530 | 530 |
KC_INFO(1, "Broker %s (%"PRId32") throttled request for %dms\n",
|
531 | |
broker_name, broker_id, throttle_time_ms);
|
|
531 |
broker_name, broker_id, throttle_time_ms);
|
532 | 532 |
}
|
533 | 533 |
#endif
|
534 | 534 |
|
|
551 | 551 |
void *opaque) {
|
552 | 552 |
|
553 | 553 |
KC_INFO(1, "Group %s rebalanced (memberid %s): ",
|
554 | |
conf.group, rd_kafka_memberid(rk));
|
555 | |
|
556 | |
switch (err)
|
557 | |
{
|
558 | |
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
|
|
554 |
conf.group, rd_kafka_memberid(rk));
|
|
555 |
|
|
556 |
switch (err)
|
|
557 |
{
|
|
558 |
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
|
559 | 559 |
if (conf.verbosity >= 1) {
|
560 | 560 |
fprintf(stderr, "assigned: ");
|
561 | 561 |
print_partition_list(1, partitions);
|
562 | 562 |
}
|
563 | |
rd_kafka_assign(rk, partitions);
|
564 | |
break;
|
565 | |
|
566 | |
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
|
563 |
rd_kafka_assign(rk, partitions);
|
|
564 |
break;
|
|
565 |
|
|
566 |
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
567 | 567 |
if (conf.verbosity >= 1) {
|
568 | 568 |
fprintf(stderr, "revoked: ");
|
569 | 569 |
print_partition_list(1, partitions);
|
570 | 570 |
}
|
571 | |
rd_kafka_assign(rk, NULL);
|
572 | |
break;
|
573 | |
|
574 | |
default:
|
|
571 |
rd_kafka_assign(rk, NULL);
|
|
572 |
break;
|
|
573 |
|
|
574 |
default:
|
575 | 575 |
KC_INFO(0, "failed: %s\n", rd_kafka_err2str(err));
|
576 | |
break;
|
577 | |
}
|
|
576 |
break;
|
|
577 |
}
|
578 | 578 |
}
|
579 | 579 |
|
580 | 580 |
/**
|
|
613 | 613 |
/* Subscribe */
|
614 | 614 |
if ((err = rd_kafka_subscribe(conf.rk, topiclist)))
|
615 | 615 |
KC_FATAL("Failed to subscribe to %d topics: %s\n",
|
616 | |
topiclist->cnt, rd_kafka_err2str(err));
|
|
616 |
topiclist->cnt, rd_kafka_err2str(err));
|
617 | 617 |
|
618 | 618 |
rd_kafka_topic_partition_list_destroy(topiclist);
|
619 | 619 |
|
|
650 | 650 |
int64_t *offsets;
|
651 | 651 |
rd_kafka_resp_err_t err;
|
652 | 652 |
rd_kafka_topic_partition_list_t *rktparlistp =
|
653 | |
rd_kafka_topic_partition_list_new(1);
|
|
653 |
rd_kafka_topic_partition_list_new(1);
|
654 | 654 |
|
655 | 655 |
for (i = 0 ; i < topic->partition_cnt ; i++) {
|
656 | 656 |
int32_t partition = topic->partitions[i].id;
|
|
661 | 661 |
continue;
|
662 | 662 |
|
663 | 663 |
rd_kafka_topic_partition_list_add(rktparlistp,
|
664 | |
rd_kafka_topic_name(conf.rkt),
|
665 | |
partition)->offset = conf.startts;
|
|
664 |
rd_kafka_topic_name(conf.rkt),
|
|
665 |
partition)->offset = conf.startts;
|
666 | 666 |
|
667 | 667 |
if (conf.partition != RD_KAFKA_PARTITION_UA)
|
668 | 668 |
break;
|
|
721 | 721 |
/* Query broker for topic + partition information. */
|
722 | 722 |
if ((err = rd_kafka_metadata(conf.rk, 0, conf.rkt, &metadata, 5000)))
|
723 | 723 |
KC_FATAL("Failed to query metadata for topic %s: %s",
|
724 | |
rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));
|
|
724 |
rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));
|
725 | 725 |
|
726 | 726 |
/* Error handling */
|
727 | 727 |
if (metadata->topic_cnt == 0)
|
728 | 728 |
KC_FATAL("No such topic in cluster: %s",
|
729 | |
rd_kafka_topic_name(conf.rkt));
|
|
729 |
rd_kafka_topic_name(conf.rkt));
|
730 | 730 |
|
731 | 731 |
if ((err = metadata->topics[0].err))
|
732 | 732 |
KC_FATAL("Topic %s error: %s",
|
733 | |
rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));
|
|
733 |
rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));
|
734 | 734 |
|
735 | 735 |
if (metadata->topics[0].partition_cnt == 0)
|
736 | 736 |
KC_FATAL("Topic %s has no partitions",
|
737 | |
rd_kafka_topic_name(conf.rkt));
|
|
737 |
rd_kafka_topic_name(conf.rkt));
|
738 | 738 |
|
739 | 739 |
/* If Exit-at-EOF is enabled, set up array to track EOF
|
740 | 740 |
* state for each partition. */
|
741 | 741 |
if (conf.exit_eof || conf.stopts) {
|
742 | 742 |
part_stop = calloc(sizeof(*part_stop),
|
743 | |
metadata->topics[0].partition_cnt);
|
|
743 |
metadata->topics[0].partition_cnt);
|
744 | 744 |
|
745 | 745 |
if (conf.partition != RD_KAFKA_PARTITION_UA)
|
746 | 746 |
part_stop_thres = 1;
|
|
750 | 750 |
|
751 | 751 |
#if RD_KAFKA_VERSION >= 0x00090300
|
752 | 752 |
if (conf.startts) {
|
753 | |
offsets = get_offsets(&metadata->topics[0]);
|
|
753 |
offsets = get_offsets(&metadata->topics[0]);
|
754 | 754 |
}
|
755 | 755 |
#endif
|
756 | 756 |
|
|
784 | 784 |
if (conf.partition != RD_KAFKA_PARTITION_UA &&
|
785 | 785 |
i == metadata->topics[0].partition_cnt)
|
786 | 786 |
KC_FATAL("Topic %s (with partitions 0..%i): "
|
787 | |
"partition %i does not exist",
|
788 | |
rd_kafka_topic_name(conf.rkt),
|
789 | |
metadata->topics[0].partition_cnt-1,
|
790 | |
conf.partition);
|
|
787 |
"partition %i does not exist",
|
|
788 |
rd_kafka_topic_name(conf.rkt),
|
|
789 |
metadata->topics[0].partition_cnt-1,
|
|
790 |
conf.partition);
|
791 | 791 |
|
792 | 792 |
|
793 | 793 |
/* Read messages from Kafka, write to 'fp'. */
|
|
808 | 808 |
conf.partition != partition)
|
809 | 809 |
continue;
|
810 | 810 |
|
811 | |
/* Dont stop already stopped partitions */
|
812 | |
if (!part_stop || !part_stop[partition])
|
813 | |
rd_kafka_consume_stop(conf.rkt, partition);
|
|
811 |
/* Dont stop already stopped partitions */
|
|
812 |
if (!part_stop || !part_stop[partition])
|
|
813 |
rd_kafka_consume_stop(conf.rkt, partition);
|
814 | 814 |
|
815 | 815 |
rd_kafka_consume_stop(conf.rkt, partition);
|
816 | 816 |
}
|
|
1209 | 1209 |
rd_kafka_resp_err_t fatal_err;
|
1210 | 1210 |
|
1211 | 1211 |
fatal_err = rd_kafka_fatal_error(rk, fatal_errstr,
|
1212 | |
sizeof(fatal_errstr));
|
|
1212 |
sizeof(fatal_errstr));
|
1213 | 1213 |
KC_INFO(0, "FATAL CLIENT ERROR: %s: %s: terminating\n",
|
1214 | 1214 |
rd_kafka_err2str(fatal_err), fatal_errstr);
|
1215 | 1215 |
conf.run = 0;
|
1216 | 1216 |
|
1217 | 1217 |
} else
|
1218 | 1218 |
#endif
|
1219 | |
if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
|
1220 | |
KC_ERROR("%s: %s", rd_kafka_err2str(err),
|
1221 | |
reason ? reason : "");
|
1222 | |
} else {
|
1223 | |
KC_INFO(1, "ERROR: %s: %s\n", rd_kafka_err2str(err),
|
1224 | |
reason ? reason : "");
|
1225 | |
}
|
|
1219 |
if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
|
|
1220 |
KC_ERROR("%s: %s", rd_kafka_err2str(err),
|
|
1221 |
reason ? reason : "");
|
|
1222 |
} else {
|
|
1223 |
KC_INFO(1, "ERROR: %s: %s\n", rd_kafka_err2str(err),
|
|
1224 |
reason ? reason : "");
|
|
1225 |
}
|
1226 | 1226 |
}
|
1227 | 1227 |
|
1228 | 1228 |
|
|
1818 | 1818 |
else
|
1819 | 1819 |
conf.mode = 'P';
|
1820 | 1820 |
KC_INFO(1, "Auto-selecting %s mode (use -P or -C to override)\n",
|
1821 | |
conf.mode == 'C' ? "Consumer":"Producer");
|
|
1821 |
conf.mode == 'C' ? "Consumer":"Producer");
|
1822 | 1822 |
}
|
1823 | 1823 |
|
1824 | 1824 |
|
|
1940 | 1940 |
|
1941 | 1941 |
} else if (conf.mode == 'P') {
|
1942 | 1942 |
conf.delim = parse_delim(delim);
|
1943 | |
if (conf.flags & CONF_F_KEY_DELIM)
|
1944 | |
conf.key_delim = parse_delim(key_delim);
|
|
1943 |
if (conf.flags & CONF_F_KEY_DELIM)
|
|
1944 |
conf.key_delim = parse_delim(key_delim);
|
1945 | 1945 |
}
|
1946 | 1946 |
|
1947 | 1947 |
/* Automatically enable API version requests if needed and
|
|
1959 | 1959 |
|
1960 | 1960 |
int main (int argc, char **argv) {
|
1961 | 1961 |
#ifdef SIGIO
|
1962 | |
char tmp[16];
|
|
1962 |
char tmp[16];
|
1963 | 1963 |
#endif
|
1964 | 1964 |
FILE *in = stdin;
|
1965 | |
struct timeval tv;
|
|
1965 |
struct timeval tv;
|
1966 | 1966 |
rd_kafka_topic_partition_list_t *rktparlist = NULL;
|
1967 | 1967 |
|
1968 | 1968 |
/* Certain Docker images don't have kafkacat as the entry point,
|
|
1982 | 1982 |
signal(SIGPIPE, term);
|
1983 | 1983 |
#endif
|
1984 | 1984 |
|
1985 | |
/* Seed rng for random partitioner, jitter, etc. */
|
1986 | |
rd_gettimeofday(&tv, NULL);
|
1987 | |
srand(tv.tv_usec);
|
|
1985 |
/* Seed rng for random partitioner, jitter, etc. */
|
|
1986 |
rd_gettimeofday(&tv, NULL);
|
|
1987 |
srand(tv.tv_usec);
|
1988 | 1988 |
|
1989 | 1989 |
/* Create config containers */
|
1990 | 1990 |
conf.rk_conf = rd_kafka_conf_new();
|