Some self-built Kafka deployments inside a VPC need to be exposed to the public internet. In that case we can use Nginx as a layer-4 (TCP) proxy to forward the requests.
Estimated deployment time: 30 minutes
Level: Beginner
Related products: two ECS instances in the same VPC (one as the Nginx proxy, one as the Kafka server)
Audience: general
1. Download and install Nginx, making sure the "--with-stream" module is included at compile time; for other modules, refer to the official Nginx documentation.
# Download the Nginx source package
wget https://nginx.org/download/nginx-1.20.1.tar.gz
# Extract the source package
tar -zxvf nginx-1.20.1.tar.gz
# Enter the extracted directory, then configure, build, and install
cd nginx-1.20.1
./configure --with-stream
make && make install
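If ./configure or make fails with missing dependencies, the usual suspects are the compiler toolchain and the PCRE, zlib, and OpenSSL development headers. A minimal sketch, assuming a CentOS-style ECS image (package names differ on Debian/Ubuntu):

# Install common Nginx build dependencies (CentOS/RHEL package names)
yum install -y gcc make pcre-devel zlib-devel openssl-devel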
2. Check that the running Nginx was built with the stream module enabled:
[root@JMS conf.d]# nginx -V 2>&1 | grep stream
configure arguments: --prefix=/etc/nginx --sbin-path=/usr/sbin/nginx --conf-path=/etc/nginx/nginx.conf --error-log-path=/var/log/nginx/error.log --http-log-path=/var/log/nginx/access.log --pid-path=/var/run/nginx.pid --lock-path=/var/run/nginx.lock --http-client-body-temp-path=/var/cache/nginx/client_temp --http-proxy-temp-path=/var/cache/nginx/proxy_temp --http-fastcgi-temp-path=/var/cache/nginx/fastcgi_temp --http-uwsgi-temp-path=/var/cache/nginx/uwsgi_temp --http-scgi-temp-path=/var/cache/nginx/scgi_temp --user=nginx --group=nginx --with-file-aio --with-threads --with-http_addition_module --with-http_auth_request_module --with-http_dav_module --with-http_flv_module --with-http_gunzip_module --with-http_gzip_static_module --with-http_mp4_module --with-http_random_index_module --with-http_realip_module --with-http_secure_link_module --with-http_slice_module --with-http_ssl_module --with-http_stub_status_module --with-http_sub_module --with-http_v2_module --with-mail --with-mail_ssl_module --with-stream --with-stream_realip_module --with-stream_ssl_module --with-stream_ssl_preread_module
3. Modify the configuration file. This walkthrough deploys only a single-node Kafka for testing; in a production environment you would add multiple Kafka addresses to the upstream block, as shown in the sketch after the snippet below.
stream {
    upstream brokers {
        server 192.168.1.254:9092;
    }
    server {
        listen 9092;
        proxy_pass brokers;
    }
}
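Note that the stream block sits at the top level of nginx.conf, alongside (not inside) the http block. For a production cluster, the upstream block would list every broker; a minimal sketch, assuming two additional brokers at the hypothetical addresses 192.168.1.253 and 192.168.1.252:

stream {
    upstream brokers {
        # list every broker in the cluster (these addresses are placeholders)
        server 192.168.1.254:9092;
        server 192.168.1.253:9092;
        server 192.168.1.252:9092;
    }
    server {
        listen 9092;
        proxy_pass brokers;
    }
}

Keep in mind that each broker must still advertise an address that external clients can resolve back to the proxy, so the advertised.listeners setup described below has to be applied on every broker.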
4. Hot-reload the configuration file and confirm that it parses without errors:
[root@JMS conf.d]# nginx -s reload
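You can also have Nginx validate the configuration syntax before (or after) reloading:

[root@JMS conf.d]# nginx -t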
Next, deploy a single-node Kafka on the second ECS.
# Download the Kafka package
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
# Extract it and enter the directory
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
# Start the bundled ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
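The command above runs ZooKeeper in the foreground; Kafka's startup scripts also accept a -daemon flag if you would rather run it in the background:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties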
On the Kafka server, map a hostname (here, opendts) to the broker's private IP in /etc/hosts:
# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.1.254 opendts
Then edit config/server.properties so that the broker listens on its private IP but advertises the opendts hostname:
listener.security.protocol.map=INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=INTERNAL://192.168.1.254:9092
# Advertise the hostname opendts configured above
advertised.listeners=INTERNAL://opendts:9092
inter.broker.listener.name=INTERNAL
Start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
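The test script below produces to a topic named my_topic. With Kafka's default settings the topic is auto-created on first use, but you can also create it explicitly; a sketch using the stock CLI:

bin/kafka-topics.sh --create --topic my_topic --bootstrap-server 192.168.1.254:9092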
In the security group rules of both ECS instances, open port 9092.
1. Configure domain name resolution in the local /etc/hosts file:
# This entry is required: the first response from Kafka returns the address opendts,
# and unless it resolves to the Nginx public IP 180.184.70.*, no data can be transferred
180.184.70.* opendts
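Before running the client, you can confirm that the entry resolves as expected (substituting the real public IP for the masked one above):

ping -c 1 opendts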
2. Prepare a Python environment and run the script below. The script uses the kafka-python package, which can be installed with pip install kafka-python.
from kafka import KafkaProducer
from kafka import KafkaConsumer

def test():
    # Produce one message through the Nginx public endpoint
    producer = KafkaProducer(bootstrap_servers=['180.184.70.*:9092'])
    msg = 'demomessage'
    future = producer.send('my_topic', msg.encode('utf-8'))
    result = future.get(timeout=10)
    print(result)
    # Consume from the same topic to verify the round trip
    consumer = KafkaConsumer('my_topic',
                             group_id='group2',
                             bootstrap_servers=['180.184.70.*:9092'])
    for msg1 in consumer:
        print(msg1)

if __name__ == '__main__':
    test()
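Note that the consumer loop blocks indefinitely while waiting for new records; if you want the script to exit on its own, kafka-python's KafkaConsumer also accepts a consumer_timeout_ms argument that stops iteration after the given idle period.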
3. Check the local client's log output to confirm that the message was produced and consumed successfully:
RecordMetadata(topic='my_topic', partition=0, topic_partition=TopicPartition(topic='my_topic', partition=0), offset=3, timestamp=1652182581743, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=11, serialized_header_size=-1)
ConsumerRecord(topic='my_topic', partition=0, offset=3, timestamp=1652182581743, timestamp_type=0, key=None, value=b'demomessage', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=11, serialized_header_size=-1)
If you have any other questions, feel free to contact Volcengine technical support.