springboot+rabbitMq整合开发实战二:模拟用户下单的过程

springboot+rabbitMq整合开发实战二:模拟用户下单的过程html

    上一篇博客简单介绍了rabbitMQ的原理以及生产消费的过程,还介绍了一个采用“确认消费模式”的demo。这一篇博客,将介绍另一种消费模式 “直接消费”,并介绍一种在实际项目频繁使用的队列模式 “延迟队列”。java

     延迟队列,也叫“延时队列”,顾名思义,其实就是“生产者生产消息,消息进入队列以后,并不会当即被指定的消费者所消费,而是会延时一段指定的时间ttl,最终才被消费者消费”。mysql

     介绍了这个概念以后,咱们接下来实战一个在项目中常见的场景:“用户建立下单记录以后,会对其进行付款,付款成功以后,该条记录将变为已支付而且有效,不然的话,一旦过了指定的时间,即超时了,则该记录将置为无效,而且不能被用于后续的业务逻辑”。web

     对于这个场景,咱们就去掉了“付款的环节”,直接实现ttl一到,处理下单记录为无效。spring

     首先是项目的配置文件:sql

 

#profile
#spring.profiles.active=production
#spring.profiles.active=local
#spring.profiles.active=dev

server.port=8098
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.directory=log
server.tomcat.uri-encoding=UTF-8
logging.file=springbootMQ

spring.mvc.view.prefix=/WEB-INF/views/
spring.mvc.view.suffix=.jsp
multipart.max-request-size=20Mb
multipart.max-file-size=2Mb

logging.level.org.springframework = INFO
logging.level.com.fasterxml.jackson = INFO
logging.level.com.debug.steadyjack.springbootMQ = DEBUG

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.date-format-exact=yyyy-MM-dd HH:mm:ss SSS
spring.jackson.time-zone=GMT+8

spring.datasource.initialize=false

datasource.url=jdbc:mysql://127.0.0.1:3306/db_springboot?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
datasource.username=root
datasource.password=linsen

#mybatis
mybatis.config-location=classpath:mybatis-config.xml
mybatis.checkConfigLocation = true
mybatis.mapper-locations=classpath:mappers/*.xml


############################### rabbitmq ########################

spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.listener.concurrency=5
spring.rabbitmq.listener.max-concurrency=10
spring.rabbitmq.listener.prefetch=1
spring.rabbitmq.listener.transaction-size=1

########################### queue 配置 ##########################

mq.env=test

register.exchange.name=${mq.env}.user.register.exchange
register.delay.queue.name=${mq.env}.user.register.delay.queue

register.delay.exchange.name=${mq.env}.user.register.delay.exchange
register.queue.name=${mq.env}.user.register.queue


#交易记录失效时间:10s
trade.record.ttl=10000

 

 

    另外,贴一下这个业务涉及的一张表的信息:数据库

 

CREATE TABLE `order_trade_record` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `customer_id` int(11) DEFAULT NULL COMMENT '客户id',
  `order_id` int(11) DEFAULT NULL COMMENT '订单id',
  `price` decimal(15,2) DEFAULT NULL COMMENT '收款金额',
  `status` int(11) DEFAULT '0' COMMENT '状态(0=未支付,1=已支付)',
  `create_time` datetime DEFAULT NULL COMMENT '建立时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='订单交易记录';

 

 

 

    生成的model以及mapper接口以及mapper配置文件以下:api

 

package com.debug.steadyjack.springbootMQ.model.entity;

import java.math.BigDecimal;
import java.util.Date;

public class OrderTradeRecord {
    private Integer id;

    private Integer customerId;

    private Integer orderId;

    private BigDecimal price;

    private Integer status=1;

    private Date createTime;

    private Date updateTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Integer getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Integer customerId) {
        this.customerId = customerId;
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    @Override
    public String toString() {
        return "OrderTradeRecord{" +
                "id=" + id +
                ", customerId=" + customerId +
                ", orderId=" + orderId +
                ", price=" + price +
                ", status=" + status +
                ", createTime=" + createTime +
                ", updateTime=" + updateTime +
                '}';
    }
}

 

package com.debug.steadyjack.springbootMQ.model.mapper;

import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;

public interface OrderTradeRecordMapper {
    int deleteByPrimaryKey(Integer id);

    int insert(OrderTradeRecord record);

    int insertSelective(OrderTradeRecord record);

    OrderTradeRecord selectByPrimaryKey(Integer id);

    int updateByPrimaryKeySelective(OrderTradeRecord record);

    int updateByPrimaryKey(OrderTradeRecord record);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper" >
  <resultMap id="BaseResultMap" type="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" >
    <id column="id" property="id" jdbcType="INTEGER" />
    <result column="customer_id" property="customerId" jdbcType="INTEGER" />
    <result column="order_id" property="orderId" jdbcType="INTEGER" />
    <result column="price" property="price" jdbcType="DECIMAL" />
    <result column="status" property="status" jdbcType="INTEGER" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <sql id="Base_Column_List" >
    id, customer_id, order_id, price, status, create_time, update_time
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select 
    <include refid="Base_Column_List" />
    from order_trade_record
    where id = #{id,jdbcType=INTEGER}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
    delete from order_trade_record
    where id = #{id,jdbcType=INTEGER}
  </delete>
  <insert id="insert" keyProperty="id" useGeneratedKeys="true" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" >
    insert into order_trade_record (id, customer_id, order_id, 
      price, status, create_time, 
      update_time)
    values (#{id,jdbcType=INTEGER}, #{customerId,jdbcType=INTEGER}, #{orderId,jdbcType=INTEGER}, 
      #{price,jdbcType=DECIMAL}, #{status,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, 
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <insert id="insertSelective" keyProperty="id" useGeneratedKeys="true" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" >
    insert into order_trade_record
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        id,
      </if>
      <if test="customerId != null" >
        customer_id,
      </if>
      <if test="orderId != null" >
        order_id,
      </if>
      <if test="price != null" >
        price,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        #{id,jdbcType=INTEGER},
      </if>
      <if test="customerId != null" >
        #{customerId,jdbcType=INTEGER},
      </if>
      <if test="orderId != null" >
        #{orderId,jdbcType=INTEGER},
      </if>
      <if test="price != null" >
        #{price,jdbcType=DECIMAL},
      </if>
      <if test="status != null" >
        #{status,jdbcType=INTEGER},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" >
    update order_trade_record
    <set >
      <if test="customerId != null" >
        customer_id = #{customerId,jdbcType=INTEGER},
      </if>
      <if test="orderId != null" >
        order_id = #{orderId,jdbcType=INTEGER},
      </if>
      <if test="price != null" >
        price = #{price,jdbcType=DECIMAL},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=INTEGER},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where id = #{id,jdbcType=INTEGER}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" >
    update order_trade_record
    set customer_id = #{customerId,jdbcType=INTEGER},
      order_id = #{orderId,jdbcType=INTEGER},
      price = #{price,jdbcType=DECIMAL},
      status = #{status,jdbcType=INTEGER},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where id = #{id,jdbcType=INTEGER}
  </update>
</mapper>

 

 

 

 

 

  接下来,则是RabbitMq的全局配置加载文件,这个在实际项目中是很实用的(因此,诸位博友能够考虑直接拿在项目上使用):tomcat

 

package com.debug.steadyjack.springbootMQ.server.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.util.HashMap;
import java.util.Map;

/**
 * rabbitmq 配置
 * Created by steadyjack on 2017/12/01.
 */
@Configuration
public class RabbitmqConfig {

    private final static Logger log = LoggerFactory.getLogger("mqLog");

    @Autowired
    private Environment env;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**延迟队列配置**/

    @Bean(name = "registerDelayQueue")
    public Queue registerDelayQueue(){
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",env.getProperty("register.exchange.name"));
        params.put("x-dead-letter-routing-key","all");
        return new Queue(env.getProperty("register.delay.queue.name"), true,false,false,params);
    }

    @Bean
    public DirectExchange registerDelayExchange(){
        return new DirectExchange(env.getProperty("register.delay.exchange.name"));
    }

    @Bean
    public Binding registerDelayBinding(){
        return BindingBuilder.bind(registerDelayQueue()).to(registerDelayExchange()).with("");
    }

    /**延迟队列配置**/

    /**指标消费队列配置**/

    @Bean
    public TopicExchange registerTopicExchange(){
        return new TopicExchange(env.getProperty("register.exchange.name"));
    }

    @Bean
    public Binding registerBinding(){
        return BindingBuilder.bind(registerQueue()).to(registerTopicExchange()).with("all");
    }

    @Bean(name = "registerQueue")
    public Queue registerQueue(){
        return new Queue(env.getProperty("register.queue.name"),true);
    }

    /**指标消费队列配置**/

    /**
     * 单一消费者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        return factory;
    }

    /**
     * 多个消费者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        return factory;
    }


    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }

}

 

    上图,即配置了两个队列:做为缓冲使用的延迟队列,真正去消费消息的队列。流程图能够大概用下图表示:springboot

 

    紧接着,我写了个controller以及request和service:

 

package com.debug.steadyjack.springbootMQ.server.controller;

import com.debug.steadyjack.springbootMQ.api.enums.StatusCode;
import com.debug.steadyjack.springbootMQ.api.response.BaseResponse;
import com.debug.steadyjack.springbootMQ.server.request.OrderTradeRecordRequest;
import com.debug.steadyjack.springbootMQ.server.service.OrderTradeRecordService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.Valid;

/**
 * 订单交易记录controller
 * Created by steadyjack on 2017/12/11.
 */
@RestController
public class OrderTradeRecordController {

    private static final Logger log= LoggerFactory.getLogger(OrderTradeRecordController.class);

    private static final String prefix="order/trade/record";

    @Autowired
    private OrderTradeRecordService orderTradeRecordService;

    /**
     * 建立用户下单记录
     * @param requestData
     * @param bindingResult
     * @return
     * @throws Exception
     */
    @RequestMapping(value = prefix+"/create",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse createRecord(@Valid @RequestBody OrderTradeRecordRequest requestData, BindingResult bindingResult) throws Exception{
        if (bindingResult.hasErrors()){
            return new BaseResponse(StatusCode.Invalid_Params);
        }
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            orderTradeRecordService.createTradeRecord(requestData);
        }catch (Exception e){
            log.error("用户下单记录异常:{} ",requestData,e.fillInStackTrace());
            return new BaseResponse(StatusCode.Fail);
        }
        return response;
    }


}

 

package com.debug.steadyjack.springbootMQ.server.request;

import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.math.BigDecimal;

/**
 * 订单交易记录request
 * Created by steadyjack on 2017/12/11.
 */
public class OrderTradeRecordRequest implements Serializable{

    @NotNull
    private Integer customerId;
    @NotNull
    private Integer orderId;
    @NotNull
    private BigDecimal price;

    private Integer status=0;


    public Integer getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Integer customerId) {
        this.customerId = customerId;
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "OrderTradeRecordRequest{" +
                "customerId=" + customerId +
                ", orderId=" + orderId +
                ", price=" + price +
                ", status=" + status +
                '}';
    }
}

 

 

 

 

package com.debug.steadyjack.springbootMQ.server.service;

import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;
import com.debug.steadyjack.springbootMQ.model.entity.User;
import com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper;
import com.debug.steadyjack.springbootMQ.server.request.OrderTradeRecordRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * Created by steadyjack on 2017/12/11.
 */
@Service
public class OrderTradeRecordService {

    private static final Logger log= LoggerFactory.getLogger(OrderTradeRecordService.class);

    @Autowired
    private Environment env;

    @Autowired
    public OrderTradeRecordMapper orderTradeRecordMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void createTradeRecord(OrderTradeRecordRequest requestData) throws Exception{
        //TODO:其他业务逻辑上的校验。。

        //TODO:建立交易记录
        OrderTradeRecord record=new OrderTradeRecord();
        BeanUtils.copyProperties(requestData,record);
        record.setCreateTime(new Date());
        record.setStatus(1);
        orderTradeRecordMapper.insertSelective(record);

        //TODO:设置超时,用mq处理已超时的下单记录(一旦记录超时,则处理为无效)
        final Long ttl=env.getProperty("trade.record.ttl",Long.class);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(env.getProperty("register.delay.exchange.name"));
        rabbitTemplate.setRoutingKey("");
        rabbitTemplate.convertAndSend(record, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class.getName());
                message.getMessageProperties().setExpiration(ttl+"");
                return message;
            }
        });
    }

}


     最终,咱们固然是开发mq延迟队列对应的消费者:

 

 

package com.debug.steadyjack.springbootMQ.server.listener;

import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;
import com.debug.steadyjack.springbootMQ.model.entity.User;
import com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Objects;


@Component
public class RabbitMQListener {

    private final static Logger log= LoggerFactory.getLogger(RabbitMQListener.class);

    @Autowired
    private OrderTradeRecordMapper orderTradeRecordMapper;

    /*@RabbitListener(queues = "${register.queue.name}",containerFactory = "singleListenerContainer")
    public void test(@Payload User user){
        try {
            log.debug("消费者监听消费到消息: {} ",user);

        }catch (Exception e){
            log.error("消息体解析 发生异常; ",e.fillInStackTrace());
        }
    }*/

    //直接消费模式
    @RabbitListener(queues = "${register.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload OrderTradeRecord record){
        try {
            log.debug("消费者监听交易记录信息: {} ",record);

            //TODO:表示已经到ttl了,却还没付款,则须要处理为失效
            if (Objects.equals(1,record.getStatus())){
                record.setStatus(0);
                record.setUpdateTime(new Date());
                orderTradeRecordMapper.updateByPrimaryKeySelective(record);
            }
        }catch (Exception e){
            log.error("消息体解析 发生异常; ",e.fillInStackTrace());
        }
    }

}


 其余像Log4j的配置等就不贴出来了。下面看看效果

 

 首先,run项目以后,能够在mq后台看到两条队列:

  而后,在controller发起一条消息(至关于生产者):

 

 最后看idea控制台以及mq控制台会发现这样一个现象:首先是延迟队列消费了消息,在数据库插入一条记录;而后在到达ttl后(这里是10s)进行转发到实际的消费队列:

 

 

 

 

 

 好了,对于本篇博客,若是有相关疑问,能够留言,或者加入群讨论:java开源技术交流:583522159。我叫debug,我的QQ:1948831260

 另外,诸位 如有兴趣,能够关注个人公众号,后续会分享干货在上面哦!