前面講過一些RabbitMQ的安裝和用法,也說了說RabbitMQ在一般的業務場景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html
不過,最近有朋友問我,RabbitMQ RPC 是干嘛的,有什么用。
其實,RabbitMQ RPC 就是通過消息隊列(Message Queue)來實現rpc的功能,就是,客戶端向服務端發送定義好的Queue消息,其中攜帶的消息就應該是服務端將要調用的方法的參數 ,并使用Propertis告訴服務端將結果返回到指定的Queue。
1.RabbitMQ RPC的特點
- Message Queue把所有的請求消息存儲起來,然后處理,和客戶端解耦。
- Message Queue引入新的結點,系統的可靠性會受Message Queue結點的影響。
- Message Queue是異步單向的消息。發送消息設計成是不需要等待消息處理的完成。
所以對于有同步返回需求,Message Queue是個不錯的方向。
2.普通PRC的特點
- 同步調用,對于要等待返回結果/處理結果的場景,RPC是可以非常自然直覺的使用方式。當然RPC也可以是異步調用。
- 由于等待結果,客戶端會有線程消耗。
如果以異步RPC的方式使用,客戶端線程消耗可以去掉。但不能做到像消息一樣暫存消息請求,壓力會直接傳導到服務端。
3.適用場合說明
- 希望同步得到結果的場合,RPC合適。
- 希望使用簡單,則RPC;RPC操作基于接口,使用簡單,使用方式模擬本地調用。異步的方式編程比較復雜。
- 不希望客戶端受限于服務端的速度等,可以使用Message Queue。
4.RabbitMQ RPC工作流程:
基本概念:
Callback queue 回調隊列,客戶端向服務器發送請求,服務器端處理請求后,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那么客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to。
Correlation id 關聯標識,客戶端可能會發送多個請求給服務器,當服務器處理完后,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就可以分辨此響應屬于哪個請求。
流程說明:
- 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。
- 在 RPC 請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。
- 將請求發送到一個 rpc_queue 隊列中。
- 服務器等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作并且將帶有執行結果的消息發送給 reply_to 字段指定的隊列。
- 客戶端等待回調隊列里的數據。當有消息出現的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用
5.完整代碼:
1. 創建兩個控制臺程序,作為RPC Server和RPC Client, 引用 RabbitMQ.Client,
2. RPC Server
class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); while (true) { string response = null; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } } } } /// <summary> /// Assumes only valid positive integer input. /// Don't expect this one to work for big numbers, /// and it's probably the slowest recursive implementation possible. /// </summary> private static int fib(int n) { if (n == 0 || n == 1) { return n; } Thread.Sleep(1000 * 10); return n; } }
3. RPC Client
class Program { static void Main(string[] args) { for (int i = 0; i < 10; i++) { Stopwatch watch = new Stopwatch(); watch.Start(); var rpcClient = new RPCClient(); Console.WriteLine(string.Format(" [x] Requesting fib({0})", i)); var response = rpcClient.Call(i.ToString()); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); watch.Stop(); Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds)); } Console.WriteLine(" complete!!!! "); Console.ReadLine(); } } class RPCClient { private IConnection connection; private IModel channel; private string replyQueueName; private QueueingBasicConsumer consumer; public RPCClient() { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer); } public string Call(string message) { var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == corrId) { return Encoding.UTF8.GetString(ea.Body); } } } public void Close() { connection.Close(); } }
4.分別運行Server和Client
6.最后
1.參照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
2.本文源代碼下載,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar
![]() |
不含病毒。www.avast.com |