长距离过程调用(Remote Proceddure call【RPC】)

(本实例都是用的Net的客户端,使用C#编写)

 
在老二只科目吃,我们学习了怎么下工作列在差不多单干活实例之间分配耗时的职责。

  但是,如果我们得以长距离计算机及运行效果并等候结果怎么处置?
那是一个不等之故事。 此模式通常称为远程过程调用或RPC。

 
以本教程中,我们用以RabbitMQ构建一个RPC系统:一个客户机和一个可是扩大的RPC服务器。
由于我们无其余值得分发的耗时任务,我们用创造一个回来斐波纳契数字的虚拟RPC服务。

1、客户端接口【Client Interface】

  为了证实什么采取RPC服务,我们拿创设一个简易的客户端类。
它以公开一个曰吧call的方,该措施发送RPC请求并阻塞,直到接收到答案:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

   关于RPC的注释

  
虽然RPC是一个雅普遍的精打细算模式,但它们常给批评。
当系统出现问题的时候,程序员不理解函数调用是本土函数还是慢的RPC调用,这样的紊乱导致了系统的未可预测性,并加了调剂的扑朔迷离。
滥用RPC可能引致代码的可维护性很不同,这样的计划不仅没简化软件,而且光会是系还不行。

   铭记这或多或少,请考虑以下建议:

    
确保显而易见哪个函数调用是本土的,哪个是远程的。
     记录您的系。
使组件之间的凭关系消除。
     处理错误情况。
当RPC服务器停机很丰富时晚,客户端应该怎么样反馈?

    当有疑点避免RPC。
如果可以的话,您应该使异步管道 –
而非是近乎RPC的不通,将异步推送至下一个测算等。

2、回调队列【Callback queue】
 
   一般的话RPC对RabbitMQ来说十分轻。
客户端发送请求消息,服务器恢复同样漫长响应消息。
为了收到一个应,我们要发送一个告于’回调’的队列地址:

var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

// ... then code to read a response message from the callback_queue ...

信息属性

  AMQP
0-9-1商预先定义了一如既往组14独随附消息的性能。
大多数性质很少动,除了以下内容:

 
deliveryMode:将信息标记为persistent(值也2)或transient(任何其他价值)。
您可能会见打第二单学科被切记此属性。
  contentType:用于描述mime类型的编码。
例如对于时常利用的JSON编码,将这属性设置为:application /
json是一个不行好之做法。
 
replyTo:通常用来命名一个回调队列。
 
correlationId:用于将RPC响应和请求相关联。

3、相关标识【Correlation Id】

 
在方所提出的方被,我们建议为每个RPC请求创建一个回调队列。这是特别低效的,但万幸的是产生一个双重好的措施

  • 为我们啊每个客户端创建一个回调队列。

 
这将掀起了一个初题材,在拖欠队中接受响应,响应所属的请求是未明白之。此时正是利用correlationId属性的时。我们拿为每个请求设置一个唯一的价。稍后,当我们在回调队列中接到一模一样长达信息时,我们拿翻开此属性,并且根据这,我们将会将应与请求相匹配。如果我们看一个未知之correlationId值,我们好高枕无忧地废弃该消息

  • 它们不属于我们的伸手。

 
您或许会见问,为什么咱们应当忽视回调队列中的未知消息,而无是出新谬误?这是出于服务器端可能出现竞争状况。虽然不太可能,RPC服务器可能会见于发送答复后,但于殡葬请求的确认信息之前死亡。如果发这种景象,重新起动的RPC服务器将再也拍卖该要。这虽是为什么在客户端上,我们务必优雅地处理这些又的响应,并且RPC应该好好地是幂等的。

4、概要【Summary】

 图片 1
  我们的RPC将如这样工作:

     当客户端启动时,它创建一个匿名独占回调队列。
    
对于RPC请求,客户端发送一个有两单特性的信:replyTo,它于装置也回调队列和correlationId,它给设置为每个请求的绝无仅有值。
     请求于发送到rpc_queue队列。
     RPC worker(aka:server)正在等队列上之呼吁。
当请求出现时,它以实施该学业,并使replyTo字段中之阵将结果发送回客户端。
     客户端等待回呼队列中的数量。
当信息出现常常,它检查correlationId属性。
如果其与请求被的价值相配合,则回对应用程序的应。

5、整合

  斐波纳契【Fibonacci】任务:

private static int fib(int n)
  {
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
  }

  我们声明斐波那契函数。 它才如有效之正整数输入。
(不要期待这一个克也那个数字工作,而且这也许是极端缓慢的递归实现)

  
我们的RPC服务器RPCServer.cs的代码如下所示:

 1  using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class RPCServer
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using (var connection = factory.CreateConnection())
12         using (var channel = connection.CreateModel())
13         {
14             channel.QueueDeclare(queue: "rpc_queue", durable: false,
15               exclusive: false, autoDelete: false, arguments: null);
16             channel.BasicQos(0, 1, false);
17             var consumer = new EventingBasicConsumer(channel);
18             channel.BasicConsume(queue: "rpc_queue",
19               noAck: false, consumer: consumer);
20             Console.WriteLine(" [x] Awaiting RPC requests");
21 
22             consumer.Received += (model, ea) =>
23             {
24                 string response = null;
25 
26                 var body = ea.Body;
27                 var props = ea.BasicProperties;
28                 var replyProps = channel.CreateBasicProperties();
29                 replyProps.CorrelationId = props.CorrelationId;
30 
31                 try
32                 {
33                     var message = Encoding.UTF8.GetString(body);
34                     int n = int.Parse(message);
35                     Console.WriteLine(" [.] fib({0})", message);
36                     response = fib(n).ToString();
37                 }
38                 catch (Exception e)
39                 {
40                     Console.WriteLine(" [.] " + e.Message);
41                     response = "";
42                 }
43                 finally
44                 {
45                     var responseBytes = Encoding.UTF8.GetBytes(response);
46                     channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
47                       basicProperties: replyProps, body: responseBytes);
48                     channel.BasicAck(deliveryTag: ea.DeliveryTag,
49                       multiple: false);
50                 }
51             };
52 
53             Console.WriteLine(" Press [enter] to exit.");
54             Console.ReadLine();
55         }
56     }
57 
58     ///
59 
60     /// Assumes only valid positive integer input.
61     /// Don't expect this one to work for big numbers, and it's
62     /// probably the slowest recursive implementation possible.
63     ///
64 
65     private static int fib(int n)
66     {
67         if (n == 0 || n == 1)
68         {
69             return n;
70         }
71 
72         return fib(n - 1) + fib(n - 2);
73     }
74 }

 服务器代码相当简单:

     像往常一样,我们开始成立连接,通道并扬言队列。
     我们兴许想使运行多独服务器进程。
为了当多单服务器上平均分配负载,我们用在channel.basicQos中安prefetchCount设置。
     我们利用basicConsume访问队列。
然后我们报了名一个付出处理程序,我们于里面开展工作并作回响应。

咱们的RPC客户端的代码RPCClient.cs:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 using RabbitMQ.Client.Events;
 8 
 9 class RPCClient
10 {
11     private IConnection connection;
12     private IModel channel;
13     private string replyQueueName;
14     private QueueingBasicConsumer consumer;
15 
16     public RPCClient()
17     {
18         var factory = new ConnectionFactory() { HostName = "localhost" };
19         connection = factory.CreateConnection();
20         channel = connection.CreateModel();
21         replyQueueName = channel.QueueDeclare().QueueName;
22         consumer = new QueueingBasicConsumer(channel);
23         channel.BasicConsume(queue: replyQueueName,
24                              noAck: true,
25                              consumer: consumer);
26     }
27 
28     public string Call(string message)
29     {
30         var corrId = Guid.NewGuid().ToString();
31         var props = channel.CreateBasicProperties();
32         props.ReplyTo = replyQueueName;
33         props.CorrelationId = corrId;
34 
35         var messageBytes = Encoding.UTF8.GetBytes(message);
36         channel.BasicPublish(exchange: "",
37                              routingKey: "rpc_queue",
38                              basicProperties: props,
39                              body: messageBytes);
40 
41         while(true)
42         {
43             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
44             if(ea.BasicProperties.CorrelationId == corrId)
45             {
46                 return Encoding.UTF8.GetString(ea.Body);
47             }
48         }
49     }
50 
51     public void Close()
52     {
53         connection.Close();
54     }
55 }
56 
57 class RPC
58 {
59     public static void Main()
60     {
61         var rpcClient = new RPCClient();
62 
63         Console.WriteLine(" [x] Requesting fib(30)");
64         var response = rpcClient.Call("30");
65         Console.WriteLine(" [.] Got '{0}'", response);
66 
67         rpcClient.Close();
68     }
69 }

客户端代码稍微复杂一些:

     我们成立一个连和通道,并为回覆声明一个独揽的’回调’队列。
     我们订阅’回调’队列,这样咱们好收起RPC响应。
     我们的调用方法让实际的RPC请求。
     在此地,我们首士人化作一个唯一的correlationId数字并保留其 –
while循环将动用此值来捕获适当的应。
    
接下来,我们揭晓请求消息,此要消息具有两单特性:replyTo和correlationId。
     在当下或多或少上,我们可因下来等待合适的响应到达。
    
while循环正在开一个非常简单的干活,对于每个响应消息,它检查correlationId是否是咱正找的。
如果是这般,它见面保留应。
     最后,我们拿响应返回给用户。

让客户端发送请求:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

本凡省我们的RPCClient.cs和RPCServer.cs的共同体示例源代码(包括核心好处理)的好机会。

照常设置(参见教程一):

咱的RPC服务现在早已准备好了。 我们可启动服务器:

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

假定请求运行客户端的fibonacci号码:

cd RPCClient
dotnet run
# => [x] Requesting fib(30)

此地提出的统筹无是RPC服务之绝无仅有可能的实现,而是有着部分重大之助益:

     如果RPC服务器太慢,可以透过运行另一个RPC服务器进行扩展。
尝试以新的控制台中运作第二只RPCServer。
     在客户端,RPC需要发送和吸收一修消息。
不需像queueDeclare这样的联手调用。
因此,RPC客户端只需要一个网往返单个RPC请求。

咱们的代码仍然非常简单,没有尝试解决再复杂(但重点的)问题,例如:

     如果无服务器运行,客户端应该怎么反应?
     客户端是否要RPC的某种超时时间?
     如果服务器出故障并吸引那个,应该用那转会给客户端?
     在处理之前预防无效的扩散消息(例如检查边界,类型)。
好了,这个系列也赶紧结束了。

当拿原本地方贴出,让大家探听又多。地址如下:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

相关文章

网站地图xml地图