假如咱们想要调用远程的一个方法或函数并等待执行结果,也就是咱们一般说的远程过程调用(Remote Procedure Call)。怎么办?dom
今天咱们就用RabbitMQ来实现一个简单的RPC系统:客户端发送一个请求消息,服务端以一个响应消息回应。为了可以接收到响应,客户端在发送消息的同时发送一个回调队列用来告诉服务端响应消息发送到哪一个队列里面。也就是说每一个消息一个回调队列,在此基础上咱们变下,将回调队列定义成类的属性,这个每一个客户端一个队列,同一个客户端的请求共用一个队列。那么接下来有个问题,怎么知道这个队列里面的响应消息是属于哪一个队列的呢?ide
咱们会用到关联标识(correlationId),每一个请求咱们都会生成一个惟一的值做为correlationId,这样每次有响应消息来的时候,咱们就去看correlationId来肯定究竟是哪一个请求的响应消息,将请求和响应关联起来。若是收到一个不知道的correlationId,就能够肯定不是这个客户端的请求的响应,能够直接丢弃掉。函数
1、工做模型ui
rpc_queue
队列的请求。当有请求到来时,它就会开始干活并将结果经过发送消息来返回,该返回消息发送到replyTo
指定的队列。correlation id
属性。若是该属性值和请求匹配,就将响应返回给程序。2、代码实现spa
接下来看代码实现:线程
public class RpcClient { Connection connection = null; Channel channel = null; //回调队列:用来接收服务端的响应消息 String queueName = ""; // 定义RpcClient public RpcClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); queueName = channel.queueDeclare().getQueue(); } // 真正的处理逻辑 public String call(String msg) throws IOException, InterruptedException { final String uuid = UUID.randomUUID().toString(); //后续,服务端根据"replyTo"来指定将返回信息写入到哪一个队列 //后续,服务端根据关联标识"correlationId"来指定返回的响应是哪一个请求的 AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().replyTo(queueName).correlationId(uuid).build(); channel.basicPublish("", RpcServer.QUEUE_NAME, prop, msg.getBytes()); final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(uuid)) { String msg = new String(body, "UTF-8"); blockQueue.offer(msg); System.out.println("**** rpc client reciver response :[" + msg + "]"); } } }); return blockQueue.take(); } //关闭链接 public void close() throws IOException { connection.close(); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { RpcClient client = new RpcClient(); client.call("4"); client.close(); } }
发送请求的时候,它是生产者;接受响应的时候,它是消费者。3d
public class RpcServer { //RPC队列名 public static final String QUEUE_NAME = "rpc_queue"; //斐波那契数列,用来模拟工做任务 public static int fib(int num) { if (num == 0) return 0; if (num == 1) return 1; return fib(num - 1) + fib(num - 2); } public static void main(String[] args) throws InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { // 1.connection & channel connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 2.declare queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("****** rpc server waiting for client request ......"); // 3.每次只接收一个消息(任务) channel.basicQos(1); //4.获取消费实例 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { BasicProperties prop = new BasicProperties().builder().correlationId(properties.getCorrelationId()) .build(); String resp = ""; try { String msg = new String(body, "UTF-8"); resp = fib(Integer.valueOf(msg)) + ""; System.out.println("*** will response to rpc client :" + resp); } catch (Exception ex) { ex.printStackTrace(); } finally { channel.basicPublish("", properties.getReplyTo(), prop, resp.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 5.消费消息(处理任务) channel.basicConsume(QUEUE_NAME, false, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
接受请求的时候,它是消费者;发送响应的时候,它是生产者。code
服务端(多了一条打印): ****** rpc server waiting for client request ...... *** will response to rpc client :3 客户端: **** rpc client reciver response :[3]
3、小插曲server
刚开始我在写demo的时候,client中没有用到阻塞队列final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);,而是直接这样写:blog
@Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(uuid)) { String msg = new String(body, "UTF-8"); //blockQueue.offer(msg); System.out.println("**** rpc client reciver response :[" + msg + "]"); } }
指望能打印出结果来,可是运行后发现并无打印:**** rpc client reciver response :[" + msg + "]的值。
缘由是handleDelivery()这个方法是在子线程中运行的,这个子线程运行的时候,主线程会继续日后执行直到执行了client.close();方法而结束了。
因为主线程终止了,致使没有打印出结果。加了阻塞队列以后将主线程阻塞不执行close()方法,问题就解决了。