本文将为您介绍火山引擎 E-MapReduce(EMR)kafka 组件相关的高阶使用,方便您更深入的使用 Kafka。
您可以在 EMR 控制台的集群管理页面,进行 Kafka 集群的扩容操作。
开源 Kafka 扩容新的 broker 后,流量不会自动迁移到新 broker 上。通常有两种方式将流量迁移到新的 broker。
扩分区:脚本直接扩容分区。比如之前有 12 个分区,扩容到 24 个分区。新分区会根据策略分配到新的 broker 上,是最简单的方式。缺点是老的分区还是在老的 broker 上,集群整体上流量是不均衡的。
Reassign:这种方式即迁移分区数据到新的 broker,步骤相对复杂。
执行以下命令实现扩分区操作:
/usr/lib/emr/current/kafka/bin/kafka-topics.sh --alter --zookeeper {zookeeper_connect} --topic {topic} --partitions {num}
针对已有分区数已经超过 24,且数据占比较大的情况,则考虑使用如下方式进行均衡。
脚本:kafka-reassign-partitions.sh,其主要的三个操作:
--generate:生成分区重分配计划
--execute:执行分区重分配计划
--verify:验证分区重分配结果
将要处理的 topic 信息按照如下格式保存到 JSON 文件。例如要处理的 topic 为 test,则将 topic 信息保存为 topic_test.json,内容如下:
{ "topics": [ { "topic": "test" } ], "version": 1 }
可以参考下面的命令。
注意
参数 --broker-list 中的 broker 标识,是 broker.id 值。集群的 broker.id 都填写上,即生成建议的分区信息。
sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --topics-to-move-json-file {topic_test.json} --broker-list "1001,1002,1003,1004,1005,1006" --generate
执行完 generate 命令,会有两个 JSON 数据:
第一个是 Current partition replica assignment,即当前的分配。
另一个是建议的分配:Proposed partition reassignment configuration,即要保存的文件。
将建议的分配保存为 JSON,例如:topic_test_reassignment.json
sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --reassignment-json-file {topic_test_reassignment.json} --execute
sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --reassignment-json-file {topic_test_reassignment.json} --verify
结果都为 successfully 即成功。如果数据量大,可能需要等待一段时间。
说明
本节内容适用于 EMR 3.4.X 及以前的软件栈版本。EMR 3.5.0 及以后的软件栈版本无需做这里的配置操作。
Kafka 集群在创建时如果给 Broker 所在的 Core 节点组绑定了公网 IP,会自动配置 Kafka Broker 的 advertised.listeners 等参数,使 Kafka Broker 可以通过公网 IP(端口号:19092)和内网地址(端口号:9092)访问。
在 Kafka 集群扩容 Core 节点后,如果仍需要通过公网地址访问 Kafka Broker,需要执行如下操作:
说明
在 Kafka 集群扩容 Core 节点后,如果不执行下述操作,仍可以通过内网地址访问 Kafka Broker。
给所有新扩容出来的 Core 节点绑定公网 IP。
在集群列表 > Kafka 集群名称 > 服务列表 > Kafka服务名称 > 服务参数中,修改该集群的 Kafka 服务的配置参数:kafka_broker_hostname_eip_map_str。该参数的值为一个 JSON 字符串,包含 Kafka Broker 各个节点的 hostname 到公网 IP 的映射。在该参数的值中,增加新扩容出来的 Core 节点的 hostname 到公网 IP 的映射。
重启 Kafka 服务中心扩容出来的 Core 节点下的 Kafka Broker 组件。
完成上述操作执行成功后,就可以继续通过公网地址访问 Kafka Broker 了。