本文提供使用 Java SDK 收发事务消息的示例代码供您参考。
您已完成准备工作。
发送事务消息包含以下两个步骤:
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQTransactionProducer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换RocketMQ实例的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建事务消息Producer * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876” */ TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook()); transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT"); //如果 SSL 认证策略设置为仅SSL连接,则必须设置setUseTLS(true)。 //transactionMQProducer.setUseTLS(true) transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl()); transactionMQProducer.start(); for (int i = 0; i < 10; i++) { try { Message message = new Message("YOUR TRANSACTION TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() { @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("开始执行本地事务: " + msg); return LocalTransactionState.UNKNOW; } }, null); assert sendResult != null; } catch (Exception e) { e.printStackTrace(); } } } }
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; /** * RocketMQ发送事务消息本地Check接口实现类 */ public class LocalTransactionCheckerImpl implements TransactionCheckListener { /** * 本地事务Checker */ @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId()); return LocalTransactionState.COMMIT_MESSAGE; } }
事务回查的整体流程使用两阶段提交的方式。
LocalTransactionState.UNKNOW
,或者业务逻辑中的事务触发了回滚逻辑时,需要将事务结果告知 RocketMQ 服务端。因此服务端需要知道之前投递的半事务消息是否提交或者回滚,定期触发事务结果回调函数来获取这部分的状态信息。事务消息的订阅方式与普通消息一致,示例代码如下所示。
import java.util.List; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; public class RocketMQPushConsumer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为您RocketMQ实例的AccessKey ID和AccessKey Secret。 // 设置为火山引擎消息队列 RocketMQ版实例的接入点。 // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr("http://{INSTANCE_ID}.rocketmq.volces.com.com:9876"); consumer.subscribe("YOUR TOPIC", "*"); //如果 SSL 认证策略设置为仅SSL连接,则必须设置setUseTLS(true) //consumer.setUseTLS(true); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }