当前位置: 首页>编程语言>正文

dubbo停机慢 dubbo优雅停机实现原理

这里我们使用的是Apache dubbo官方提供的starter[0.1.0版本]。其使用的dubbo版本是2.5.10。我们将通过源码分析一下dubbo 的优雅停机是如何实现的。

引入Apache dubbo starter

<!-- 注意这里要引入zkclient,否则会找不到zkclient jar包 -->
<dependency>
	<groupId>com.github.sgroschupf</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>0.1.0</version>
</dependency>

dubbo停机慢 dubbo优雅停机实现原理,dubbo停机慢 dubbo优雅停机实现原理_Boo,第1张

官方优雅停机解释

Dubbo 是通过 JDK 的 ShutdownHook 来完成优雅停机的,所以如果用户使用 kill -9 PID 等强制关闭指令,是不会执行优雅停机的,只有通过 kill PID 时,才会执行。具体请参考官方文档: 优雅停机

源码分析

注册shutdownhook

dubbo在com.alibaba.dubbo.config.AbstractConfig中通过静态初始化块注册shutdownhook

static {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                if (logger.isInfoEnabled()) {
                    logger.info("Run shutdown hook now.");
                }
                ProtocolConfig.destroyAll();
            }
        }, "DubboShutdownHook"));
    }

我们进入到ProtocolConfig.destroyAll()

public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }
        //1. 关闭注册中心
        //具体实现见zkClient.close():com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#destroy
        AbstractRegistryFactory.destroyAll();

        //2。 Wait for registry notification
        //这一句是新版dubbo的关键改动之处。老版本没有这几行sleep的代码【请读者自行阅读老版本的源码】。默认10秒,可以通过 [dubbo.service.shutdown.wait] 配置
        //
        try {
            Thread.sleep(ConfigUtils.getServerShutdownTimeout());
        } catch (InterruptedException e) {
            logger.warn("Interrupted unexpectedly when waiting for registry notification during shutdown process!");
        }

		//3. 销毁所有的protocol
        ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
        for (String protocolName : loader.getLoadedExtensions()) {
            try {
                Protocol protocol = loader.getLoadedExtension(protocolName);
                if (protocol != null) {
                    protocol.destroy();
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }

上面第一步通过AbstractRegistryFactory.destroyAll()来“注销”在所有注册中心注册的服务,通过调用ZkClient客户端的zkClient.close()关闭ZK长连接。这样服务消费者就看不到已经被注销的服务了。当然这是理想情况。毕竟从服务提供者注销自己,到消费者发现提供者不可用中间存在一定的时间差。
第二步是等待一定时间。原因后面会讲。
步骤三是关闭自己暴露的服务和自己对下游服务的调用。假设我们使用的是dubbo协议,protocol.destroy()其实会调用com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#destroy方法:

public void destroy() {
        for (String key : new ArrayList<String>(serverMap.keySet())) {
            ExchangeServer server = serverMap.remove(key);
            if (server != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo server: " + server.getLocalAddress());
                    }
                    //优雅的关闭提供的服务
                    server.close(ConfigUtils.getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
            ExchangeClient client = referenceClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                    }
                    //优雅的关闭对下游服务的调用
                    client.close(ConfigUtils.getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        //关闭幽灵链接
        for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
            ExchangeClient client = ghostClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                    }
                    client.close(ConfigUtils.getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
        stubServiceMethodsMap.clear();
        super.destroy();
    }

上面方法中优先关闭自身对外提供的服务,然后关闭外部的引用,最后关闭幽灵链接(ghostClient)。很容易理解,如果先关闭外部的引用链接,而自身的链接没有关闭,那么就会出现链接不可用而报错的情况。

关闭自身dubbo服务,分析一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeServer#close(int)的代码:

public void close(final int timeout) {
        startClose();//标记进入关闭流程
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            //将channel标记为只读。广播 READONLY 事件给所有 Consumer 们,告诉它们不要在调用我了!!!目的是如果此处注册中心挂掉的情况,依然能达到告诉 Consumer ,我要下线了的功能。
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                sendChannelReadOnlyEvent();
            }
            //等待所有正在进行中的调用都执行完成,或者达到了超时时间
            while (HeaderExchangeServer.this.isRunning()
                    && System.currentTimeMillis() - start < max) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        doClose();//关闭心跳检测等
        server.close(timeout);//关闭真正的netty的通信通道,如果你是用的netty的话
    }

那么dubbo是如何判断是否还有调用执行中的任何呢?这就要去看com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#CHANNELS中是如何实现的了。当发起一个请求时,会在CHANNELS中记录当前请求的id和channel的键值对;当有获得返回或者取消请求的时候,将该键值对从CHANNELS中删除。所以只要CHANNELS不为空,则表明有请求还未执行完毕。接下来看doClose方法:

private void doClose() {
//线程安全的标记已关闭状态
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        //停止心跳检测
        stopHeartbeatTimer();
        try {
        //停止心跳检测的定时任务
            scheduled.shutdown();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }

上面的代码主要是标记closed=true,并且关闭了心跳检测,这样一来,就不会处理新的请求了,注册中心也检测不到该服务。而server.close(timeout)则主要是关闭netty的通信通道channel,可以参看AbstractServer#closeNettyServer#doClose方法。

现在看一下当前服务作为客户端时,referenceClient的关闭过程 :com.alibaba.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#close(int)

public void close(int timeout) {
        if (refenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
                client.close();
            } else {
                client.close(timeout);
            }
            client = replaceWithLazyClient();
        }
    }

这里timeout是服务器停止的等待时间,用于优雅停机。进入
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient#close(int)

public void close(int timeout) {
        // 标记client进入关闭流程.具体代码见com.alibaba.dubbo.remoting.transport.AbstractPeer#startClose
        startClose();
        doClose();//停止心跳检测。具体实现见com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient#stopHeartbeatTimer
        channel.close(timeout);
    }

跟进channel.close方法com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#close(int)

//优雅关闭channel。这里会一直等待channel的关闭,直到超时强制关闭
    public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
            long start = System.currentTimeMillis();
            while (DefaultFuture.hasFuture(channel)
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }

那么什么是幽灵链接(ghostClient)呢?在获取链接的时候,如果链接为空或者已经被关闭了,那么就会创建ghostClientcom.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#getSharedClient

/**
     * Get shared connection
     */
    private ExchangeClient getSharedClient(URL url) {
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }
        synchronized (key.intern()) {
            ExchangeClient exchangeClient = initClient(url);
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            return client;
        }
    }

我们进入ReferenceCountExchangeClient:

public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        refenceCount.incrementAndGet();
        this.url = client.getUrl();
        if (ghostClientMap == null) {
            throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
        }
        this.ghostClientMap = ghostClientMap;
    }

去查找类ReferenceCountExchangeClientghostClientMap的赋值代码:

//幽灵client,
    private LazyConnectExchangeClient replaceWithLazyClient() {
        //这个操作只为了防止程序bug错误关闭client做的防御措施,初始client必须为false状态
        URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE)
                .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE)
                .addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString())
                .addParameter("warning", Boolean.TRUE.toString())
                .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)
                .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient");

        String key = url.getAddress();
        //最差情况下只有一个幽灵连接
        LazyConnectExchangeClient gclient = ghostClientMap.get(key);
        if (gclient == null || gclient.isClosed()) {
            gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
            ghostClientMap.put(key, gclient);
        }
        return gclient;
    }

现在来说一下2.5.10版本最新的改动,用于实现更良好的优雅停机,也就是上面最开始ProtocolConfig.destroyAll()种关闭注册中心后的那一段等待时间。
因为provider从注册中心撤销服务和上游consumer将其服务从服务列表中删除并不是原子操作,可能导致上游consumer的服务列表还未更新完成,我们的provider这时发现当前没有进行中的调用就立马关闭服务暴露,导致上游consumer调用该服务失败。所以,dubbo默认的这种优雅停机方案,需要建立在上游consumer有重试机制的基础之上,但由于consumer增加重试特性会增加故障时的雪崩风险,所以大多数分布式服务不愿意增加服务内部之间的重试机制。其实dubbo.service.shutdown.wait的值主要是为了防止优雅停机时的无限等待,即限制等待上限,我们也应该用一个参数来设置等待下限,这样整个分布式系统几乎不需要通过重试来保证优雅停机,只需要给与上游consumer少许时间,让他们足够有机会更新完provider的列表就行,这就是ProtocolConfig.destroyAll()方法中sleep的原因。

总结

从整个微服务的调用链路的最上层,依次向下停止服务,并且每个步骤都需要有一个停止等待的过程,等待资源的释放或者依赖服务下线。

  1. 首先,从注册中心中取消注册自己,从而使消费者不要再拉取到它。
  2. 然后,sleep 10 秒( 可配 ),等到服务消费,接收到注册中心通知到该服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。
  3. 优先关闭自身对外提供的服务,然后关闭外部的引用,最后关闭幽灵链接(ghostClient)。【下面4-8步,是第3步的细分】
  4. sendChannelReadOnlyEvent():将channel标记为只读, 内部实现是广播 READONLY 事件给所有 Consumer 们,告诉它们不要在调用我了!!!目的是如果此处注册中心挂掉的情况,依然能达到告诉 Consumer ,我要下线了的功能。
  5. sleep 10 毫秒,保证 Consumer 们,尽可能接收到该消息。
  6. 关闭心跳检测,不接收新请求,注册中心也检测不到该服务。
  7. 检测线程池中的线程是否正在运行,如果有,等待所有线程执行完成,除非超时,则强制关闭。
  8. 关闭真正的netty的通信通道,如果你是用的netty的话。


https://www.xamrdz.com/lan/5pc1962515.html

相关文章: