当前位置: 首页>前端>正文

RPC 双向数据流 grpc双向流原理

gRPC 通信是基于 HTTP/2 实现的,它的双向流映射到 HTTP/2 流。HTTP/2 具有流的概念,流是为了实现HTTP/2的多路复用。流是服务器和客户端在HTTP/2连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程;一个业务处理单元,在一个流内进行处理完毕,这个流生命周期完结。

特点如下:

一个HTTP/2连接可同时保持多个打开的流,任一端点交换帧
流可被客户端或服务器单独或共享创建和使用
流可被任一端关闭
在流内发送和接收数据都要按照顺序
流的标识符自然数表示,1~2^31-1区间,有创建流的终端分配
流与流之间逻辑上是并行、独立存在
摘自 HTTP/2笔记之流和多路复用 by 聂永

四.gRPC中使用双向流调用
我们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,如果我们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC自然也就没有问题了。

这里我们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。

① 首先在 LuCat.proto 定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡
syntax = “proto3”;
option csharp_namespace = “AspNetCoregRpcService”;
import “google/protobuf/empty.proto”;
 package LuCat; //定义包名//定义服务
 service LuCat{
 //定义给猫洗澡双向流rpc
 rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
 //定义统计猫数量简单rpc
 rpc Count(google.protobuf.Empty) returns (CountCatResult);
 }message SuckingCatResult{
 string message=1;
 }
 message BathTheCatReq{
 int32 id=1;
 }
 message BathTheCatResp{
 string message=1;
 }
 message CountCatResult{
 int32 Count=1;
 }
 ② 添加服务的实现这里安利下Resharper,非常方便
private readonly ILogger _logger;
 private static readonly List Cats=new List(){“英短银渐层”,“英短金渐层”,“美短”,“蓝猫”,“狸花猫”,“橘猫”};
 private static readonly Random Rand=new Random(DateTime.Now.Millisecond);public LuCatService(ILogger logger)
 {
 _logger = logger;
 }public override async Task BathTheCat(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context)
 {
 var bathQueue=new Queue();
 while (await requestStream.MoveNext())
 {
 //将要洗澡的猫加入队列
 bathQueue.Enqueue(requestStream.Current.Id);_logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
}

//遍历队列开始洗澡
while (bathQueue.TryDequeue(out var catId))
{
    await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });

    await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果
}}
public override Task Count(Empty request, ServerCallContext context)
 {
 return Task.FromResult(new CountCatResult()
 {
 Count = Cats.Count
 });
 }


BathTheCat 方法会接收多个客户端发来的CatId,然后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。

③ 客户端实现

随机发送10个猫Id给服务端,然后接收10个洗澡结果。

var channel = GrpcChannel.ForAddress(“https://localhost:5001”);
 var catClient = new LuCat.LuCatClient(channel);
 //获取猫总数
 var catCount = await catClient.CountAsync(new Empty());
 Console.WriteLine($“一共{catCount.Count}只猫。”);
 var rand = new Random(DateTime.Now.Millisecond);var bathCat = catClient.BathTheCat();
 //定义接收吸猫响应逻辑
 var bathCatRespTask = Task.Run(async() =>
 {
 await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
 {
 Console.WriteLine(resp.Message);
 }
 });
 //随机给10个猫洗澡
 for (int i = 0; i < 10; i++)
 {
 await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});
 }
 //发送完毕
 await bathCat.RequestStream.CompleteAsync();
 Console.WriteLine(“客户端已发送完10个需要洗澡的猫id”);
 Console.WriteLine(“接收洗澡结果:”);
 //开始接收响应
 await bathCatRespTask;Console.WriteLine(“洗澡完毕”);


④ 运行

可以看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。

这里大家可以自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,然后服务端返回每个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就不再演示了。

五.流控制
gRPC 的流式调用支持对流进行主动取消的控制,进而可以衍生出流超时限制等控制。

在流式调用是,可以传一个 CancellationToken 参数,它就是我们用来对流进行取消控制的:

1569467990882
改造一下我们在第四小节的代码:
① 客户端
var cts = new CancellationTokenSource();
 //指定在2.5s后进行取消操作
 cts.CancelAfter(TimeSpan.FromSeconds(2.5));
 var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
 //定义接收吸猫响应逻辑
 var bathCatRespTask = Task.Run(async() =>
 {
 try
 {
 await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
 {
 Console.WriteLine(resp.Message);
 }
 }
 catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
 {
 Console.WriteLine(“Stream cancelled.”);
 }
 });
 ② 服务端//遍历队列开始洗澡
 while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
 {
 _logger.LogInformation($“Cat {catId} Dequeue.”);
 await responseStream.WriteAsync(new BathTheCatResp() { Message = $“铲屎的成功给一只{Cats[catId]}洗了澡!” });await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果}


③ 运行

设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并没有收到全部10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。


https://www.xamrdz.com/web/2ya1962075.html

相关文章: