SpringBoot接入最新版RocketMq-Spring2.2.0,消费者指定返回消息处理状态

SpringBoot接入最新版RocketMq-Spring2.2.0,消费者指定返回消息处理状态java

由于用的是RocketMq4.8.0,所以接入最新的rocketmq-springgit

首先引入依赖github

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

RocketMq-Spring提供了默认的rocket生产者,RocketMqTemplatespring

直接注入就能够直接使用,默认的是读取application.yml/properties文件里的rocketMq默认配置路径apache

@Autowired
private RocketMqTemplate rocketMqTemplate

默认配置属性以下app

rocketmq:
  name-server: localhost:9876
  producer:
    group: audit-group

也能够自定义RocketMqTemplate,这样就能够同时拥有多个不一样配置的生产者,自定义生产者很是简单,只须要直接继承RocketMqTemlate就能够了,而后从注解中配置属性:异步

@ExtRocketMQTemplateConfiguration(group = "audit-test",nameServer = "localhost:9876")
public class MyRocketMqTemplate extends RocketMQTemplate {
}

上面那个注解中能够配置不少属性,能够直接赋值读取,也能够用表达式好比${myrocket.nameserver}从yml/properties配置文件中读取,要用的时候直接注入这个类的对象就可使用了async

@Service
public class Producer {

    String topic="Topic-test";
    //若是须要标签过滤的话,topic能够是Topic:Tag的格式
    String topicWithTag="Topic-test:TagA";
    String msg="hi there!";

    @Autowired
    private MyRocketMqTemplate rocketMQTemplate;

    //同步发送消息
    public void sendSyncMsg(){
        rocketMQTemplate.syncSend(topic,msg);
    }
    
    //异步发送消息
    public void sendAsyncMsg(){
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if(sendResult.getSendStatus()== SendStatus.SEND_OK){
                    System.out.println("send successful");
                }
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("send failed");
            }
        });
    }
    
    //发送顺序消息,最好对应的topic的写queue和读queue都设置为1
    public void sendMsgOrderly(){
        String hashKey="orderId:xxxx";
        //hashKey的做用是指定queue,万一topic存在多个queue,能够指定顺序消息生产在这个特色的queue上,好比用orderId指定
        rocketMQTemplate.syncSendOrderly(topic,msg,hashKey);
    }
}

再看看消费者的示例:ide

主要是@RocketMessageListener注解,在其中配置消费者的属性,consumeMode能够配置是顺序消费仍是普通消费spring-boot

@Service
@RocketMQMessageListener(topic = "audit-test",consumerGroup = "broker-a",consumeMode = ConsumeMode.ORDERLY)
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(messageExt.toString());
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }
}

细心的读者应该能看到,这里的onMessage方法是void类型的,没有返回状态,与咱们平时用的不同,那若是消费失败,怎么返回RECONSUME_LATER的状态呢,github上官方是回复throw exception的时候会自动处理消息返回RECONSUME_LATER,可是目前在onMessage方法中是无法主动抛出异常的,后续我看官方怎么回复再更新