消息中间件之RabbitMQ初识

做者:threedayman 恒生LIGHT云社区html

RabbitMQ是什么

RabbitMQ是部署最普遍的开源消息代理。RabbitMQ有轻量级且易部署的特色。支持多种消息协议。java

为何使用RabbitMQ

常见的使用场景有解耦、异步、削峰填谷。下面咱们经过例子来感觉下各自场景下使用MQ带来的效益。mysql

解耦git

假设有系统A,依赖系统B、系统C、系统D,依赖关系在代码中已经写死,结构以下图。程序员

1621939617(1).png

假设此时又来了一个新需求,系统A须要调用系统E进行一些新的业务操做,那么系统A的程序员又免不了一顿操做,处理接入系统E的需求。同理若是要去掉某个系统的依赖好比系统C,也须要系统A负责的开发进行处理。github

那么此时咱们若是引入了MQ来看看会带来什么样的变化。sql

image-20210525185231787.png

系统A发送消息到MQ,系统B、C、D订阅对应的消息进行业务处理。那么咱们再来看看以前的场景,假设须要增长一个依赖系统E,只须要系统E的开发人员进行对应的订阅消费便可,同理若是要取消系统C的依赖,只须要系统C取消订阅对应的消息。数据库

异步app

假设系统A操做耗时30ms,系统A还将同步调用系统B(300ms)、系统C(600ms)、系统D(200ms)那么这个请求的响应时间将会达到1130ms。过长的响应时间会给客户带来很差的用户体验。异步

1621940629(1).png

引入MQ以后咱们看看会发生什么变化

image-20210525190839346.png

系统A将消息发送给MQ(7ms)以后就返回,系统B、C、D分别监听MQ进行业务处理。那么咱们看到针对刚才长耗时的同步依赖,引入MQ进行异步处理后,整体的响应时间从1130ms降到了37ms。

削峰填谷

假设咱们有个业务高峰期的请求量可以到达7000 /s而业务低谷流量只有100/s,可是咱们的mysql数据库只能承受2000/s的请求。

1621941575(1).png

在这种状况下会致使在高峰期超过了mqsql最高的负载能力而直接打挂,而低峰期没有将mqsql的资源合理利用起来。

引入MQ以后咱们看看会发生什么变化

image-20210525192439287.png

此时系统能够按照本身最大的消费能力2000/s去拉取消息,能够平稳度过业务高峰期,同时将一部分消息延迟到业务低谷时期进行处理。不至于出现因为高流量致使数据库被打挂,出现总体服务不可用的现象。

怎样使用RabbitMQ

本小节主要针对RabbitMQ的java客户端编写的几个经常使用的例子,若是您对使用RabbitMQ已熟练掌握,可跳过本小节。查看完整的RabbitMQ使用说明,请访问官方文档

Hello world

咱们经过一个Hello world 的例子来感觉下RabbitMQ。首先介绍下本例中使用到的术语

  • Producer:生产者,用来发送消息。
  • Queue:消息队列,用于存储消息,消息经由生产者投递到消息队列,最终被投递到消费者进行消费,消息队列收到机器内存和硬盘资源的限制。
  • Consumer:消费者,用于接收并处理消息。

本例中咱们咱们将生产Hello World的消息,经过消费者接受并打印出消息。

1621942986(1).png

生产者Send 关键步骤见注释说明

public class Send {
	//队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立和server之间的链接 connection、channel
        ConnectionFactory factory = new ConnectionFactory();
        //请设置实际部署节点ip
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //声明一个queue去发送消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //发布消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

完整Send代码查阅

消费者Recv 关键步骤见注释说明

public class Recv {
	//队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立和server之间的链接 connection、channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
		//声明要去消费的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//经过该类来处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

完整Recv代码查阅

Work Queues

在本例中咱们将介绍经过RabbitMQ分发耗时任务给多个工做者。RabbitMQ会经过轮询(round-robin)的方式将消息投递给消费者,这是的咱们可以很容易的扩展消费能力。

1621944984(1).png

生产者NewTask 关键步骤见注释说明

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //将队列设置成持久化
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = String.join(" ", argv);
			//将消息设置成持久化
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

完整NewTask代码查阅

消费者Woker 关键步骤见注释说明

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
		//将队列设置成持久化
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//一个消费者最多同时处理一个未确认的消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
	//模拟耗时任务,一个.表明耗时1S
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

完整Worker代码查阅

Publish/Subscribe

上面咱们已经介绍过RabbitMQ的核心消息模型,生产者、消费者、队列,在本小节咱们将接触到另外一个消息模型exchange** ,它负责从生产者中接收消息,并把消息投递到队列中。exchage主要有如下几种类型**

  • direct
  • topic
  • headers
  • fanout

本例中咱们将已fanout类型做为讲解,经过名称咱们大概也能猜到此类型exchange会广播接收到的消息到其绑定的队列中。

1622082071(1).jpg

生产者EmitLog 关键步骤见注释说明

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //建立一个exchange 并指定类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);
			//此处和以前发消息不同,指定具体的exchange没有指定具体的queue
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

EmitLog完整代码查阅

消费者ReceiveLogs 关键步骤见注释说明

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
		//建立fanout类型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //获取一个独有的,非持久化的,自动删除的队列
        String queueName = channel.queueDeclare().getQueue();
        //经过绑定方法将exchage和queue之间简历关系
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

ReceiveLogs完整代码查阅

Routing

上一个例子中exchange将接收到的信息广播给了绑定的队列中,本例中咱们将增长绑定的一些特定,使exchange有能力经过routingKey(全匹配)来投递不一样的消息到不一样的队列中。例如平常日志区分error日志进单独的队列。

image-20210526095222998.png

生产者EmitLogDirect 关键步骤见注释说明

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //声明一个direct类型的exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String severity = getSeverity(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

EmitLogDirect完整代码查阅

消费者ReceiveLogsDirect 关键步骤见注释说明

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for (String severity : argv) {
            //创建exchange和queue之间关系并设置routingKey
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

ReceiveLogsDirect完整代码查阅

Topics

提供更丰富的exchange到queue之间的路由规则。规则经过.分隔的routingKey,最高限制 255bytes。跟以前的全匹配routingKey不一样,topic类型的exchange的routingKey主要增长了两个特性。

  • *表明一个单词**。**
  • **#** 表明0个或一个单词。

image-20210526101017541.png

生产者EmitLogTopic 关键步骤见注释说明

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length < startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

EmitLogTopic完整代码查阅

消费者ReceiveLogsTopic 关键步骤见注释说明

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

ReceiveLogsTopic完整代码查阅

引入RabbitMQ带来什么挑战

看到这,各位看官是否是越越欲试想在项目中引入RabbitMQ去优化如今的使用场景,那么是否是咱们部署一个RabbitMQ服务,而后发送消息就高枕无忧了呢?其实在引入一个中间件时,同时伴随着一些问题,若是咱们对这些问题了解不够深刻或者全面,那恭喜你将进入挖坑选手序列。为了成为一个靠谱的程序员,咱们要充分了解引入中间件给咱们 项目带来的挑战,才能在以后的应用上从容应对。下面列了下消息中间件中常见的几类问题

  • 消息丢失
  • 消息重复
  • 消息堆积
  • RabbitMQ的可用性保证

以后的文章,咱们将逐个去讲解上述问题的解决方案。 下一讲:RabbitMQ消息可靠性传输

参考文档

https://www.rabbitmq.com/ RabbitMQ官方文档

tips:做者我的经验有限,不足之处烦请指正。