RocketMQ 入门实战(4)--Java 操作 RocketMQ

news/发布时间2024/7/15 3:03:09

本文主要介绍使用 Java 来操作 RocketMQ,文中所使用到的软件版本:Java 1.8.0_341、RocketMQ 5.1.3、rocketmq-client-java 5.0.5。

1、引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version>
</dependency>

2、Java 操作 RocketMQ

2.1、创建主题

bin/mqadmin updateTopic -n 10.49.196.33:9876 -t NORMAL_TOPIC -c DefaultCluster -a +message.type=NORMAL #普通消息
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t DELAY_TOPIC -c DefaultCluster -a +message.type=DELAY #定时/延时消息
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t FIFO_TOPIC -c DefaultCluster -a +message.type=FIFO #顺序消息
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t TRANSACTION_TOPIC -c DefaultCluster -a +message.type=TRANSACTION #事务消息

2.2、生产者

2.1.1、普通消息

A、同步发送

@Test
public void normal() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();//SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider("RocketMQ", "12345678");ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints)//.setCredentialProvider(sessionCredentialsProvider)
            .build();String topic = "NORMAL_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody("normalMessage".getBytes()).build();SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());producer.close();
}

B、异步发送

@Test
public void normalAsync() throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "NORMAL_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody("normalMessage".getBytes()).build();CompletableFuture<SendReceipt> sendReceiptCompletableFuture = producer.sendAsync(message);sendReceiptCompletableFuture.whenComplete(new BiConsumer<SendReceipt, Throwable>() {@Overridepublic void accept(SendReceipt sendReceipt, Throwable throwable) {log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());}});Thread.sleep(Long.MAX_VALUE);
}

2.1.2、定时/延时消息

@Test
public void delay() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "DELAY_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Long deliverTimeStamp = System.currentTimeMillis() + 1 * 60 * 1000;Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody(("delayMessage-" + LocalDateTime.now()).getBytes()).setDeliveryTimestamp(deliverTimeStamp).build();SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());producer.close();
}

2.1.3、顺序消息

@Test
public void fifo() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "FIFO_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody(("fifoMessage").getBytes()).setMessageGroup("fifoGroup").build();SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());producer.close();
}

2.1.4、事务消息

@Test
public void transaction() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "TRANSACTION_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).setTransactionChecker(new TransactionChecker() {@Overridepublic TransactionResolution check(MessageView messageView) {TransactionResolution result = TransactionResolution.COMMIT;//TODO:检查业务是否正常处理,如果失败则 result = TransactionResolution.ROLLBACKreturn result;}}).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody(("transactionMessage").getBytes()).build();Transaction transaction = producer.beginTransaction();try {SendReceipt sendReceipt = producer.send(message, transaction);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());//TODO:业务处理
transaction.commit();} catch (Exception e) {transaction.rollback();}producer.close();
}

2.3、消费者

2.3.1、PushConsumer

@Test
public void pushConsumer() throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);subscriptionExpressions.put("DELAY_TOPIC", filterExpression);subscriptionExpressions.put("FIFO_TOPIC", filterExpression);subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(group).setSubscriptionExpressions(subscriptionExpressions).setMessageListener(messageView -> {log.info("接受到消息:messageId={},body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));return ConsumeResult.SUCCESS;}).build();log.info("开始接受消息...");Thread.sleep(Long.MAX_VALUE);
}

2.3.2、SimpleConsumer

A、同步订阅

@Test
public void simpleConsumer() throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);subscriptionExpressions.put("DELAY_TOPIC", filterExpression);subscriptionExpressions.put("FIFO_TOPIC", filterExpression);subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(group).setSubscriptionExpressions(subscriptionExpressions).setAwaitDuration(Duration.ofSeconds(30)).build();log.info("开始接受消息...");while (true) {List<MessageView> messageViews = simpleConsumer.receive(10, Duration.ofSeconds(30));for (MessageView messageView : messageViews) {log.info("接受到消息:messageId={},body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));simpleConsumer.ack(messageView);}}
}

B、异步订阅

@Test
public void simpleConsumerAsync() throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);subscriptionExpressions.put("DELAY_TOPIC", filterExpression);subscriptionExpressions.put("FIFO_TOPIC", filterExpression);subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(group).setSubscriptionExpressions(subscriptionExpressions).setAwaitDuration(Duration.ofSeconds(30)).build();log.info("开始接受消息...");while (true) {CompletableFuture<List<MessageView>> future = simpleConsumer.receiveAsync(10, Duration.ofSeconds(30));future.whenCompleteAsync((messageViews, throwable) -> {if (throwable != null) {log.error("Failed to receive message", throwable);return;}for (MessageView messageView : messageViews) {log.info("接受到消息:messageId={},body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));CompletableFuture<Void> completableFuture = simpleConsumer.ackAsync(messageView);completableFuture.whenCompleteAsync(new BiConsumer<Void, Throwable>() {@Overridepublic void accept(Void unused, Throwable throwable) {if (null != throwable) {log.error("Message is failed to be acknowledged, messageId={}", messageView.getMessageId(), throwable);return;}log.info("Message is acknowledged successfully, messageId={}", messageView.getMessageId());}});}});}
}

异步订阅会报错:DEADLINE_EXCEEDED: deadline exceeded after 32.999993800s;可能是 RocketMQ 的 bug。

2.4、完整代码

2.4.1、生产者

package com.abc.demo.rocketmq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.*;
import org.junit.Test;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;@Slf4j
public class ProducerCase {private static final String endpoints = "10.49.196.33:8081";@Testpublic void normal() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();//SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider("RocketMQ", "12345678");ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints)//.setCredentialProvider(sessionCredentialsProvider)
                .build();String topic = "NORMAL_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody("normalMessage".getBytes()).build();SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());producer.close();}@Testpublic void normalAsync() throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "NORMAL_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody("normalMessage".getBytes()).build();CompletableFuture<SendReceipt> sendReceiptCompletableFuture = producer.sendAsync(message);sendReceiptCompletableFuture.whenComplete(new BiConsumer<SendReceipt, Throwable>() {@Overridepublic void accept(SendReceipt sendReceipt, Throwable throwable) {log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());}});Thread.sleep(Long.MAX_VALUE);}@Testpublic void delay() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "DELAY_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Long deliverTimeStamp = System.currentTimeMillis() + 1 * 60 * 1000;Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody(("delayMessage-" + LocalDateTime.now()).getBytes()).setDeliveryTimestamp(deliverTimeStamp).build();SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());producer.close();}@Testpublic void fifo() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "FIFO_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody(("fifoMessage").getBytes()).setMessageGroup("fifoGroup").build();SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());producer.close();}@Testpublic void transaction() throws ClientException, IOException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String topic = "TRANSACTION_TOPIC";Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).setTransactionChecker(new TransactionChecker() {@Overridepublic TransactionResolution check(MessageView messageView) {TransactionResolution result = TransactionResolution.COMMIT;//TODO:检查业务是否正常处理,如果失败则 result = TransactionResolution.ROLLBACKreturn result;}}).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey", "messageKey2").setTag("messageTag").setBody(("transactionMessage").getBytes()).build();Transaction transaction = producer.beginTransaction();try {SendReceipt sendReceipt = producer.send(message, transaction);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());//TODO:业务处理
transaction.commit();} catch (Exception e) {transaction.rollback();}producer.close();}
}
ProducerCase

2.4.2、消费者

package com.abc.demo.rocketmq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.junit.Test;import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;@Slf4j
public class ConsumerCase {private static final String endpoints = "10.49.196.33:8081";private static final String group = "myGroup";@Testpublic void pushConsumer() throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);subscriptionExpressions.put("DELAY_TOPIC", filterExpression);subscriptionExpressions.put("FIFO_TOPIC", filterExpression);subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(group).setSubscriptionExpressions(subscriptionExpressions).setMessageListener(messageView -> {log.info("接受到消息:messageId={},body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));return ConsumeResult.SUCCESS;}).build();log.info("开始接受消息...");Thread.sleep(Long.MAX_VALUE);}@Testpublic void simpleConsumer() throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);subscriptionExpressions.put("DELAY_TOPIC", filterExpression);subscriptionExpressions.put("FIFO_TOPIC", filterExpression);subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(group).setSubscriptionExpressions(subscriptionExpressions).setAwaitDuration(Duration.ofSeconds(30)).build();log.info("开始接受消息...");while (true) {List<MessageView> messageViews = simpleConsumer.receive(10, Duration.ofSeconds(30));for (MessageView messageView : messageViews) {log.info("接受到消息:messageId={},body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));simpleConsumer.ack(messageView);}}}/*** 该写法会报错:DEADLINE_EXCEEDED: deadline exceeded after 32.999993800s,可能是 RocketMQ 的 bug*/@Testpublic void simpleConsumerAsync() throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);subscriptionExpressions.put("DELAY_TOPIC", filterExpression);subscriptionExpressions.put("FIFO_TOPIC", filterExpression);subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(group).setSubscriptionExpressions(subscriptionExpressions).setAwaitDuration(Duration.ofSeconds(30)).build();log.info("开始接受消息...");while (true) {CompletableFuture<List<MessageView>> future = simpleConsumer.receiveAsync(10, Duration.ofSeconds(30));future.whenCompleteAsync((messageViews, throwable) -> {if (throwable != null) {log.error("Failed to receive message", throwable);return;}for (MessageView messageView : messageViews) {log.info("接受到消息:messageId={},body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));CompletableFuture<Void> completableFuture = simpleConsumer.ackAsync(messageView);completableFuture.whenCompleteAsync(new BiConsumer<Void, Throwable>() {@Overridepublic void accept(Void unused, Throwable throwable) {if (null != throwable) {log.error("Message is failed to be acknowledged, messageId={}", messageView.getMessageId(), throwable);return;}log.info("Message is acknowledged successfully, messageId={}", messageView.getMessageId());}});}});}}
}
ConsumerCase

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.jwkm.cn/p/42125134.html

如若内容造成侵权/违法违规/事实不符,请联系宁远站长网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

新能源汽车adas数字化闭环系统的几点思考

数据闭环已经被大多数自动驾驶公司认为是提升自动驾驶能力的必经之路。主机厂,配置了自动驾驶硬件的车队采集通过规则及影子模式下的触发器筛选的数据,经过语义筛选后的数据被回传到云端。此后,工程师在云端用工具对数据做一些处理,再把处理好的数据放入数据集群,然后利用…

查找结果数量不确定

问题1:根据E、F的条件返回对应C列的结果 函数公式解决:=INDEX(FILTER(C$2:C$8,(A$2:A$8=E2)*(B$2:B$8=F2)),COUNTIFS(E$2:E2,E2,F$2:F2,F2))问题2:根据E、F的条件返回对应C列的结果,要求结果列纵向排列。 分析:满足条件1和条件2分别是甲和A的结果有多个,导致最终结果与E、…

米联客MLK-CM02-2CG-3EG-4EV-AMD MPSOC核心模块硬件手册

1 产品概述 MLK-CM02-2CG-3EG-4EV(MILIANKE-8X)是米联客电子Zynq UltraScale+ MPSOC系列开发平台的全新高端产品。其核心模块集成电源管理:0.85V核心电源,最大输出12A。用户基于核心模块设计功能底板(提供功能底板设计方案)。降低项目功能底板设计难度和生产成本,加速项…

Linux操作系统的安装

1.1操作系统的安装 1.1.1准备Linux操作系统安装文件 VMware-workstation下载链接:Download VMware Workstation Pro 镜像文件下载链接:http://mirrors.njupt.edu.cn/centos/7.9.2009/isos/x86_64/CentOS-7-x86_64-DVD-2009.iso 1.1.2配置虚拟机 1)打开VMware Workstation ,…

Wordpress安装主题及CSV文件的导入、导出、更新

Wordpress安装主题及CSV文件的导入、导出、更新安装主题外观->主题->安装主题->上传主题->立即安装->完成主题->启用即可导出数据A站点->商品管理->产品导出->生成CSV得到CSV表格文件导入数据B站点->插件->导入选择产品->选择导入方式->…

N80-第7期作业-N80042

总结pg和mysql的优劣势。 PostgreSQL相对于MySQL的优势 PostgreSQL遵循BSD协议,这意味着使用 PostgreSQL 无任何限制。 可支持 C、C++、Java、PHP、Python 及 Perl 等,使您的业务开发更简单更易用。 PostgreSQL 是架构、语法、数据类型等与 Oracle 最接近的开源数据库。 兼容…