做者:threedayman 恒生LIGHT云社区html
RabbitMQ是什么
RabbitMQ是部署最普遍的开源消息代理。RabbitMQ有轻量级且易部署的特色。支持多种消息协议。java
为何使用RabbitMQ
常见的使用场景有解耦、异步、削峰填谷。下面咱们经过例子来感觉下各自场景下使用MQ带来的效益。mysql
解耦git
假设有系统A,依赖系统B、系统C、系统D,依赖关系在代码中已经写死,结构以下图。程序员
假设此时又来了一个新需求,系统A须要调用系统E进行一些新的业务操做,那么系统A的程序员又免不了一顿操做,处理接入系统E的需求。同理若是要去掉某个系统的依赖好比系统C,也须要系统A负责的开发进行处理。github
那么此时咱们若是引入了MQ来看看会带来什么样的变化。sql
系统A发送消息到MQ,系统B、C、D订阅对应的消息进行业务处理。那么咱们再来看看以前的场景,假设须要增长一个依赖系统E,只须要系统E的开发人员进行对应的订阅消费便可,同理若是要取消系统C的依赖,只须要系统C取消订阅对应的消息。数据库
异步app
假设系统A操做耗时30ms,系统A还将同步调用系统B(300ms)、系统C(600ms)、系统D(200ms)那么这个请求的响应时间将会达到1130ms。过长的响应时间会给客户带来很差的用户体验。异步
引入MQ以后咱们看看会发生什么变化
系统A将消息发送给MQ(7ms)以后就返回,系统B、C、D分别监听MQ进行业务处理。那么咱们看到针对刚才长耗时的同步依赖,引入MQ进行异步处理后,整体的响应时间从1130ms降到了37ms。
削峰填谷
假设咱们有个业务高峰期的请求量可以到达7000 /s而业务低谷流量只有100/s,可是咱们的mysql数据库只能承受2000/s的请求。
在这种状况下会致使在高峰期超过了mqsql最高的负载能力而直接打挂,而低峰期没有将mqsql的资源合理利用起来。
引入MQ以后咱们看看会发生什么变化
此时系统能够按照本身最大的消费能力2000/s去拉取消息,能够平稳度过业务高峰期,同时将一部分消息延迟到业务低谷时期进行处理。不至于出现因为高流量致使数据库被打挂,出现总体服务不可用的现象。
怎样使用RabbitMQ
本小节主要针对RabbitMQ的java客户端编写的几个经常使用的例子,若是您对使用RabbitMQ已熟练掌握,可跳过本小节。查看完整的RabbitMQ使用说明,请访问官方文档。
Hello world
咱们经过一个Hello world 的例子来感觉下RabbitMQ。首先介绍下本例中使用到的术语
- Producer:生产者,用来发送消息。
- Queue:消息队列,用于存储消息,消息经由生产者投递到消息队列,最终被投递到消费者进行消费,消息队列收到机器内存和硬盘资源的限制。
- Consumer:消费者,用于接收并处理消息。
本例中咱们咱们将生产Hello World的消息,经过消费者接受并打印出消息。
生产者Send 关键步骤见注释说明
public class Send { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //建立和server之间的链接 connection、channel ConnectionFactory factory = new ConnectionFactory(); //请设置实际部署节点ip factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个queue去发送消息 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
消费者Recv 关键步骤见注释说明
public class Recv { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //建立和server之间的链接 connection、channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明要去消费的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //经过该类来处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
Work Queues
在本例中咱们将介绍经过RabbitMQ分发耗时任务给多个工做者。RabbitMQ会经过轮询(round-robin)的方式将消息投递给消费者,这是的咱们可以很容易的扩展消费能力。
生产者NewTask 关键步骤见注释说明
public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //将队列设置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); //将消息设置成持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
消费者Woker 关键步骤见注释说明
public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //将队列设置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //一个消费者最多同时处理一个未确认的消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } //模拟耗时任务,一个.表明耗时1S private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
Publish/Subscribe
上面咱们已经介绍过RabbitMQ的核心消息模型,生产者、消费者、队列,在本小节咱们将接触到另外一个消息模型exchange** ,它负责从生产者中接收消息,并把消息投递到队列中。exchage主要有如下几种类型**
- direct
- topic
- headers
- fanout
本例中咱们将已fanout类型做为讲解,经过名称咱们大概也能猜到此类型exchange会广播接收到的消息到其绑定的队列中。
生产者EmitLog 关键步骤见注释说明
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //建立一个exchange 并指定类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); //此处和以前发消息不同,指定具体的exchange没有指定具体的queue channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
消费者ReceiveLogs 关键步骤见注释说明
public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //建立fanout类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //获取一个独有的,非持久化的,自动删除的队列 String queueName = channel.queueDeclare().getQueue(); //经过绑定方法将exchage和queue之间简历关系 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Routing
上一个例子中exchange将接收到的信息广播给了绑定的队列中,本例中咱们将增长绑定的一些特定,使exchange有能力经过routingKey(全匹配)来投递不一样的消息到不一样的队列中。例如平常日志区分error日志进单独的队列。
生产者EmitLogDirect 关键步骤见注释说明
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个direct类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } private static String getSeverity(String[] strings) { if (strings.length < 1) return "info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length <= startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消费者ReceiveLogsDirect 关键步骤见注释说明
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { //创建exchange和queue之间关系并设置routingKey channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Topics
提供更丰富的exchange到queue之间的路由规则。规则经过.分隔的routingKey,最高限制 255bytes。跟以前的全匹配routingKey不一样,topic类型的exchange的routingKey主要增长了两个特性。
- *表明一个单词**。**
- **#** 表明0个或一个单词。
生产者EmitLogTopic 关键步骤见注释说明
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消费者ReceiveLogsTopic 关键步骤见注释说明
public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
引入RabbitMQ带来什么挑战
看到这,各位看官是否是越越欲试想在项目中引入RabbitMQ去优化如今的使用场景,那么是否是咱们部署一个RabbitMQ服务,而后发送消息就高枕无忧了呢?其实在引入一个中间件时,同时伴随着一些问题,若是咱们对这些问题了解不够深刻或者全面,那恭喜你将进入挖坑选手序列。为了成为一个靠谱的程序员,咱们要充分了解引入中间件给咱们 项目带来的挑战,才能在以后的应用上从容应对。下面列了下消息中间件中常见的几类问题
- 消息丢失
- 消息重复
- 消息堆积
- RabbitMQ的可用性保证
以后的文章,咱们将逐个去讲解上述问题的解决方案。 下一讲:RabbitMQ消息可靠性传输
参考文档
https://www.rabbitmq.com/ RabbitMQ官方文档
tips:做者我的经验有限,不足之处烦请指正。