在本教程中,您将学习如何使用JAVA开源SDK在火山消息队列 RocketMQ版上生成消息轨迹。
预计部署时间:30分钟
级别:初级
相关产品:消息队列 RocketMQ版 云服务器
受众: 通用
进入在控制台创建RocketMQ实例,并配置Topic、Group、以及秘钥,详见RocketMQ创建文档
<?xml version= 1.0 encoding= UTF-8 ?> <project xmlns= http://maven.apache.org/POM/4.0.0 xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd > <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.ons</groupId> <artifactId>apache-rocketmq-demo</artifactId> <name>apache-rocketmq-demo</name> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!-- compiler settings properties --> <java_source_version>1.6</java_source_version> <java_target_version>1.6</java_target_version> <file_encoding>UTF-8</file_encoding> </properties> <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.8.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>${java_source_version}</source> <target>${java_target_version}</target> <encoding>${file_encoding}</encoding> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> </configuration> </plugin> </plugins> </build> </project>
/** * <p> * Licensed under the Apache License, Version 2.0 (the License ); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.bytedance.demo; /** * MQ 配置 */ public class MqConfig { /** * * 启动测试之前请替换如下 XXX 为您的配置 * */ public static final String TOPIC = lxb ; public static final String GROUP_ID = GID_lxb ; public static final String ORDER_TOPIC = lxb ; public static final String ORDER_GROUP_ID = GID_lxb ; public static final String ACCESS_KEY = ZNmfp17*****LYaXDguy ; public static final String SECRET_KEY = qO45DwJ******crPaUOTsF ; public static final String TAG = mq_test_tag ; /** * * https://console.volcengine.com/rocketmq/region:rocketmq+cn-beijing/instance 通过 实例概览--服务访问--TCP协议接入点 获取 * */ public static final String NAMESRV_ADDR = http://MQ_INST_******a25tr_mrecx.rocketmq.ivolces.com:9876 ; }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the License ); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.bytedance.demo.producer; import com.bytedance.demo.MqConfig; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQProducer { private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY)); } public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook(), true, RMQ_SYS_TRACE_TOPIC ); producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR); producer.start(); for (int i = 0; i < 3; i++) { try { Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, Hello Bytedance .getBytes()); SendResult sendResult = producer.send(msg); System.out.printf(sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the License ); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.bytedance.demo.consumer; import com.bytedance.demo.MqConfig; import java.util.List; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; public class RocketMQConsumer { private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY)); } public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConfig.GROUP_ID, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, RMQ_SYS_TRACE_TOPIC ); ::wq:: consumer.setNamesrvAddr(MqConfig.NAMESRV_ADDR); consumer.subscribe(MqConfig.TOPIC, * ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf( %s Receive New Messages: %s %n , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf( Consumer Started.%n ); } }