本文以 C++ 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_PLAINTEXT 接入点 PLAIN 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
创建消息发送程序 producer.cpp
。
执行以下命令编译 producer.cpp
。
gcc -lrdkafka ./producer.cpp -o producer
执行以下命令发送消息。
从命令行接收消息并发送至 Kafka。
./producer -b <bootstrap_servers> -t <topic> -u <user> -p <password> -m PLAIN
查看运行结果。
运行结果示例如下。
说明
消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。
./producer -b <bootstrap_servers> -t <topic> -u <user> -p <password> -m PLAIN
通过 SASL_SSL 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/producer.cpp
,实现相关业务逻辑。
/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ /** * Apache Kafka producer example programs * using the Kafka driver from librdkafka * (https://github.com/edenhill/librdkafka) */ #include <ctype.h> #include <signal.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <syslog.h> #include <time.h> #include <sys/time.h> #include <getopt.h> /* Typical include path would be <librdkafka/rdkafka.h>, but this program * is builtin from within the librdkafka source tree and thus differs. */ #include "librdkafka/rdkafka.h" /* for Kafka driver */ static volatile sig_atomic_t run = 1; static rd_kafka_t *rk; static void stop(int sig) { run = 0; fclose(stdin); /* abort fgets() */ } /** * Kafka logger callback (optional) */ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) { struct timeval tv; gettimeofday(&tv, NULL); fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec, (int)(tv.tv_usec / 1000), level, fac, rk ? rd_kafka_name(rk) : NULL, buf); } /** * Message delivery report callback using the richer rd_kafka_message_t object. */ static void msg_delivered(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, offset %" PRId64 ", " "partition %" PRId32 "): %.*s\n", rkmessage->len, rkmessage->offset, rkmessage->partition, (int)rkmessage->len, (const char *)rkmessage->payload); } static void sig_usr1(int sig) { rd_kafka_dump(stdout, rk); } int main(int argc, char **argv) { rd_kafka_topic_t *rkt; char *brokers = NULL; char *topic = NULL; char *user = NULL; char *password = NULL; char *mechanisms = NULL; int partition = RD_KAFKA_PARTITION_UA; int opt; rd_kafka_conf_t *conf; char errstr[512]; char tmp[16]; rd_kafka_resp_err_t err; bool printUsage = true; /* Kafka configuration */ conf = rd_kafka_conf_new(); /* Set logger */ rd_kafka_conf_set_log_cb(conf, logger); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); while ((opt = getopt(argc, argv, "h:t:b:u:p:m:e:d")) != -1) { switch (opt) { case 't': topic = optarg; break; case 'b': brokers = optarg; break; case 'u': user = optarg; break; case 'p': password = optarg; break; case 'm': mechanisms = optarg; break; case 'd': if (rd_kafka_conf_set(conf, "debug", optarg, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% Debug configuration failed: " "%s: %s\n", errstr, optarg); exit(1); } break; default: goto usage; } printUsage = false; } if (printUsage) { usage: fprintf(stderr, "\n" " Options:\n" " -t <topic> 必填,需要生产的主题,可以从kafka主题页获取\n" " -b <brokers> 必填,kafka实例的接入点,可以从实例详情页获取\n" " -u <user> 选填,使用SASL接入点时必填,可以从用户管理页面获取用户名称\n" " -p <password> 选填,使用SASL接入点时必填,需要user对应的密码\n" " -m <mechanisms> 选填, 使用SASL接入点时必填,加密类型,取值为PLAIN|SCRAM-SHA-256,可以从用户管理页面查询\n" " -d [facs..] 打印debug日志:\n" "\n"); exit(1); } signal(SIGINT, stop); signal(SIGUSR1, sig_usr1); /* Set bootstrap servers */ if (brokers && rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } if (mechanisms != NULL) { if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || // 设置通信协议 rd_kafka_conf_set(conf, "sasl.mechanisms", mechanisms, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || // 设置加密协议 rd_kafka_conf_set(conf, "sasl.username", user, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || // 设置用户 rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { // 设置bootstrap地址 fprintf(stderr, "%% %s\n", errstr); exit(1); } } /* * Producer */ char buf[2048]; /* Set up a message delivery report callback. * It will be called once for each message, either on successful * delivery to broker, or upon failure to deliver to broker. */ rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered); /* Create Kafka handle */ if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); exit(1); } /* Create topic */ rkt = rd_kafka_topic_new(rk, topic, NULL); fprintf(stderr, "%% Type stuff and hit enter to send\n"); while (run && fgets(buf, sizeof(buf), stdin)) { size_t len = strlen(buf); if (buf[len - 1] == '\n') buf[--len] = '\0'; err = RD_KAFKA_RESP_ERR_NO_ERROR; /* Send/Produce message. */ if (rd_kafka_produce( rkt, partition, RD_KAFKA_MSG_F_COPY, /* Payload and length */ buf, len, /* Optional key and its length */ NULL, 0, /* Message opaque, provided in * delivery report callback as * msg_opaque. */ NULL) == -1) { err = rd_kafka_last_error(); } if (err) { fprintf(stderr, "%% Failed to produce to topic %s " "partition %i: %s\n", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(err)); /* Poll to handle delivery reports */ rd_kafka_poll(rk, 0); continue; } fprintf(stderr, "%% Sent %zd bytes to topic " "%s partition %i\n", len, rd_kafka_topic_name(rkt), partition); /* Poll to handle delivery reports */ rd_kafka_poll(rk, 0); } /* Poll to handle delivery reports */ rd_kafka_poll(rk, 0); /* Wait for messages to be delivered */ while (run && rd_kafka_outq_len(rk) > 0) rd_kafka_poll(rk, 100); /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy the handle */ rd_kafka_destroy(rk); /* Let background threads clean up and terminate cleanly. */ run = 5; while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) printf("Waiting for librdkafka to decommission\n"); if (run <= 0) rd_kafka_dump(stdout, rk); return 0; }
创建 Consumer 订阅消息程序 consumer.cpp
。
执行以下命令编译 consumer.cpp
。
gcc -lrdkafka ./consumer.cpp -o consumer
执行如下命令消费消息。
./consumer -b <bootstrap_servers> -t <topic> -p group -u <user> -p <password> -m PLAIN
查看运行结果。
运行结果示例如下。
通过 SASL_PLAINTEXT 接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/consumer.cpp
,实现相关业务逻辑。
/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ /** * Apache Kafka consumer example programs * using the Kafka driver from librdkafka * (https://github.com/edenhill/librdkafka) */ #include <ctype.h> #include <signal.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <syslog.h> #include <time.h> #include <sys/time.h> #include <getopt.h> /* Typical include path would be <librdkafka/rdkafka.h>, but this program * is builtin from within the librdkafka source tree and thus differs. */ #include "librdkafka/rdkafka.h" /* for Kafka driver */ static volatile sig_atomic_t run = 1; static rd_kafka_t *rk; static int wait_eof = 0; /* number of partitions awaiting EOF */ static void stop(int sig) { run = 0; fclose(stdin); /* abort fgets() */ } /** * Kafka logger callback (optional) */ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) { struct timeval tv; gettimeofday(&tv, NULL); fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec, (int)(tv.tv_usec / 1000), level, fac, rk ? rd_kafka_name(rk) : NULL, buf); } static void msg_consume(rd_kafka_message_t *rkmessage) { if (rkmessage->err) { if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { fprintf(stderr, "%% Consumer reached end of %s [%" PRId32 "] " "message queue at offset %" PRId64 "\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); return; } fprintf(stderr, "%% Consume error for topic \"%s\" [%" PRId32 "] " "offset %" PRId64 ": %s\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) run = 0; return; } if (rkmessage->key_len) { printf("Key: %.*s\n", (int)rkmessage->key_len, (char *)rkmessage->key); } printf("%.*s\n", (int)rkmessage->len, (char *)rkmessage->payload); } static void print_partition_list(FILE *fp, const rd_kafka_topic_partition_list_t *partitions) { int i; for (i = 0; i < partitions->cnt; i++) { fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64, i > 0 ? "," : "", partitions->elems[i].topic, partitions->elems[i].partition, partitions->elems[i].offset); } fprintf(fp, "\n"); } static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) { fprintf(stderr, "%% Consumer group rebalanced: "); switch (err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: fprintf(stderr, "assigned:\n"); print_partition_list(stderr, partitions); rd_kafka_assign(rk, partitions); wait_eof += partitions->cnt; break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: fprintf(stderr, "revoked:\n"); print_partition_list(stderr, partitions); rd_kafka_assign(rk, NULL); wait_eof = 0; break; default: fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err)); rd_kafka_assign(rk, NULL); break; } } static void sig_usr1(int sig) { rd_kafka_dump(stdout, rk); } int main(int argc, char **argv) { char *brokers = NULL; char *topic = NULL; char *user = NULL; char *password = NULL; char *mechanisms = NULL; char *group_id = NULL; int partition = RD_KAFKA_PARTITION_UA; int opt; rd_kafka_conf_t *conf; char errstr[512]; char tmp[16]; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; bool printUsage = true; /* Kafka configuration */ conf = rd_kafka_conf_new(); /* Set logger */ rd_kafka_conf_set_log_cb(conf, logger); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); while ((opt = getopt(argc, argv, "h:t:b:u:p:g:m:e:d")) != -1) { switch (opt) { case 't': topic = optarg; break; case 'b': brokers = optarg; break; case 'u': user = optarg; break; case 'p': password = optarg; break; case 'm': mechanisms = optarg; break; case 'g': group_id = optarg; break; case 'd': if (rd_kafka_conf_set(conf, "debug", optarg, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% Debug configuration failed: " "%s: %s\n", errstr, optarg); exit(1); } break; default: goto usage; } printUsage = false; } if (printUsage) { usage: fprintf(stderr, "\n" " Options:\n" " -t <topic> 必填,需要生产的主题,可以从kafka主题页获取\n" " -b <brokers> 必填,kafka实例的接入点,可以从实例详情页获取\n" " -g <group> 必填, kafka消费组,当消费组不存在时,客户端会自动创建消费组\n" " -u <user> 选填,使用SASL接入点时必填,可以从用户管理页面获取用户名称\n" " -p <password> 选填,使用SASL接入点时必填,需要user对应的密码\n" " -m <mechanisms> 选填, 使用SASL接入点时必填,加密类型,取值为PLAIN|SCRAM-SHA-256,可以从用户管理页面查询\n" "\n"); exit(1); } signal(SIGINT, stop); signal(SIGUSR1, sig_usr1); /* Set bootstrap servers */ if (brokers && rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } /* Set group id */ if (rd_kafka_conf_set(conf, "group.id", group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } else { /* Callback called on partition assignment changes */ rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL,0); } /* 设置其实offset */ if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } if (mechanisms != NULL) { if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || // 设置通信协议 rd_kafka_conf_set(conf, "sasl.mechanisms", mechanisms, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || // 设置加密协议 rd_kafka_conf_set(conf, "sasl.username", user, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || // 设置用户 rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { // 设置bootstrap地址 fprintf(stderr, "%% %s\n", errstr); exit(1); } } /* Create Kafka handle */ if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr); exit(1); } /* Redirect rd_kafka_poll() to consumer_poll() */ rd_kafka_poll_set_consumer(rk); topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(topics, topic, partition); /* 订阅topic */ if ((err = rd_kafka_subscribe(rk, topics))) { fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err)); exit(1); } while (run) { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, 1000); if (rkmessage) { msg_consume(rkmessage); rd_kafka_message_destroy(rkmessage); } } /* Stop consuming */ err = rd_kafka_consumer_close(rk); if (err) fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err)); else fprintf(stderr, "%% Consumer closed\n"); rd_kafka_topic_partition_list_destroy(topics); /* Destroy the handle */ rd_kafka_destroy(rk); /* Let background threads clean up and terminate cleanly. */ run = 5; while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) printf("Waiting for librdkafka to decommission\n"); if (run <= 0) rd_kafka_dump(stdout, rk); return 0; }