1. Installing RabbitMQ on Linux
Reference: the RabbitMQ RPM installation guide. Read it carefully.
Install Erlang first:
su -c 'rpm -Uvh http://mirrors.neusoft.edu.cn/epel/epel-release-latest-7.noarch.rpm' ... su -c 'yum install foo'
####################################
Recommended: install Erlang from this repository.
# Alternatively, packages for each Erlang version can be downloaded from http://www.rabbitmq.com/releases/erlang/ and installing one of those is enough.
####################################
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum install erlang
Download the RabbitMQ RPM package and install it:
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.2-1.noarch.rpm
The installation may report that esl-erlang_16.b.3 is required. It can be downloaded from https://www.erlang-solutions.com/resources/download.html
2. Writing RabbitMQ code. Follow http://www.rabbitmq.com/getstarted.html, which is the key reference; these notes do not reproduce the full code.
cp /usr/share/doc/rabbitmq-server-3.6.2/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
service rabbitmq-server restart
1) Problem encountered: the client raises pika.exceptions.ProbableAuthenticationError. Check the server log (important):
tail -f /var/log/rabbitmq/rabbit\@Minion.log
=ERROR REPORT==== 27-Jun-2016::02:06:19 ===
Error on AMQP connection <0.542.0> (192.168.243.131:43681 -> 192.168.243.131:5672, state: starting):
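PLAIN login refused: user 'guest' can only connect via localhost
Fix: in the configuration file, change the commented-out line %% {loopback_users, [<<"guest">>]}, to {loopback_users, []} and then run service rabbitmq-server restart. See http://www.rabbitmq.com/configure.html
2) Starting rabbitmq-server, sending, receiving, and every other command ran surprisingly slowly. The cause was a DNS configuration problem.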
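3) no_ack = False (the default). Take the simplest "hello world" example: start two receive.py processes whose callback() sleeps according to the number of dots in the received message, then send 7 messages in a row from send.py (each with 6 dots). If one receive.py is stopped at this point, you will see its messages delivered to the other receive.py. However, rabbitmqctl list_queues shows that the number of messages in the queue has not decreased.
It seems that no_ack = False only means the receiver tells the queue that it will send an ack once it has handled a message. Whether an ack is actually sent is controlled by ch.basic_ack(delivery_tag = method.delivery_tag), which can be placed at the end of callback().
rabbitmqctl list_queues name messages_ready messages_unacknowledged shows the number of messages that have not yet been acknowledged.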
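The bookkeeping behind messages_ready and messages_unacknowledged can be pictured with a small in-memory model. This is only a sketch under my own assumptions: ToyQueue, its methods, and the worker names are hypothetical and are not pika or RabbitMQ APIs. It shows that a delivered message stays counted until it is explicitly acked, and that messages held by a consumer that disappears go back to the ready list.

```python
from collections import deque


class ToyQueue:
    """Hypothetical in-memory model of a queue with manual acks (not a RabbitMQ API)."""

    def __init__(self):
        self.ready = deque()   # messages waiting to be delivered
        self.unacked = {}      # delivery_tag -> (consumer, body)
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, consumer):
        """Hand the next ready message to a consumer and remember it as unacked."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, body)
        return tag, body

    def ack(self, tag):
        """Only an explicit ack removes the message for good."""
        del self.unacked[tag]

    def consumer_gone(self, consumer):
        """Requeue everything the vanished consumer had not acknowledged."""
        for tag in sorted(self.unacked, reverse=True):
            owner, body = self.unacked[tag]
            if owner == consumer:
                del self.unacked[tag]
                self.ready.appendleft(body)


q = ToyQueue()
for i in range(3):
    q.publish("msg%d......" % i)

tag_a, _ = q.deliver("worker-a")
tag_b, _ = q.deliver("worker-b")
q.ack(tag_b)                 # worker-b acknowledges its message
q.consumer_gone("worker-a")  # worker-a stops without acking

counts = (len(q.ready), len(q.unacked))
print(counts)
```

In this model the message taken by worker-a reappears at the front of the ready list, which matches what I saw when stopping one receive.py.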
4) Message and queue durability
Durable queue:
channel.queue_declare(queue='hello', durable=True)
Persistent message:
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2,  # make message persistent
                      ))
Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
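5) Fair dispatch
By default RabbitMQ hands each message to the next consumer in turn, without looking at how many unacknowledged messages that consumer already holds, so one worker can stay constantly busy while another sits idle. In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.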
channel.basic_qos(prefetch_count=1)
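To see why this matters, here is a rough simulation that compares blind round-robin dispatch with "give the next message to whichever worker is free first", which is how I understand prefetch_count=1 to behave. The function names and the task durations are made up for illustration, and the model ignores network and ack latency.

```python
import heapq


def round_robin(durations, workers=2):
    """Assign the n-th message to worker n % workers, ignoring how busy it is."""
    load = [0] * workers
    for n, d in enumerate(durations):
        load[n % workers] += d
    return load


def prefetch_one(durations, workers=2):
    """Give each message to whichever worker becomes free first."""
    # heap of (time at which the worker becomes free, worker index)
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    load = [0] * workers
    for d in durations:
        t, w = heapq.heappop(free_at)
        load[w] += d
        heapq.heappush(free_at, (t + d, w))
    return load


durations = [5, 1, 5, 1, 5, 1]  # heavy and light messages alternate
rr = round_robin(durations)
fair = prefetch_one(durations)
print(rr, fair)
```

With alternating heavy and light messages, round-robin piles all the heavy ones onto the same worker, while the second strategy spreads the total work more evenly.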
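6) Broadcasting messages. With the approach below, if the sender is started before any receiver, the messages it sends are lost because no queue is bound to the exchange yet.
The sender declares the exchange: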
channel.exchange_declare(exchange='logs', type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)
The receiver binds a queue to the exchange:
result = channel.queue_declare(exclusive=True) # exclusive=True: the queue is deleted when the client disconnects
channel.queue_bind(exchange='logs', queue=result.method.queue) # bind the queue to the exchange
[root@test2 ~]# rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-O86WeoCyhQu0YvjtJo7Dew queue amq.gen-O86WeoCyhQu0YvjtJo7Dew []
logs exchange amq.gen-fYeLgPULbT7hVLgahg21jQ queue amq.gen-fYeLgPULbT7hVLgahg21jQ []
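The fanout behaviour, including the loss of messages published before any queue is bound, can be mimicked with a toy class. ToyFanout and the queue names below are hypothetical; this is a sketch of the idea, not how RabbitMQ is implemented.

```python
class ToyFanout:
    """Hypothetical model of a fanout exchange (not a pika or RabbitMQ API)."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered bodies

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, [])

    def publish(self, body):
        """Copy the message to every bound queue; return how many copies were made."""
        for messages in self.queues.values():
            messages.append(body)
        return len(self.queues)


logs = ToyFanout()
early = logs.publish("sent before any receiver started")  # nothing bound yet
logs.bind("amq.gen-1")  # made-up queue names
logs.bind("amq.gen-2")
late = logs.publish("sent after both receivers bound")
print(early, late, logs.queues)
```

The early message is copied to zero queues and is simply dropped, while the later one reaches both receivers.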
7) Message routing. The sender specifies an exchange and a routing_key; queues bound to that exchange with the same routing_key receive the message.
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key. This is how we could create a binding with a key:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.
We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
To illustrate that, consider the following setup:
In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.
In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.
The same binding key can also be bound to multiple queues.
It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
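The exact-match rule of a direct exchange is easy to sketch with a dictionary. The bindings below mirror the X, Q1, Q2 setup described above; the route helper is my own hypothetical name, not a pika call.

```python
from collections import defaultdict

# binding key -> set of queue names bound with that key
bindings = defaultdict(set)
bindings["orange"].add("Q1")
bindings["black"].add("Q2")
bindings["green"].add("Q2")


def route(routing_key):
    """Return the queues whose binding key equals the routing key exactly."""
    return sorted(bindings.get(routing_key, set()))


before = [route("orange"), route("black"), route("purple")]

# Bind Q1 with 'black' as well: 'black' messages now reach both queues.
bindings["black"].add("Q1")
after = route("black")
print(before, after)
```

A routing key with no matching binding yields an empty list, which corresponds to the message being discarded.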
Sender:
Like always we need to create an exchange first:
channel.exchange_declare(exchange='direct_logs', type='direct')
And we're ready to send a message:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.
Receiver:
Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
Start the receiver: python receive_logs_direct.py info warning error
On the rabbitmq-server host: rabbitmqctl list_bindings
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue error []
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue info []
direct_logs exchange amq.gen-2jS9NghfwuydRFdY5XHkhw queue info []
direct_logs exchange amq.gen-W4eTG2YC97A9aMwBVxeJJQ queue warning []
8) More complex message routing (topic exchange)
In this example, we're going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, second a colour and third a species: "<celerity>.<colour>.<species>".
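We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#". In a binding key, * (star) substitutes for exactly one word and # (hash) substitutes for zero or more words.
These bindings can be summarised as: Q1 is interested in all the orange animals; Q2 wants to hear everything about rabbits, and everything about lazy animals.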
A message with a routing key set to "quick.orange.rabbit" will be delivered to both queues. Message "lazy.orange.elephant" also will go to both of them. On the other hand "quick.orange.fox" will only go to the first queue, and "lazy.brown.fox" only to the second. "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches two bindings. "quick.brown.fox" doesn't match any binding so it will be discarded.
What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.
On the other hand "lazy.orange.male.rabbit", even though it has four words, will match the last binding and will be delivered to the second queue.
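The matching rules can be checked with a short function. This is my own sketch of the wildcard semantics (* is exactly one word, # is zero or more words), not RabbitMQ's actual implementation; topic_matches and deliver are hypothetical names.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key satisfies binding_key under topic rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(i, j):
        if i == len(pattern):
            return j == len(words)
        if pattern[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, k) for k in range(j, len(words) + 1))
        if j == len(words):
            return False
        if pattern[i] == "*" or pattern[i] == words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


queues = {"Q1": ["*.orange.*"], "Q2": ["*.*.rabbit", "lazy.#"]}


def deliver(routing_key):
    """Each queue receives a message at most once, however many bindings match."""
    return sorted(q for q, keys in queues.items()
                  if any(topic_matches(k, routing_key) for k in keys))


for key in ["quick.orange.rabbit", "lazy.pink.rabbit", "quick.brown.fox",
            "orange", "lazy.orange.male.rabbit"]:
    print(key, deliver(key))
```

Running it against the routing keys from the text gives the same deliveries as described there.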
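9) Message properties. They are set on the sender side via basic_publish(properties=pika.BasicProperties(xx=xxx)).
The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following: delivery_mode (a value of 2 marks the message as persistent), content_type (describes the mime-type of the encoding), reply_to (commonly used to name a callback queue), and correlation_id (useful to correlate RPC responses with requests).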
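10) RPC
Our RPC will work like this: when the client starts, it creates an exclusive callback queue. For each request it sends a message to rpc_queue with two properties: reply_to, set to the callback queue, and correlation_id, set to a unique value per request. The server waits on rpc_queue, does the work, and publishes the result to the queue named in reply_to. The client waits on the callback queue and, when a message arrives, checks its correlation_id before returning the response.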
The code for rpc_server.py:
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
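The server code is rather straightforward: it connects and declares rpc_queue, defines the fib function, and handles each request in the on_request callback, which computes the result, publishes it to the queue named in props.reply_to with the request's correlation_id, and acks the request. It also sets prefetch_count=1 so that load is spread evenly if several server processes are run.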
The code for rpc_client.py:
#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to = self.callback_queue,
                                       correlation_id = self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
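The client code is slightly more involved: on start-up it opens a connection, declares an exclusive callback queue for replies, and subscribes to it. The on_response callback keeps a reply only if its correlation_id matches the one being waited for. The call method generates a unique correlation_id, publishes the request to rpc_queue with reply_to and correlation_id set, then loops on process_data_events() until the response arrives and returns it.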
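The role of correlation_id can be tried out without a broker. In the sketch below two deques stand in for rpc_queue and the callback queue, and the helper names (client_send, server_step, client_receive) are hypothetical. fib is written iteratively here only to keep the sketch fast; it is not the tutorial's recursive version.

```python
import uuid
from collections import deque

rpc_queue = deque()       # stands in for 'rpc_queue'
callback_queue = deque()  # stands in for the client's exclusive reply queue


def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def client_send(n):
    """Publish a request tagged with a fresh correlation id."""
    corr_id = str(uuid.uuid4())
    rpc_queue.append({"correlation_id": corr_id, "body": str(n)})
    return corr_id


def server_step():
    """Take one request, compute the answer, reply with the same correlation id."""
    request = rpc_queue.popleft()
    callback_queue.append({
        "correlation_id": request["correlation_id"],
        "body": str(fib(int(request["body"]))),
    })


def client_receive(corr_id):
    """Return the first reply whose correlation id matches; ignore the rest."""
    while callback_queue:
        reply = callback_queue.popleft()
        if reply["correlation_id"] == corr_id:
            return int(reply["body"])
    return None


# A stale reply from some earlier request is already sitting in the queue.
callback_queue.append({"correlation_id": "stale-id", "body": "-1"})
corr_id = client_send(30)
server_step()
result = client_receive(corr_id)
print(result)
```

The stale reply is skipped because its correlation id does not match, which is the same check on_response performs in the client.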
Our RPC service is now ready. We can start the server:
$ python rpc_server.py
[x] Awaiting RPC requests
To request a fibonacci number run the client:
$ python rpc_client.py
[x] Requesting fib(30)
The presented design is not the only possible implementation of a RPC service, but it has some important advantages:
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
The next chapter will analyze the problems listed above.