本文以 Java 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_SSL 接入点 SCRAM 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
在 Java 项目的 pom.xml
中添加相关依赖。此处以 Kafka 2.2.2 版本为例。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.6</version> </dependency>
创建消息队列 Kafka版配置文件 config.properties
。
配置文件字段的详细说明,请参考配置文件。
说明
SCRAM 机制下,应使用具备对应 Topic 访问权限的 SCRAM 用户进行 SASL 认证。获取用户名及密码的方式请参考2 收集连接信息。
通过 SASL_SSL 接入点 SCRAM 机制接入时,配置文件示例如下。
bootstrap.servers=xxxxx security.protocol=SASL_SSL topic=my-topic consumer.group.id=test consumer.auto.offset.reset=earliest consumer.enable.auto.commit=false client.dns.lookup=use_all_dns_ips sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxx" password="xxxxx";
创建配置文件加载程序KafkaConfigurer.java
。
package com.volcengine.openservice.kafka; import java.io.FileInputStream; import java.io.InputStream; import java.util.Properties; public class KafkaConfigurer { private static Properties properties; public synchronized static Properties getKafkaProperties(String path) { if (null != properties) { return properties; } //获取配置文件config.properties的内容 Properties kafkaProperties = new Properties(); try { FileInputStream conf = new FileInputStream(path); kafkaProperties.load(conf); } catch (Exception e) { //没加载到文件,程序要考虑退出 e.printStackTrace(); } properties = kafkaProperties; return kafkaProperties; } }
创建发送消息程序 ProducerDemo.java
。
编译并运行 ProducerDemo.java
发送消息。
查看运行结果。
运行结果示例如下。
Produce ok:sasl-0@0 Produce ok:sasl-0@1 Produce ok:sasl-0@2 Produce ok:sasl-0@3
说明
消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送消息。
java -cp kafka-demo.jar com.volcengine.openservice.kafka.ProducerDemo ./ssl.config.properties
通过 SASL_SSL 接入点 SCRAM 机制生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/src/main/java/com/volcengine/openservice/kafka/ProducerDemo.java
,实现相关业务逻辑。
package com.volcengine.openservice.kafka; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; class Producer { // 生产者使用的topic private static String topic; // 生产者使用的配置 private static Properties props = new Properties(); private static KafkaProducer<String, String> producer; // 构造生产者 public Producer(Properties kafkaProperties) { setProps(kafkaProperties); setSasl(kafkaProperties); setTopic(kafkaProperties); newProducer(); } // 设置生成的topic, 请在控制台申请之后,填写在这里 private void setTopic(Properties kafkaProperties) { topic = kafkaProperties.getProperty("topic"); } // 设置安全协议 private void setSasl(Properties kafkaProperties) { String protocol = kafkaProperties.getProperty("security.protocol"); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol); // 需要获取加密类型以及sasl的config if (protocol.equals("SASL_SSL")) { props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getProperty("sasl.mechanism")); props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProperties.getProperty("sasl.jaas.config")); return; } throw new IllegalArgumentException("security.protocol is not correct"); } // 设置生产者的启动参数 private void setProps(Properties kafkaProperties) { //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers")); //Kafka消息的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); //请求的最长等待时间 props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000); //设置客户端内部重试次数 props.put(ProducerConfig.RETRIES_CONFIG, 5); //设置客户端内部重试间隔 props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000); // 设置客户端的dns策略 props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, kafkaProperties.getProperty("client.dns.lookup")); } // 构造生产者对象,也即生成一个生产实例 private void newProducer() { // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可; // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个 producer = new KafkaProducer<String, String>(props); } public void Produce() { //构造一个Kafka消息 String value = "this is demo message "; //消息的内容 //批量获取 futures 可以加快速度, 但注意,批量不要太大 List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128); for (int i =0; i < 100; i++) { //发送消息,并获得一个Future对象 ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i); Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future<RecordMetadata> future: futures) { //同步获得Future对象的结果 try { RecordMetadata recordMetadata = future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } } public class ProducerDemo { public static void main(String args[]) { if (args.length < 1) { System.out.println("please input config path"); return; } // 获取生产者对象 Producer producer = new Producer(KafkaConfigurer.getKafkaProperties(args[0])); try { producer.Produce(); } catch (Exception e) { // 客户端内部重试之后,仍然发送失败,业务要应对此类错误 System.out.println("error occurred"); e.printStackTrace(); } } }
ConsumerDemo.java
。ConsumerDemo.java
消费消息。Consume: ConsumerRecord(topic = sasl, patition = 6, leaderEpoch=0, offset=0, CreateTime = 1637207092476, serialized key size = -1, serialized key size = 24, headers = ReacordHeaders(headers = [], isReadOnly = false), key = null, value = this is demo, message : 0) Consume: ConsumerRecord(topic = sasl, patition = 6, leaderEpoch=0, offset=0, CreateTime = 1637207092476, serialized key size = -1, serialized key size = 24, headers = ReacordHeaders(headers = [], isReadOnly = false), key = null, value = this is demo, message : 1)
说明
消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩Demo后,可以直接执行以下命令消费消息。
java -cp kafka-demo.jar com.volcengine.openservice.kafka.ConsumerDemo ./ssl.config.properties
通过 SASL_SSL 接入点 SCRAM 机制消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/src/main/java/com/volcengine/openservice/kafka/ConsumerDemo.java
,实现相关业务逻辑。
package com.volcengine.openservice.kafka; import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; class Consumer { // 消费者的启动配置 private static Properties props = new Properties(); private KafkaConsumer<String, String> consumer; // 消费者实例构造函数 public Consumer(Properties kafkaProperties) { setProps(kafkaProperties); setSasl(kafkaProperties); newConsumer(); subscribed(kafkaProperties); } // 设置安全协议 private void setSasl(Properties kafkaProperties) { String protocol = kafkaProperties.getProperty("security.protocol"); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol); // 需要获取加密类型以及sasl的config if (protocol.equals("SASL_SSL")) { props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getProperty("sasl.mechanism")); props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProperties.getProperty("sasl.jaas.config")); return; } throw new IllegalArgumentException("security.protocol is not correct"); } // 设置消费者的启动参数 private void setProps(Properties kafkaProperties) { //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers")); props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); //可根据实际拉取数据和客户的版本等设置此值,默认30s props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); //每次poll的最大数量 //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); //消息的反序列化方式 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //当前消费实例所属的消费组 //属于同一个组的消费实例,会负载消费消息 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("consumer.group.id")); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getProperty("consumer.auto.offset.reset")); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getProperty("consumer.enable.auto.commit")); // 设置客户端的dns策略 props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, kafkaProperties.getProperty("client.dns.lookup")); } // 构造消费者对象,也即生成一个消费实例 private void newConsumer() { consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props); } // 订阅topic private void subscribed(Properties kafkaProperties) { //设置消费组订阅的Topic List<String> subscribedTopic = new ArrayList<String>(); //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样 //Topic需要先在控制台进行创建 subscribedTopic.add(kafkaProperties.getProperty("topic")); consumer.subscribe(subscribedTopic); } // 消费消息 public void poll() { ConsumerRecords<String, String> records = consumer.poll(1000); //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG //建议开一个单独的线程池来消费消息,然后异步返回结果 for (ConsumerRecord<String, String> record : records) { System.out.printf("Consume: %s%n", record.toString()); } } } public class ConsumerDemo { public static void main(String args[]) { if (args.length < 1) { System.out.println("please input config path"); return; } // 获取消费者对象 Consumer consumer = new Consumer(KafkaConfigurer.getKafkaProperties(args[0])); //循环消费消息 while (true){ try { consumer.poll(); } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } } }