kiss-rpc简介:
特性:模拟堆栈式调用方式,支持多值返回,调用简单安全, 服务器采用多线程异步模式,挖掘服务器性能。客户端支持多线程同步和异步模式,超时机制,linux下支持 epoll网络模型,类比grpc,thrift,dubbo快几倍甚至 几十倍。
环境: linux, unix, windows, macOS
传输协议:capnproto
开发语言:dlang
编译器: dmd
github:https://github.com/huntlabs/kiss-rpc
开发者笔记:开发笔记
kiss rpc 同步和异步测试:
环境:ubuntu 16.04 lts(64位)
硬件:xeon cpu e3-1230@3.3GHz x 8
内存:8G
网络:localhost(本地环回)
1.多线程异步非阻塞测试
单链接20万次rpc调用耗时为4秒,每秒的qps数量为5万左右:
1000个并发1000次调用,100万次rpc请求,总共耗时28秒。每秒qps约为3.5万次左右:
2.
多线程同步阻塞测试:
单链接100
万次rpc
调用耗时为53
秒,每秒qps
数量为1.8
万次左右:
1000
个连接1000
调用,总计100
万次rpc
调用,耗时46
秒,每秒qps
为2.1
万次:
海量互联网业务系统只能依赖分布式架构来解决,而分布式开发的基石则是RPC;本文主要针对两个开源的RPC框架(gRPC、 Apache Thrift),以及配合GoLang、C++两个开发语言进行性能对比分析。
测试场景client, server都是单进程,长连接,在单次连接内发起1w(5w)次rpc调用,计算耗时;
client, server都是单进程,短连接,共发起1w(5w)次连接,每次连接单次RPC调用,计算耗时;
并发4个client进程,每个进程长连接10w rpc,服务端单进程多线程(协程),计算耗时;
由于不同语言,耗时统计存在偏差,比如boost.timer在程序里计算出来的耗时明显偏小,所以统一使用linux命令time来计算耗时;
测试数据和分析
一、 单进程下,长短连接,两个RPC框架和两大语言对比
小结:
整体上看,长连接性能优于短连接,性能差距在两倍以上;
对比Go语言的两个RPC框架,Thrift性能明显优于gRPC,性能差距也在两倍以上;
对比Thrift框架下的的两种语言,长连接下Go 与C++的RPC性能基本在同一个量级,在短连接下,Go性能大概是C++的二倍;
对比Thrift&C++下的TSimpleServer与TNonblockingServer,在单进程客户端长连接的场景下,TNonblockingServer因为存在线程管理开销,性能较TSimpleServer差一些;但在短连接时,主要开销在连接建立上,线程池管理开销可忽略;
两套RPC框架,以及两大语言运行都非常稳定,5w次请求耗时约是1w次的5倍;
二、 多进程(线程,协程)下,两大RPC框架和两大语言对比
编写rpc接口时候,客户端的rpc文件和服务端rpc文件必须保持一致,务必保持对应的文件目录结构,文件名,类名,函数名,否则出现调用出错。
服务端接口:
1.网络事件触发模块,:interface server_socket_event_interface //服务端网络事件触发接口
{
void listen_failed(const string str); //监听失败
void inconming(rpc_socket_base_interface socket); //连接进入
void disconnectd(rpc_socket_base_interface socket); //连接断开
void write_failed(rpc_socket_base_interface socket); //写入失败
void read_failed(rpc_socket_base_interface socket); //读取失败
}
2.socket接口调用:
interface rpc_socket_base_interface //socket操作
{
bool doWrite(byte[] data); //写入数据
int getFd(); //获取fd
string getIp(); //获取ip
string getPort(); //获取端口
void disconnect(); //断开连接
}
3.绑定rpc:
rpc_server_impl!(hello) rp_impl; //绑定rpc类接口
rp_impl = new rpc_server_impl!(hello)(rp_server); //绑定对应的服务端口
rp_impl.bind_request_callback("say", &this.say); //绑定对应的rpc函数
rpc函数编写:
void say(rpc_request req)
{
auto resp = new rpc_response(req); //resp绑定对应的requst
string r_s; //取出的参数类型
int r_i, r_num;
double r_d;
req.pop(r_s, r_num, r_i, r_d); //取出对应的调用参数,必须和调用端的参数一致
//writefln("hello.say:%s, %s, %s, num:%s,", r_s, r_i, r_d, r_num);
resp.push(r_s ~ ":server response"~ to!string(r_i), r_num, r_i+1, r_d+0.2); //压入参数返回给调用端
rp_impl.response(resp); //返回参数给调用端
}
5.启动服务端口:
auto rp_server = new rpc_server(new server_socket); //服务端口绑定对应的 socket事件
auto hello_server_test = new hello(rp_server); //rpc类绑定对应的服务端口
auto poll = new GroupPoll!(); //创建线程管理组
rp_server.listen("0.0.0.0", 4444, poll); //监听端口,并绑定线程管理组
poll.start(); //启动线程管理
poll.wait(); //等待事件触发
客户端接口:
1.网络事件模块:
interface client_socket_event_interface //客户端网络事件触发接口
{
void connectd(rpc_socket_base_interface socket); //连接成功
void disconnectd(rpc_socket_base_interface socket); //断开连接
void write_failed(rpc_socket_base_interface socket); //写入失败
void read_failed(rpc_socket_base_interface socket); //读取失败
}
2.socket接口调用:
interface rpc_socket_base_interface //socket操作
{
bool doWrite(byte[] data); //写入数据
int getFd(); //获取fd
string getIp(); //获取ip
string getPort(); //获取端口
void disconnect(); //断开连接
}
3.绑定rpc:
rpc_client_impl!(hello) rp_impl; //绑定rpc调用的类
rp_impl = new rpc_client_impl!(hello)(rp_client); //rpc绑定socket
4.同步rpc调用:
auto req = new rpc_request; //创建一个rpc请求
req.push(s, 1, i, 0.1); //压入参数
rpc_response resp = rp_impl.sync_call(req); //同步调用服务器rpc接口
if(resp.get_status == RESPONSE_STATUS.RS_OK) //判断调用是否成功
{
string r_s;
int r_i, r_num;
double r_d;
resp.pop(r_s, r_num, r_i, r_d); //取出服务器返回的参数
//writefln("server response:%s, %s, %s", r_s, r_i, r_d);
if(r_i % 100000 == 0)
{
writefln("single connect test, sync rpc request num:%s, total time:%s", r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);
}
}
5.异步调用:
auto req = new rpc_request; //创建一个rpc请求
req.push(s, 1, i, 0.1); //压入参数
rp_impl.async_call(req, delegate(rpc_response resp){ //远程异步回调
if(resp.get_status == RESPONSE_STATUS.RS_OK) //判断调用是否成功
{
string r_s;
int r_i, r_num;
double r_d;
resp.pop(r_s, r_num, r_i, r_d);//取出服务器返回的参数
//writefln("server response:%s, %s, %s", r_s, r_i, r_d);
if(r_i%20000 == 0)
{
writefln("single connect test, rpc request num:%s, total time:%s", r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);
}
}else
{
writeln("error", resp.get_status);
}
});
6.客户端启动:
import kiss.util.Log;
load_log_conf("default.conf"); //日志配置
auto poll = new GroupPoll!(); //创建线程管理组
auto client = new client_socket; //创建客户端socket
client.connect_to_server(poll); //连接到服务器
poll.start; //启动线程管理组
poll.wait; //等待事件触发
服务端调用代码:
1.倒入头文件:
import KissRpc.unit;
import KissRpc.rpc_server;
import KissRpc.rpc_server_impl;
import KissRpc.rpc_response;
import KissRpc.rpc_socket_base_interface;
import KissRpc.rpc_request;
import kiss.event.GroupPoll;
2.监听端口:
auto rp_server = new rpc_server(new server_socket);
auto hello_server_test = new hello(rp_server);
auto poll = new GroupPoll!();
rp_server.listen("0.0.0.0", 4444, poll);
poll.start();
poll.wait();
3.绑定rpc事件:
class hello{
this(rpc_server rp_server)
{
rp_impl = new rpc_server_impl!(hello)(rp_server);
rp_impl.bind_request_callback("say", &this.say);
}
shared static int call_count = 0;
void say(rpc_request req)
{
auto resp = new rpc_response(req);
string r_s;
int r_i, r_num;
double r_d;
req.pop(r_s, r_num, r_i, r_d);
//writefln("hello.say:%s, %s, %s, num:%s,", r_s, r_i, r_d, r_num);
resp.push(r_s ~ ":server response"~ to!string(r_i), r_num, r_i+1, r_d+0.2);
rp_impl.response(resp);
}
rpc_server_impl!(hello) rp_impl;
}
4.socket事件:
class server_socket : server_socket_event_interface
{
void listen_failed(const string str)
{
de_writeln("server listen failed", str);
}
void disconnectd(rpc_socket_base_interface socket)
{
de_writeln("client is disconnect");
}
shared static int connect_num;
void inconming(rpc_socket_base_interface socket)
{
writefln("client inconming:%s:%s, connect num:%s", socket.getIp, socket.getPort, connect_num++);
}
void write_failed(rpc_socket_base_interface socket)
{
de_writefln("write buffer to client is failed, %s:%s", socket.getIp, socket.getPort);
}
void read_failed(rpc_socket_base_interface socket)
{
de_writefln("read buffer from client is failed, %s:%s", socket.getIp, socket.getPort);
}
}
客户端调用代码:
1.倒入头文件:
import KissRpc.rpc_request;
import KissRpc.rpc_client_impl;
import KissRpc.rpc_client;
import KissRpc.unit;
import KissRpc.rpc_response;
import KissRpc.rpc_socket_base_interface;
import kiss.event.GroupPoll;
2.连接服务器:
import kiss.util.Log;
load_log_conf("default.conf");
auto poll = new GroupPoll!();
for(int i= 0; i< test_client; i++)
{
auto client = new client_socket(i);
client.connect_to_server(poll);
}
poll.start;
poll.wait;
3.异步调用:
auto req = new rpc_request;
req.push(s, 1, i, 0.1);
rp_impl.async_call(req, delegate(rpc_response resp){ //异步调用接口
if(resp.get_status == RESPONSE_STATUS.RS_OK)
{
string r_s;
int r_i, r_num;
double r_d;
resp.pop(r_s, r_num, r_i, r_d);
//writefln("server response:%s, %s, %s", r_s, r_i, r_d);
if(r_i%20000 == 0)
{
writefln("single connect test, rpc request num:%s, total time:%s", r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);
}
}else
{
writeln("error", resp.get_status);
}
});
4.同步调用: auto req = new rpc_request;
req.push(s, num, i, 0.1);
rpc_response resp = rp_impl.sync_call(req); //同步调用接口
if(resp.get_status == RESPONSE_STATUS.RS_OK)
{
string r_s;
int r_num;
int r_i;
double r_d;
resp.pop(r_s, r_num, r_i, r_d);
//writefln("hello.say:%s, %s, %s, num:%s", r_s, r_i, r_d, r_num);
finish_num++;
if(r_i == test_num)
{
writefln("%s connect test, client num:%s, rpc request num:%s, total time:%s",r_num, r_num, r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);
if(finish_num == test_num* test_client)
{
writefln("$$$$$$$$$$$ %s connect test, client num:%s, rpc request num:%s, total time:%s",test_client, test_client, finish_num, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);
}
}
}else
{
writeln("error, ", resp.get_status);
}
5.socket事件:
class client_socket : client_socket_event_interface
{
this()
{
rp_client = new rpc_client(this);
}
void connect_to_server(GroupPoll!() poll)
{
rp_client.connect("0.0.0.0", 4444, poll);
}
void connectd(rpc_socket_base_interface socket)
{
de_writefln("connect to server, %s:%s", socket.getIp, socket.getPort);
auto hello_client = new hello(rp_client);
start_clock = Clock.currStdTime().stdTimeToUnixTime!(long)();
for(int i= 0; i < test_num; ++i)
{
hello_client.say("test hello client", i);
}
}
void disconnectd(rpc_socket_base_interface socket)
{
de_writefln("client disconnect ....");
}
void write_failed(rpc_socket_base_interface socket)
{
de_writefln("client write failed , %s:%s", socket.getIp, socket.getPort);
}
void read_failed(rpc_socket_base_interface socket)
{
de_writefln("client read failed , %s:%s", socket.getIp, socket.getPort);
}
private:
rpc_client rp_client;
}