介绍
go-kit提供了限流模块,该模块采用令牌桶算法实现,其实是封装了一下golang自带的golang.org/x/time/rate包来实现的。
令牌桶
令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
令牌桶算法的基本过程如下:
假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中;
假设桶最多可以存发b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;
当一个n个字节的[数据包]到达时,就从令牌桶中删除n个令牌,并且数据包被发送到网络;
如果令牌桶中少于n个令牌,那么不会删除令牌,并且认为这个数据包在流量限制之外;
两种限流
1、DelayingLimiter【限流延迟访问】
2、ErroringLimiter【限流错误返回】
Middleware
因为endpoint的封装,我们在使用go-kit提供的其它中间件时十分简单。下面就是一个完整的限流延迟中间件
把已有的endPoint外再包一层endPoint,再从最外层向内一层层调用
func NewDelayingLimiter(limit Waiter) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
if err := limit.Wait(ctx); err != nil {
return nil, err
}
return next(ctx, request)
}
}
}
使用延迟限流
在go-kit 实现注册发现与负载均衡,Server处代码做一下更改:
原来代码:
bookServer := new(BookServer)
bookInfoHandler := grpc_transport.NewServer(
makeGetBookInfoEndpoint(),
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler
更改之后的代码:
bookServer := new(BookServer)
bookInfoEndPoint:=makeGetBookInfoEndpoint()
//rate路径:golang.org/x/time/rate
limiter := rate.NewLimiter(rate.Every(time.Second * 3), 1)//限流3秒,临牌数:1
//通过DelayingLimiter中间件,在bookInfoEndPoint 的外层再包裹一层限流的endPoint
bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
bookInfoHandler := grpc_transport.NewServer(
bookInfoEndPoint, //限流的endpoint
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler
完整代码
1、Server端:
package main
import (
"MyKit"
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/sd/etcdv3"
grpc_transport "github.com/go-kit/kit/transport/grpc"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"net"
"time"
)
type BookServer struct {
bookListHandler grpc_transport.Handler
bookInfoHandler grpc_transport.Handler
}
//一下两个方法实现了 protoc生成go文件对应的接口:
/*
// BookServiceServer is the server API for BookService service.
type BookServiceServer interface {
GetBookInfo(context.Context, *BookInfoParams) (*BookInfo, error)
GetBookList(context.Context, *BookListParams) (*BookList, error)
}
*/
//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
_, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
if err != nil {
return nil, err
}
/*
if info,ok:=rsp.(*book.BookInfo);ok {
return info,nil
}
return nil,errors.New("rsp.(*book.BookInfo)断言出错")
*/
return rsp.(*book.BookInfo), err //直接返回断言的结果
}
//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
_, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
if err != nil {
return nil, err
}
return rsp.(*book.BookList), err
}
//创建bookList的EndPoint
func makeGetBookListEndpoint()endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
b:=new(book.BookList)
b.BookList=append(b.BookList,&book.BookInfo{BookId:1,BookName:"Go语言入门到精通"})
b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"微服务入门到精通"})
b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"区块链入门到精通"})
return b,nil
}
}
//创建bookInfo的EndPoint
func makeGetBookInfoEndpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
//请求详情时返回 书籍信息
req := request.(*book.BookInfoParams)
b := new(book.BookInfo)
b.BookId = req.BookId
b.BookName = "Go入门到精通"
return b, nil
}
}
func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
return req, nil
}
func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
return rsp, nil
}
func main() {
var (
etcdServer = "127.0.0.1:2379" //etcd服务的IP地址
prefix = "/services/book/" //服务的目录
ServerInstance = "127.0.0.1:50052" //当前实例Server的地址
key = prefix + ServerInstance //服务实例注册的路径
value = ServerInstance
ctx = context.Background()
//服务监听地址
serviceAddress = ":50052"
)
//etcd连接参数
option := etcdv3.ClientOptions{DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3}
//创建连接
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, option)
if err != nil {
panic(err)
}
//创建注册
registrar := etcdv3.NewRegistrar(client, etcdv3.Service{Key: key, Value: value}, log.NewNopLogger())
registrar.Register() //启动注册服务
bookServer := new(BookServer)
bookInfoEndPoint:=makeGetBookInfoEndpoint()
//rate路径:golang.org/x/time/rate
limiter := rate.NewLimiter(rate.Every(time.Second * 3), 1) //限流3秒,临牌数:1
//通过DelayingLimiter中间件,在bookInfoEndPoint 的外层再包裹一层限流的endPoint
bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
bookListHandler := grpc_transport.NewServer(
makeGetBookListEndpoint(),
decodeRequest,
encodeResponse,
)
bookServer.bookListHandler = bookListHandler
bookInfoHandler := grpc_transport.NewServer(
bookInfoEndPoint,
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler
listener, err := net.Listen("tcp", serviceAddress) //网络监听,注意对应的包为:"net"
if err != nil {
fmt.Println(err)
return
}
gs := grpc.NewServer(grpc.UnaryInterceptor(grpc_transport.Interceptor))
book.RegisterBookServiceServer(gs, bookServer) //调用protoc生成的代码对应的注册方法
gs.Serve(listener) //启动Server
}
2、Client端
package main
import (
"MyKit"
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/etcdv3"
"github.com/go-kit/kit/sd/lb"
"google.golang.org/grpc"
"io"
"time"
)
func main() {
var (
//注册中心地址
etcdServer = "127.0.0.1:2379"
//监听的服务前缀
prefix = "/services/book/"
ctx = context.Background()
)
options := etcdv3.ClientOptions{
DialTimeout: time.Second * 3,
DialKeepAlive: time.Second * 3,
}
//连接注册中心
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
if err != nil {
panic(err)
}
logger := log.NewNopLogger()
//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
instancer, err := etcdv3.NewInstancer(client, prefix, logger)
if err != nil {
panic(err)
}
//创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //reqFactory自定义的函数,主要用于端点层(endpoint)接受并显示数据
//创建负载均衡器
balancer := lb.NewRoundRobin(endpointer)
/**
我们可以通过负载均衡器直接获取请求的endPoint,发起请求
reqEndPoint,_ := balancer.Endpoint()
*/
/**
也可以通过retry定义尝试次数进行请求
*/
reqEndPoint := lb.Retry(30, 30*time.Second, balancer) //请求次数为30,时间为30S(时间需要多于服务器限流时间3s)
//现在我们可以通过 endPoint 发起请求了
for i:=0;i<10;i++ { //发送10次请求
req := struct{}{}
ctx=context.Background()
if _, err = reqEndPoint(ctx, req); err != nil {
panic(err)
}
}
}
//通过传入的 实例地址 创建对应的请求endPoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
return func(ctx context.Context, request interface{}) (interface{}, error) {
conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
panic("connect error")
}
defer conn.Close()
bookClient := book.NewBookServiceClient(conn)
bi, _ := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
fmt.Println("获取书籍详情")
fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)
fmt.Println("请求服务成功: ", instanceAddr,"当前时间为:",time.Now().Format("2006-01-02 15:04:05.99"))
/*bl, _ := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
fmt.Println("获取书籍列表")
for _, b := range bl.BookList {
fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
}*/
return nil, nil
}, nil, nil
}
运行效果及分析
1、启动etcd
2、启动Server
3、启动Client
client端运行效果如下:
可以看到Server对Client端进行了限流,限流时间间隔为:3s。