You need to enable JavaScript to run this app.
导航
SASL_SSL 接入点 PLAIN 机制收发消息
最近更新时间:2023.09.12 15:16:14 首次发布时间:2023.03.08 13:55:40

本文以 C++ 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_SSL 接入点 PLAIN 机制接入消息队列 Kafka版,并收发消息。

前提条件

已完成准备工作。详细说明请参考准备工作

1 发送消息

实现方法

  1. 创建消息发送程序 producer_ssl.cpp

  2. 执行以下命令编译 producer_ssl.cpp

    gcc -lrdkafka ./producer_ssl.cpp -o producer_ssl
    
  3. 执行以下命令发送消息。
    从命令行接收消息并发送至 Kafka。

    ./producer_ssl -b <bootstrap_servers> -t <topic> -u <user> -p <password> -m PLAIN
    
  4. 查看运行结果。
    运行结果示例如下。其中,9492 为 SASL_SSL 协议端口。
    图片

    说明

    消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。

    ./producer_ssl -b <bootstrap_servers> -t <topic> -u <user> -p <password> -m PLAIN
    

示例代码

通过 SASL_SSL 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/producer_ssl.cpp,实现相关业务逻辑。

/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012, Magnus Edenhill
 * All rights reserved. 
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Apache Kafka producer example programs
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "librdkafka/rdkafka.h" /* for Kafka driver */

/* Keep-running flag: starts at 1, is set to 0 by the stop() signal handler,
 * and is tested by the loops in main(). */
static volatile sig_atomic_t run = 1;
/* Kafka client handle created in main(); kept at file scope so that the
 * sig_usr1() handler can pass it to rd_kafka_dump(). */
static rd_kafka_t *rk;

/**
 * Signal handler (installed for SIGINT in main()): requests shutdown.
 *
 * Clears the global run flag and closes stdin so that a blocked fgets()
 * in the produce loop returns.
 *
 * NOTE(review): fclose() is not on the POSIX async-signal-safe list;
 * behaviour is kept as in the upstream example.
 *
 * @param sig  Signal number (unused).
 */
static void stop(int sig) {
    (void)sig;

    run = 0;
    /* abort fgets() */
    fclose(stdin);
}

/**
 * Kafka logger callback (optional).
 *
 * Writes one line to stderr of the form
 *   <sec>.<msec> RDKAFKA-<level>-<facility>: <client name>: <message>
 *
 * @param rk     Client handle the log line belongs to; may be NULL, in which
 *               case the literal "(null)" is printed as the client name.
 * @param level  Log level passed in by librdkafka.
 * @param fac    Facility string passed in by librdkafka.
 * @param buf    Log message text.
 */
static void
logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    /* Conversion specifiers match the (cast) argument types, and a real
     * string is passed for %s instead of a NULL pointer, which is undefined
     * behaviour for fprintf(). */
    fprintf(stderr, "%ld.%03d RDKAFKA-%i-%s: %s: %s\n", (long)tv.tv_sec,
            (int)(tv.tv_usec / 1000), level, fac,
            rk ? rd_kafka_name(rk) : "(null)", buf);
}


/**
 * Message delivery report callback using the richer rd_kafka_message_t object.
 *
 * Prints either the delivery error or, on success, the payload size, offset,
 * partition and payload text to stderr.
 *
 * @param rk         Producer handle (unused).
 * @param rkmessage  Delivery report for one produced message.
 * @param opaque     Application opaque pointer (unused).
 */
static void msg_delivered(rd_kafka_t *rk,
                          const rd_kafka_message_t *rkmessage,
                          void *opaque) {
    (void)rk;
    (void)opaque;

    if (rkmessage->err) {
        fprintf(stderr,
                "%% Message delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
    } else {
        /* rkmessage->len is a size_t, so it is printed with %zu (not %zd). */
        fprintf(stderr,
                "%% Message delivered (%zu bytes, offset %" PRId64
                ", "
                "partition %" PRId32 "): %.*s\n",
                rkmessage->len, rkmessage->offset, rkmessage->partition,
                (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

/**
 * Signal handler (installed for SIGUSR1 in main()): dumps the state of the
 * global client handle to stdout via rd_kafka_dump().
 *
 * @param sig  Signal number (unused).
 */
static void sig_usr1(int sig) {
    (void)sig;

    rd_kafka_dump(stdout, rk);
}

/**
 * Producer entry point.
 *
 * Reads lines from stdin and produces each line (without its trailing
 * newline) as one message to the topic given with -t, over a connection
 * configured with security.protocol=sasl_ssl.
 *
 * Options:
 *   -t <topic>       topic to produce to (required)
 *   -b <brokers>     bootstrap servers (required)
 *   -u <user>        SASL user name (required, the protocol is always sasl_ssl)
 *   -p <password>    SASL password (required)
 *   -m <mechanisms>  SASL mechanism (required)
 *   -d <contexts>    librdkafka debug contexts (optional)
 *
 * @return 0 on normal termination. The process exits with status 1 on a
 *         usage error, a configuration error, or when the client or topic
 *         handle cannot be created.
 */
int main(int argc, char **argv) {
    rd_kafka_topic_t *rkt;
    char *brokers = NULL;
    char *topic   = NULL;
    char *user    = NULL;
    char *password = NULL;
    char *mechanisms = NULL;
    int partition = RD_KAFKA_PARTITION_UA;
    int opt;
    int show_usage = 0;
    int decommissioned = 0;
    int attempt;
    rd_kafka_conf_t *conf;
    char errstr[512];
    char tmp[16];
    char buf[2048];
    rd_kafka_resp_err_t err;

    /* Kafka configuration */
    conf = rd_kafka_conf_new();

    /* Set logger */
    rd_kafka_conf_set_log_cb(conf, logger);

    /* Quick termination */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

    /* "d:" (with colon): -d takes an argument, otherwise optarg is NULL.
     * -h, unknown options and missing arguments all end up in default. */
    while (!show_usage &&
           (opt = getopt(argc, argv, "ht:b:u:p:m:d:")) != -1) {
        switch (opt) {
            case 't':
                topic = optarg;
                break;
            case 'b':
                brokers = optarg;
                break;
            case 'u':
                user = optarg;
                break;
            case 'p':
                password = optarg;
                break;
            case 'm':
                mechanisms = optarg;
                break;
            case 'd':
                if (rd_kafka_conf_set(conf, "debug", optarg, errstr,
                                      sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                    fprintf(stderr,
                            "%% Debug configuration failed: "
                            "%s: %s\n",
                            errstr, optarg);
                    exit(1);
                }
                break;
            default:
                show_usage = 1;
                break;
        }
    }

    /* Reject missing mandatory options up front instead of handing NULL
     * pointers to rd_kafka_conf_set() / rd_kafka_topic_new() later on. */
    if (!show_usage && (!topic || !brokers)) {
        fprintf(stderr, "%% Missing required option: -t and -b must be given\n");
        show_usage = 1;
    }
    if (!show_usage && (!user || !password || !mechanisms)) {
        fprintf(stderr,
                "%% Missing SASL option: -u, -p and -m are required "
                "for the SASL_SSL access point\n");
        show_usage = 1;
    }

    if (show_usage) {
        fprintf(stderr,
                "\n"
                " Options:\n"
                "  -t <topic>      必填,需要生产的主题,可以从kafka主题页获取\n"
                "  -b <brokers>    必填,kafka实例的接入点,可以从实例详情页获取\n"
                "  -u <user>       选填,使用SASL接入点时必填,可以从用户管理页面获取用户名称\n"
                "  -p <password>   选填,使用SASL接入点时必填,需要user对应的密码\n"
                "  -m <mechanisms> 选填, 使用SASL接入点时必填,加密类型,取值为PLAIN|SCRAM-SHA-256,可以从用户管理页面查询\n"
                "\n");
        exit(1);
    }

    signal(SIGINT, stop);

    /* Set bootstrap servers */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    /* SASL_SSL settings.
     * NOTE(review): enable.ssl.certificate.verification=0 turns off broker
     * certificate verification; kept as in the original demo, but it should
     * be reconsidered before production use. */
    if (rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* transport protocol */
        rd_kafka_conf_set(conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* skip certificate verification */
        rd_kafka_conf_set(conf, "sasl.mechanisms", mechanisms, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* SASL mechanism */
        rd_kafka_conf_set(conf, "sasl.username", user, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* SASL user */
        rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {   /* SASL password */
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    /*
     * Producer
     */

    /* Set up a message delivery report callback.
     * It will be called once for each message, either on successful
     * delivery to broker, or upon failure to deliver to broker. */
    rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);

    /* Create Kafka handle */
    if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
                            sizeof(errstr)))) {
        fprintf(stderr,
                "%% Failed to create new producer: %s\n",
                errstr);
        exit(1);
    }

    /* Install the dump handler only once rk is valid: sig_usr1() passes
     * the global handle straight to rd_kafka_dump(). */
    signal(SIGUSR1, sig_usr1);

    /* Create topic */
    rkt = rd_kafka_topic_new(rk, topic, NULL);
    if (!rkt) {
        fprintf(stderr, "%% Failed to create topic object: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
        exit(1);
    }

    fprintf(stderr,
            "%% Type stuff and hit enter to send\n");

    while (run && fgets(buf, sizeof(buf), stdin)) {
        size_t len = strlen(buf);

        /* Strip the trailing newline; len can be 0 when the input line
         * starts with a NUL byte, so guard the index. */
        if (len > 0 && buf[len - 1] == '\n')
            buf[--len] = '\0';

        err = RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Send/Produce message. */
        if (rd_kafka_produce(
                rkt, partition, RD_KAFKA_MSG_F_COPY,
                /* Payload and length */
                buf, len,
                /* Optional key and its length */
                NULL, 0,
                /* Message opaque, provided in
                 * delivery report callback as
                 * msg_opaque. */
                NULL) == -1) {
            err = rd_kafka_last_error();
        }

        if (err) {
            fprintf(stderr,
                    "%% Failed to produce to topic %s "
                    "partition %i: %s\n",
                    rd_kafka_topic_name(rkt), partition,
                    rd_kafka_err2str(err));
        } else {
            fprintf(stderr,
                    "%% Sent %zu bytes to topic "
                    "%s partition %i\n",
                    len, rd_kafka_topic_name(rkt),
                    partition);
        }

        /* Poll to handle delivery reports */
        rd_kafka_poll(rk, 0);
    }

    /* Poll to handle delivery reports */
    rd_kafka_poll(rk, 0);

    /* Wait for messages to be delivered (skipped once stop() cleared run) */
    while (run && rd_kafka_outq_len(rk) > 0)
        rd_kafka_poll(rk, 100);

    /* The handle is about to be freed: make sure a late SIGUSR1 can no
     * longer reach sig_usr1() and dump a destroyed handle. */
    signal(SIGUSR1, SIG_IGN);

    /* Destroy topic */
    rd_kafka_topic_destroy(rkt);

    /* Destroy the handle */
    rd_kafka_destroy(rk);

    /* Let background threads clean up and terminate cleanly.
     * A local counter is used instead of the run flag, and the destroyed
     * handle is not touched again (no rd_kafka_dump() after destroy). */
    for (attempt = 0; attempt < 5; attempt++) {
        if (rd_kafka_wait_destroyed(1000) != -1) {
            decommissioned = 1;
            break;
        }
        printf("Waiting for librdkafka to decommission\n");
    }
    if (!decommissioned)
        fprintf(stderr, "%% librdkafka did not decommission in time\n");

    return 0;
}

2 消费消息

实现方法

  1. 创建 Consumer 订阅消息程序 consumer_ssl.cpp

  2. 执行以下命令编译 consumer_ssl.cpp

    gcc -lrdkafka ./consumer_ssl.cpp -o consumer_ssl
    
  3. 执行如下命令消费消息。

    ./consumer_ssl -b <bootstrap_servers> -t <topic> -g <group> -u <user> -p <password> -m PLAIN
    
  4. 查看运行结果。
    运行结果示例如下。其中,9492 为 SASL_SSL 协议端口。
    图片

示例代码

通过 SASL_SSL 接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/consumer_ssl.cpp,实现相关业务逻辑。

/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012, Magnus Edenhill
 * All rights reserved. 
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Apache Kafka consumer example programs
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "librdkafka/rdkafka.h" /* for Kafka driver */


/* Keep-running flag: starts at 1; set to 0 by the stop() signal handler and
 * by msg_consume() on an unknown topic/partition error; tested by the poll
 * loop in main(). */
static volatile sig_atomic_t run = 1;
/* Kafka client handle created in main(); kept at file scope so that the
 * sig_usr1() handler can pass it to rd_kafka_dump(). */
static rd_kafka_t *rk;
/* Number of partitions awaiting EOF. Updated by rebalance_cb(); it is not
 * read anywhere in the code visible here. */
static int wait_eof = 0;

/**
 * Signal handler (installed for SIGINT in main()): requests shutdown by
 * clearing the global run flag, then closes stdin.
 *
 * NOTE(review): the visible consumer code never reads stdin, so the
 * fclose() looks like a leftover from the producer example; it is kept
 * unchanged here. fclose() is also not on the POSIX async-signal-safe list.
 *
 * @param sig  Signal number (unused).
 */
static void stop(int sig) {
    (void)sig;

    run = 0;
    /* abort fgets() */
    fclose(stdin);
}

/**
 * Kafka logger callback (optional).
 *
 * Writes one line to stderr of the form
 *   <sec>.<msec> RDKAFKA-<level>-<facility>: <client name>: <message>
 *
 * @param rk     Client handle the log line belongs to; may be NULL, in which
 *               case the literal "(null)" is printed as the client name.
 * @param level  Log level passed in by librdkafka.
 * @param fac    Facility string passed in by librdkafka.
 * @param buf    Log message text.
 */
static void
logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    /* Conversion specifiers match the (cast) argument types, and a real
     * string is passed for %s instead of a NULL pointer, which is undefined
     * behaviour for fprintf(). */
    fprintf(stderr, "%ld.%03d RDKAFKA-%i-%s: %s: %s\n", (long)tv.tv_sec,
            (int)(tv.tv_usec / 1000), level, fac,
            rk ? rd_kafka_name(rk) : "(null)", buf);
}

/**
 * Handle one message object returned by the consumer poll.
 *
 * - No error: prints the key (when key_len is non-zero) and the payload to
 *   stdout.
 * - Partition EOF: prints an informational line to stderr.
 * - Any other error: prints the error to stderr and, for unknown
 *   topic/partition errors, clears the global run flag.
 *
 * @param rkmessage  Message or event to handle; not destroyed here.
 */
static void msg_consume(rd_kafka_message_t *rkmessage) {
    if (!rkmessage->err) {
        /* Regular message: optional key line first, then the payload. */
        if (rkmessage->key_len) {
            printf("Key: %.*s\n", (int)rkmessage->key_len,
                   (char *)rkmessage->key);
        }

        printf("%.*s\n", (int)rkmessage->len, (char *)rkmessage->payload);
        return;
    }

    if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        fprintf(stderr,
                "%% Consumer reached end of %s [%" PRId32 "] "
                "message queue at offset %" PRId64 "\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset);
        return;
    }

    fprintf(stderr,
            "%% Consume error for topic \"%s\" [%" PRId32 "] "
            "offset %" PRId64 ": %s\n",
            rd_kafka_topic_name(rkmessage->rkt),
            rkmessage->partition, rkmessage->offset,
            rd_kafka_message_errstr(rkmessage));

    /* These two errors end the consume loop. */
    switch (rkmessage->err) {
        case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
        case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
            run = 0;
            break;
        default:
            break;
    }
}

/**
 * Print a topic/partition list on a single line.
 *
 * Each entry is written as " <topic> [<partition>] offset <offset>", with a
 * leading "," for every entry after the first, followed by one newline.
 *
 * @param fp          Output stream.
 * @param partitions  List whose cnt entries in elems[] are printed.
 */
static void
print_partition_list(FILE *fp,
                     const rd_kafka_topic_partition_list_t *partitions) {
    int idx = 0;

    while (idx < partitions->cnt) {
        const char *separator = (idx == 0) ? "" : ",";

        fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64,
                separator,
                partitions->elems[idx].topic,
                partitions->elems[idx].partition,
                partitions->elems[idx].offset);
        idx++;
    }

    fputc('\n', fp);
}
/**
 * Rebalance callback registered with rd_kafka_conf_set_rebalance_cb().
 *
 * Logs the event to stderr and applies it with rd_kafka_assign():
 * the given partitions on assignment, NULL on revocation or on any
 * other error code. Also maintains the wait_eof counter.
 *
 * @param rk          Consumer handle.
 * @param err         Rebalance event code.
 * @param partitions  Partitions being assigned or revoked.
 * @param opaque      Application opaque pointer (unused).
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
    (void)opaque;

    fprintf(stderr, "%% Consumer group rebalanced: ");

    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
        wait_eof += partitions->cnt;
    } else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, NULL);
        wait_eof = 0;
    } else {
        /* Any other code is reported as a failed rebalance. */
        fprintf(stderr, "failed: %s\n",
                rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
    }
}

/**
 * Signal handler (installed for SIGUSR1 in main()): dumps the state of the
 * global client handle to stdout via rd_kafka_dump().
 *
 * @param sig  Signal number (unused).
 */
static void sig_usr1(int sig) {
    (void)sig;

    rd_kafka_dump(stdout, rk);
}

/**
 * Consumer entry point.
 *
 * Subscribes to the topic given with -t as a member of the consumer group
 * given with -g and prints every received message until the run flag is
 * cleared (SIGINT, or an unknown topic/partition error).
 *
 * Options:
 *   -t <topic>       topic to consume (required)
 *   -b <brokers>     bootstrap servers (required)
 *   -g <group>       consumer group id (required)
 *   -m <mechanisms>  SASL mechanism; when given, SASL_SSL is configured and
 *                    -u <user> and -p <password> become required
 *   -d <contexts>    librdkafka debug contexts (optional)
 *
 * @return 0 on normal termination. The process exits with status 1 on a
 *         usage error, a configuration error, or when the client cannot be
 *         created or the subscription fails.
 */
int main(int argc, char **argv) {
    char *brokers = NULL;
    char *topic   = NULL;
    char *user    = NULL;
    char *password = NULL;
    char *mechanisms = NULL;
    char *group_id = NULL;
    int partition = RD_KAFKA_PARTITION_UA;
    int opt;
    int show_usage = 0;
    int decommissioned = 0;
    int attempt;
    rd_kafka_conf_t *conf;
    char errstr[512];
    char tmp[16];
    rd_kafka_topic_partition_list_t *topics;
    rd_kafka_resp_err_t err;

    /* Kafka configuration */
    conf = rd_kafka_conf_new();

    /* Set logger */
    rd_kafka_conf_set_log_cb(conf, logger);

    /* Quick termination */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

    /* "d:" (with colon): -d takes an argument, otherwise optarg is NULL.
     * -h, unknown options and missing arguments all end up in default. */
    while (!show_usage &&
           (opt = getopt(argc, argv, "ht:b:u:p:g:m:d:")) != -1) {
        switch (opt) {
            case 't':
                topic = optarg;
                break;
            case 'b':
                brokers = optarg;
                break;
            case 'u':
                user = optarg;
                break;
            case 'p':
                password = optarg;
                break;
            case 'm':
                mechanisms = optarg;
                break;
            case 'g':
                group_id = optarg;
                break;
            case 'd':
                if (rd_kafka_conf_set(conf, "debug", optarg, errstr,
                                      sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                    fprintf(stderr,
                            "%% Debug configuration failed: "
                            "%s: %s\n",
                            errstr, optarg);
                    exit(1);
                }
                break;
            default:
                show_usage = 1;
                break;
        }
    }

    /* Reject missing mandatory options up front instead of handing NULL
     * pointers to rd_kafka_conf_set() and the topic list later on. */
    if (!show_usage && (!topic || !brokers || !group_id)) {
        fprintf(stderr,
                "%% Missing required option: -t, -b and -g must be given\n");
        show_usage = 1;
    }
    if (!show_usage && mechanisms != NULL && (!user || !password)) {
        fprintf(stderr,
                "%% Missing SASL option: -u and -p are required "
                "when -m is given\n");
        show_usage = 1;
    }

    if (show_usage) {
        /* The -t line says "consume" here; the original text was copied
         * from the producer and said "produce". */
        fprintf(stderr,
                "\n"
                " Options:\n"
                "  -t <topic>      必填,需要消费的主题,可以从kafka主题页获取\n"
                "  -b <brokers>    必填,kafka实例的接入点,可以从实例详情页获取\n"
                "  -g <group>      必填, kafka消费组,当消费组不存在时,客户端会自动创建消费组\n"
                "  -u <user>       选填,使用SASL接入点时必填,可以从用户管理页面获取用户名称\n"
                "  -p <password>   选填,使用SASL接入点时必填,需要user对应的密码\n"
                "  -m <mechanisms> 选填, 使用SASL接入点时必填,加密类型,取值为PLAIN|SCRAM-SHA-256,可以从用户管理页面查询\n"
                "\n");
        exit(1);
    }

    signal(SIGINT, stop);

    /* Set bootstrap servers */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    /* Set group id */
    if (rd_kafka_conf_set(conf, "group.id", group_id,
                          errstr, sizeof(errstr)) !=
        RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    /* Callback called on partition assignment changes */
    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
    rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, 0);

    /* Set the starting offset used when no committed offset exists */
    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                          errstr, sizeof(errstr)) !=
        RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    /* SASL_SSL settings, applied only when a mechanism was given.
     * NOTE(review): enable.ssl.certificate.verification=0 turns off broker
     * certificate verification; kept as in the original demo, but it should
     * be reconsidered before production use. */
    if (mechanisms != NULL) {
        if (rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* transport protocol */
            rd_kafka_conf_set(conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* skip certificate verification */
            rd_kafka_conf_set(conf, "sasl.mechanisms", mechanisms, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* SASL mechanism */
            rd_kafka_conf_set(conf, "sasl.username", user, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||   /* SASL user */
            rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {   /* SASL password */
            fprintf(stderr, "%% %s\n", errstr);
            exit(1);
        }
    }

    /* Create Kafka handle */
    if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                            sizeof(errstr)))) {
        fprintf(stderr,
                "%% Failed to create new consumer: %s\n",
                errstr);
        exit(1);
    }

    /* Install the dump handler only once rk is valid: sig_usr1() passes
     * the global handle straight to rd_kafka_dump(). */
    signal(SIGUSR1, sig_usr1);

    /* Redirect rd_kafka_poll() to consumer_poll() */
    rd_kafka_poll_set_consumer(rk);
    topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, topic, partition);

    /* Subscribe to the topic */
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }

    while (run) {
        rd_kafka_message_t *rkmessage;

        rkmessage = rd_kafka_consumer_poll(rk, 1000);
        if (rkmessage) {
            msg_consume(rkmessage);
            rd_kafka_message_destroy(rkmessage);
        }
    }

    /* Stop consuming */
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n",
                rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");

    rd_kafka_topic_partition_list_destroy(topics);

    /* The handle is about to be freed: make sure a late SIGUSR1 can no
     * longer reach sig_usr1() and dump a destroyed handle. */
    signal(SIGUSR1, SIG_IGN);

    /* Destroy the handle */
    rd_kafka_destroy(rk);

    /* Let background threads clean up and terminate cleanly.
     * A local counter is used instead of the run flag, and the destroyed
     * handle is not touched again (no rd_kafka_dump() after destroy). */
    for (attempt = 0; attempt < 5; attempt++) {
        if (rd_kafka_wait_destroyed(1000) != -1) {
            decommissioned = 1;
            break;
        }
        printf("Waiting for librdkafka to decommission\n");
    }
    if (!decommissioned)
        fprintf(stderr, "%% librdkafka did not decommission in time\n");

    return 0;
}