当前位置: 首页>编程语言>正文

Go 基于 errgroup 优雅的启动和关停服务

errgroup = error + sync.WaitGroup

不, 还有不可忽略的 context.

一. 回顾与分析

上一篇, 简单入门了 gRPC, 并手动实践了 gRPC 服务.

但还是有些瑕疵, 不够优雅, 主要体现在以下方面:

  1. 在实际的开发当中, 我们总是希望能够控制服务和 goroutine 的生命周期, 并且在结束的时候做 cleanup 操作.

  2. 我们在程序中, 使用了 log.Fatal 这个函数, 它的源码是 std.Output(2, fmt.Sprintf(format, v...)); os.Exit(1) , 而执行 os.Exit(1) 会导致整个程序直接退出, 不会执行到 defer 语句.

    这意味着使用了 defer 语句的 (如第三方包), 导致很多资源不能够以正确的姿势 cleanup, 比如数据库的长连接.

  3. error 和 log 的处理有待改进. 我们出现了大量的 if err != nil { log.Fatal(...) } 这样的代码.

    在实际的开发中, 服务大多都是异步执行的, 如果分散的打日志, 就会这里一坨那里一坨的, 日志记录一多, 就很难定位一坨异常的堆栈上下文是什么.

二. 改进

基于以上的分析, 下面将一步步改进

1. 服务端

proto 是通过编译 proto 文件生成的包, 自定义为包名为 proto.

定义了 server 结构体, 添加了 *grpc.Server 成员, 除了实现 proto 中定义的 SayHello 接口外, 还实现了 updown 两个方法. 功能分别是起 gRPC 服务和 shutdown 服务.

对错误的处理, 使用 errors.Wrap 替代了 log.Fatal .

type server struct {
    proto.UnimplementedGreeterServer

    rpcSrv *grpc.Server
}

func (s *server) SayHello(ctx context.Context, in *proto.HelloRequest) (*proto.HelloReply, error) {
    log.Println("Received:", in.GetName())
    return &proto.HelloReply{
        Message: in.GetName() + " say hello for gRPC.",
    }, nil
}

func (s *server) up() error {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        // wrap error, 不输出日志
        // 添加错误信息, 在查看日志时, 好定位错误
        return errors.Wrap(err, "failed to register on "+port)
    }

    // 赋值实例, 方便在 shutdown 服务的时候使用实例
    // shutdown 如: 通过接收道德 signal shutdown
    s.rpcSrv = grpc.NewServer()
    proto.RegisterGreeterServer(s.rpcSrv, s)

    log.Printf("serve %s is running ...", port)
    if err := s.rpcSrv.Serve(lis); err != nil {
        return errors.Wrap(err, "gRPC serve occured a little wrongs")
    }
    log.Printf("serve %s was stopped", port)
    return nil
}

func (s *server) down() error {
    if s.rpcSrv != nil {
        s.rpcSrv.GracefulStop()
    }
    return nil
}

使用 signal.Notify 对 Linux 信号进行监听, 通过信号方便我们处理要处理的事. 比如日志输出、shutdown 服务.

var (
    signalChan = make(chan os.Signal)
    port       = ":8080"
)

func init() {
    // 监听 linux signal
    // 用于 shutdown 服务
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    flag.StringVar(&port, "port", ":8080", "the gRPC server listen on a port")
    log.Println("listening signals ...")
}

最后使用 errgroup 来并行执行 goroutine.

为什么这里使用 errgroup, 而不直接使用 sync.WaitGroup 呢? 因为 errgroup 内置了 sync.WaitGroup, 而且还有很好的 error 处理机制, 以及 context.WithCancel 和 goroutine 的生命周期管理等.

func main() {
    flag.Parse()

    // 这里使用 context.WithCancel 结束所有 goroutine 的生命周期
    rootCtx, cancel := context.WithCancel(context.Background())
    var eg, ctx = errgroup.WithContext(rootCtx)
    srv := &server{}

    // 先起 shutdown 的 goroutine,
    // 以免服务还没起来, 就收到 shutdown 等命令或错误.
    eg.Go(func() error {
        <-ctx.Done()
        return srv.down()
    })
    eg.Go(srv.up)
    eg.Go(func() error {
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case s := <-signalChan:
                switch s {
                case syscall.SIGINT, syscall.SIGTERM:
                    log.Printf("Received signal: %v, shutdown ...", s)
                    cancel()
                default:
                    log.Println("Undefined signal:", s)
                }
            }
        }
    })

    // 在这里统一把错误输出
    if err := eg.Wait(); err != nil && err != context.Canceled {
        log.Printf("happened a little wrongs: %+v", err.Error())
        return
    }
    log.Println("graceful shutdown gRPC server")
}

执行和 shutdown 日志:

2021/05/12 16:12:03 listening signals ...
2021/05/12 16:12:03 serve :8080 is running ...
2021/05/12 16:12:19 Received: aaaa
^C2021/05/12 16:12:24 Received signal: interrupt, shutdown ...
2021/05/12 16:12:24 serve :8080 was stopped
2021/05/12 16:12:24 graceful shutdown gRPC server

2. 客户端

这里也主要对 error 处理进行改进, 然后做一下拆分. 如果理解了上面的服务端部分, 这一部分可以忽略.

所以直接上代码:

func sender(c proto.GreeterClient, req *proto.HelloRequest) error {
    // 3 秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    r, err := c.SayHello(ctx, req)
    if err != nil {
        return errors.Wrap(err, "could not greet")
    }
    log.Printf("Greeting: %s", r.GetMessage())
    return nil
}

var (
    addr        = "localhost:8080"
    defaultName = "fango"
    conn        *grpc.ClientConn
)

func init() {
    flag.StringVar(&addr, "addr", "localhost:8080", "server address")
}

func NewClient(address string) proto.GreeterClient {
    var err error
    conn, err = grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        // 这里是程序执行的第一步, 相当于执行 init 函数.
        // 可以使用 log.Fatal, Panic.
        log.Fatalf("did not connect: %v", err)
    }
    return proto.NewGreeterClient(conn)
}

func main() {
    flag.Parse()
    client := NewClient(addr)
    defer conn.Close()

    name := defaultName
    if len(os.Args) > 1 {
        // 通过命令行获取发送的内容
        name = os.Args[1]
    }
    if err := sender(client, &proto.HelloRequest{Name: name}); err != nil {
        log.Println(err)
    }
}

go.mod 没有多大改变, 新增的包大多都是标准库的, 所以这里不赘述, 也可以参考上一篇: gRPC 简介与实践

不足之处, 还望各位多多指教


https://www.xamrdz.com/lan/5q82016141.html

相关文章: