SpringBoot接入最新版RocketMq-Spring2.2.0,消费者指定返回消息处理状态java
由于用的是RocketMq4.8.0,所以接入最新的rocketmq-springgit
首先引入依赖github
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
RocketMq-Spring提供了默认的rocket生产者,RocketMqTemplatespring
直接注入就能够直接使用,默认的是读取application.yml/properties文件里的rocketMq默认配置路径apache
@Autowired private RocketMqTemplate rocketMqTemplate
默认配置属性以下app
rocketmq: name-server: localhost:9876 producer: group: audit-group
也能够自定义RocketMqTemplate,这样就能够同时拥有多个不一样配置的生产者,自定义生产者很是简单,只须要直接继承RocketMqTemlate就能够了,而后从注解中配置属性:异步
@ExtRocketMQTemplateConfiguration(group = "audit-test",nameServer = "localhost:9876") public class MyRocketMqTemplate extends RocketMQTemplate { }
上面那个注解中能够配置不少属性,能够直接赋值读取,也能够用表达式好比${myrocket.nameserver}从yml/properties配置文件中读取,要用的时候直接注入这个类的对象就可使用了async
@Service public class Producer { String topic="Topic-test"; //若是须要标签过滤的话,topic能够是Topic:Tag的格式 String topicWithTag="Topic-test:TagA"; String msg="hi there!"; @Autowired private MyRocketMqTemplate rocketMQTemplate; //同步发送消息 public void sendSyncMsg(){ rocketMQTemplate.syncSend(topic,msg); } //异步发送消息 public void sendAsyncMsg(){ rocketMQTemplate.asyncSend(topic, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { if(sendResult.getSendStatus()== SendStatus.SEND_OK){ System.out.println("send successful"); } } @Override public void onException(Throwable throwable) { System.out.println("send failed"); } }); } //发送顺序消息,最好对应的topic的写queue和读queue都设置为1 public void sendMsgOrderly(){ String hashKey="orderId:xxxx"; //hashKey的做用是指定queue,万一topic存在多个queue,能够指定顺序消息生产在这个特色的queue上,好比用orderId指定 rocketMQTemplate.syncSendOrderly(topic,msg,hashKey); } }
再看看消费者的示例:ide
主要是@RocketMessageListener注解,在其中配置消费者的属性,consumeMode能够配置是顺序消费仍是普通消费spring-boot
@Service @RocketMQMessageListener(topic = "audit-test",consumerGroup = "broker-a",consumeMode = ConsumeMode.ORDERLY) public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt messageExt) { System.out.println(messageExt.toString()); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
细心的读者应该能看到,这里的onMessage方法是void类型的,没有返回状态,与咱们平时用的不同,那若是消费失败,怎么返回RECONSUME_LATER的状态呢,github上官方是回复throw exception的时候会自动处理消息返回RECONSUME_LATER,可是目前在onMessage方法中是无法主动抛出异常的,后续我看官方怎么回复再更新