errgroup = error + sync.WaitGroup
不, 还有不可忽略的 context.
一. 回顾与分析
上一篇, 简单入门了 gRPC, 并手动实践了 gRPC 服务.
但还是有些瑕疵, 不够优雅, 主要体现在以下方面:
在实际的开发当中, 我们总是希望能够控制服务和 goroutine 的生命周期, 并且在结束的时候做 cleanup 操作.
-
我们在程序中, 使用了
log.Fatal
这个函数, 它的源码是std.Output(2, fmt.Sprintf(format, v...)); os.Exit(1)
, 而执行os.Exit(1)
会导致整个程序直接退出, 不会执行到defer
语句.这意味着使用了
defer
语句的 (如第三方包), 导致很多资源不能够以正确的姿势 cleanup, 比如数据库的长连接. -
error 和 log 的处理有待改进. 我们出现了大量的
if err != nil { log.Fatal(...) }
这样的代码.在实际的开发中, 服务大多都是异步执行的, 如果分散的打日志, 就会这里一坨那里一坨的, 日志记录一多, 就很难定位一坨异常的堆栈上下文是什么.
二. 改进
基于以上的分析, 下面将一步步改进
1. 服务端
proto
是通过编译 proto 文件生成的包, 自定义为包名为 proto.
定义了 server
结构体, 添加了 *grpc.Server
成员, 除了实现 proto 中定义的 SayHello
接口外, 还实现了 up
和 down
两个方法. 功能分别是起 gRPC 服务和 shutdown 服务.
对错误的处理, 使用 errors.Wrap
替代了 log.Fatal
.
type server struct {
proto.UnimplementedGreeterServer
rpcSrv *grpc.Server
}
func (s *server) SayHello(ctx context.Context, in *proto.HelloRequest) (*proto.HelloReply, error) {
log.Println("Received:", in.GetName())
return &proto.HelloReply{
Message: in.GetName() + " say hello for gRPC.",
}, nil
}
func (s *server) up() error {
lis, err := net.Listen("tcp", port)
if err != nil {
// wrap error, 不输出日志
// 添加错误信息, 在查看日志时, 好定位错误
return errors.Wrap(err, "failed to register on "+port)
}
// 赋值实例, 方便在 shutdown 服务的时候使用实例
// shutdown 如: 通过接收道德 signal shutdown
s.rpcSrv = grpc.NewServer()
proto.RegisterGreeterServer(s.rpcSrv, s)
log.Printf("serve %s is running ...", port)
if err := s.rpcSrv.Serve(lis); err != nil {
return errors.Wrap(err, "gRPC serve occured a little wrongs")
}
log.Printf("serve %s was stopped", port)
return nil
}
func (s *server) down() error {
if s.rpcSrv != nil {
s.rpcSrv.GracefulStop()
}
return nil
}
使用 signal.Notify
对 Linux 信号进行监听, 通过信号方便我们处理要处理的事. 比如日志输出、shutdown 服务.
var (
signalChan = make(chan os.Signal)
port = ":8080"
)
func init() {
// 监听 linux signal
// 用于 shutdown 服务
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
flag.StringVar(&port, "port", ":8080", "the gRPC server listen on a port")
log.Println("listening signals ...")
}
最后使用 errgroup 来并行执行 goroutine.
为什么这里使用 errgroup, 而不直接使用 sync.WaitGroup
呢? 因为 errgroup 内置了 sync.WaitGroup
, 而且还有很好的 error 处理机制, 以及 context.WithCancel
和 goroutine 的生命周期管理等.
func main() {
flag.Parse()
// 这里使用 context.WithCancel 结束所有 goroutine 的生命周期
rootCtx, cancel := context.WithCancel(context.Background())
var eg, ctx = errgroup.WithContext(rootCtx)
srv := &server{}
// 先起 shutdown 的 goroutine,
// 以免服务还没起来, 就收到 shutdown 等命令或错误.
eg.Go(func() error {
<-ctx.Done()
return srv.down()
})
eg.Go(srv.up)
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case s := <-signalChan:
switch s {
case syscall.SIGINT, syscall.SIGTERM:
log.Printf("Received signal: %v, shutdown ...", s)
cancel()
default:
log.Println("Undefined signal:", s)
}
}
}
})
// 在这里统一把错误输出
if err := eg.Wait(); err != nil && err != context.Canceled {
log.Printf("happened a little wrongs: %+v", err.Error())
return
}
log.Println("graceful shutdown gRPC server")
}
执行和 shutdown 日志:
2021/05/12 16:12:03 listening signals ...
2021/05/12 16:12:03 serve :8080 is running ...
2021/05/12 16:12:19 Received: aaaa
^C2021/05/12 16:12:24 Received signal: interrupt, shutdown ...
2021/05/12 16:12:24 serve :8080 was stopped
2021/05/12 16:12:24 graceful shutdown gRPC server
2. 客户端
这里也主要对 error 处理进行改进, 然后做一下拆分. 如果理解了上面的服务端部分, 这一部分可以忽略.
所以直接上代码:
func sender(c proto.GreeterClient, req *proto.HelloRequest) error {
// 3 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
r, err := c.SayHello(ctx, req)
if err != nil {
return errors.Wrap(err, "could not greet")
}
log.Printf("Greeting: %s", r.GetMessage())
return nil
}
var (
addr = "localhost:8080"
defaultName = "fango"
conn *grpc.ClientConn
)
func init() {
flag.StringVar(&addr, "addr", "localhost:8080", "server address")
}
func NewClient(address string) proto.GreeterClient {
var err error
conn, err = grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
// 这里是程序执行的第一步, 相当于执行 init 函数.
// 可以使用 log.Fatal, Panic.
log.Fatalf("did not connect: %v", err)
}
return proto.NewGreeterClient(conn)
}
func main() {
flag.Parse()
client := NewClient(addr)
defer conn.Close()
name := defaultName
if len(os.Args) > 1 {
// 通过命令行获取发送的内容
name = os.Args[1]
}
if err := sender(client, &proto.HelloRequest{Name: name}); err != nil {
log.Println(err)
}
}
go.mod
没有多大改变, 新增的包大多都是标准库的, 所以这里不赘述, 也可以参考上一篇: gRPC 简介与实践
不足之处, 还望各位多多指教