当前位置: 首页>后端>正文

spring-cloud-gateway 处理请求的时候触发路由加载

环境

spring-cloud-gateway 版本

<!-- Spring Cloud Gateway starter at the version discussed in this article -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

现象

监控发现某些请求响应时间特别长,查看链路追踪,发现请求触发了路由加载,由于使用自定义MySqlRouteDefinitionRepository从数据库里获取路由,导致了数据库负载特别高。

请求查找路由堆栈

org.springframework.web.reactive.handler.AbstractHandlerMapping#getHandler
org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping#getHandlerInternal
org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping#lookupRoute
org.springframework.cloud.gateway.route.CachingRouteLocator#getRoutes
org.springframework.cloud.gateway.route.CompositeRouteLocator#getRoutes
org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator#getRoutes
org.springframework.cloud.gateway.route.CompositeRouteDefinitionLocator#getRouteDefinitions

由请求堆栈可以看到中间使用CachingRouteLocator对路由进行了缓存

CachingRouteLocator 源码

spring-cloud-gateway 2.1.2.RELEASE 版本

当收到 RefreshRoutesEvent 事件时,会调用 refresh() 方法清除缓存;如果此时恰好有请求到达,就会遇到缓存为空的情况,从而由该请求触发路由加载。

由于当前项目使用了 Nacos,Nacos 发生心跳时会触发 RefreshRoutesEvent,从而清除缓存并刷新路由。细节可查看我之前的文章《spring-cloud-gateway+nacos 每30s 加载路由一次》。

此时缓存清空了,刚好收到请求也会触发路由加载。

/**
 * {@link RouteLocator} that wraps another locator and serves its routes through a
 * {@code CacheFlux} lookup backed by an in-memory map.
 *
 * <p>The class also listens for {@link RefreshRoutesEvent}: every event calls
 * {@link #refresh()}, which empties the backing map. After that, the delegate supplied to
 * {@code onCacheMissResume} is the only remaining source of routes for whoever
 * subscribes to {@link #getRoutes()} next.
 */
public class CachingRouteLocator
        implements RouteLocator, ApplicationListener<RefreshRoutesEvent> {

    /** The wrapped locator that actually produces the routes. */
    private final RouteLocator delegate;

    /** The single Flux handed to every caller of {@link #getRoutes()}; built once in the constructor. */
    private final Flux<Route> routes;

    /** Backing store passed to {@code CacheFlux.lookup}; a plain {@code HashMap}, emptied by {@link #refresh()}. */
    private final Map<String, List> cache = new HashMap<>();

    /**
     * Builds the cached route Flux on top of {@code delegate}.
     * @param delegate locator consulted on a cache miss
     */
    public CachingRouteLocator(RouteLocator delegate) {
        this.delegate = delegate;
        // Look up the "routes" entry in the map; on a miss, resume with the delegate's
        // routes sorted by AnnotationAwareOrderComparator.
        routes = CacheFlux.lookup(cache, "routes", Route.class)
                .onCacheMissResume(() -> this.delegate.getRoutes()
                        .sort(AnnotationAwareOrderComparator.INSTANCE));
    }

    /**
     * Returns the cache-backed route Flux created in the constructor.
     * @return routes flux
     */
    @Override
    public Flux<Route> getRoutes() {
        return this.routes;
    }

    /**
     * Clears the routes cache.
     * @return routes flux
     */
    public Flux<Route> refresh() {
        // Removes every entry from the backing map; nothing is reloaded here.
        this.cache.clear();
        return this.routes;
    }

    /**
     * Handles a route refresh event by clearing the cache via {@link #refresh()}.
     * @param event the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(RefreshRoutesEvent event) {
        refresh();
    }

    // Deprecated package-private alias for refresh().
    @Deprecated
    /* for testing */ void handleRefresh() {
        refresh();
    }

}

spring-cloud-gateway 4.0.7版本

可以看到收到 RefreshRoutesEvent 事件时,不再先清除缓存,而是在事件处理中重新获取路由,获取成功后再用 cache.put 覆盖缓存中的旧数据;因此请求始终直接从缓存中获取路由,不会触发路由加载。

package org.springframework.cloud.gateway.route;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.event.RefreshRoutesResultEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;

/**
 * {@link RouteLocator} that wraps another locator and serves its routes through a
 * {@code CacheFlux} lookup backed by a {@link ConcurrentHashMap}.
 *
 * <p>On a {@link RefreshRoutesEvent} this class does not remove the cache entry.
 * It fetches routes from the delegate inside the event handler and then overwrites the
 * entry with {@code cache.put}. A {@link RefreshRoutesResultEvent} is published for
 * both successful and failed refreshes.
 *
 * @author Spencer Gibb
 */
public class CachingRouteLocator
        implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {

    private static final Log log = LogFactory.getLog(CachingRouteLocator.class);

    /** Key of the single entry kept in {@link #cache}. */
    private static final String CACHE_KEY = "routes";

    /** The wrapped locator that actually produces the routes. */
    private final RouteLocator delegate;

    /** The single Flux handed to every caller of {@link #getRoutes()}; built once in the constructor. */
    private final Flux<Route> routes;

    /** Backing store passed to {@code CacheFlux.lookup}; written by {@link #onApplicationEvent}. */
    private final Map<String, List> cache = new ConcurrentHashMap<>();

    /** Set through {@link #setApplicationEventPublisher}; used to publish refresh results. */
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * Builds the cached route Flux on top of {@code delegate}.
     * @param delegate locator consulted on a cache miss
     */
    public CachingRouteLocator(RouteLocator delegate) {
        this.delegate = delegate;
        // Look up the CACHE_KEY entry in the map; on a miss, resume with fetch().
        routes = CacheFlux.lookup(cache, CACHE_KEY, Route.class).onCacheMissResume(this::fetch);
    }

    /** Returns all routes of the delegate, sorted by {@link AnnotationAwareOrderComparator}. */
    private Flux<Route> fetch() {
        return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
    }

    /**
     * Returns the delegate's routes selected by {@code getRoutesByMetadata(metadata)},
     * sorted by {@link AnnotationAwareOrderComparator}.
     */
    private Flux<Route> fetch(Map<String, Object> metadata) {
        return this.delegate.getRoutesByMetadata(metadata).sort(AnnotationAwareOrderComparator.INSTANCE);
    }

    /**
     * Returns the cache-backed route Flux created in the constructor.
     * @return routes flux
     */
    @Override
    public Flux<Route> getRoutes() {
        return this.routes;
    }

    /**
     * Clears the routes cache by removing the {@code CACHE_KEY} entry.
     * Not called by {@link #onApplicationEvent} in this class.
     * @return routes flux
     */
    public Flux<Route> refresh() {
        this.cache.remove(CACHE_KEY);
        return this.routes;
    }

    /**
     * Reloads routes in response to a refresh event and overwrites the cache entry.
     *
     * <p>If the cache already holds an entry and the event is scoped, only the routes
     * matching the event metadata are fetched and then combined with the non-matching
     * routes currently served. Otherwise all routes are fetched. Any error, synchronous
     * or from a subscription, is passed to {@link #handleRefreshError}.
     *
     * @param event the refresh event; its scope flag and metadata select the branch
     */
    @Override
    public void onApplicationEvent(RefreshRoutesEvent event) {
        try {
            if (this.cache.containsKey(CACHE_KEY) && event.isScoped()) {
                // Scoped refresh: fetch only routes matching the event metadata.
                // A failure of this fetch is replaced with an empty list.
                final Mono<List<Route>> scopedRoutes = fetch(event.getMetadata()).collect(Collectors.toList())
                        .onErrorResume(s -> Mono.just(List.of()));

                // Append the non-matching routes currently served, then store the result.
                // NOTE(review): the concatenated sequence is not re-sorted here, and the
                // result event is published before cache.put - confirm both are intended.
                scopedRoutes.subscribe(scopedRoutesList -> {
                    Flux.concat(Flux.fromIterable(scopedRoutesList), getNonScopedRoutes(event)).materialize()
                            .collect(Collectors.toList()).subscribe(signals -> {
                                applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
                                cache.put(CACHE_KEY, signals);
                            }, this::handleRefreshError);
                }, this::handleRefreshError);
            }
            else {
                // Full refresh: fetch every route from the delegate.
                final Mono<List<Route>> allRoutes = fetch().collect(Collectors.toList());

                // The old entry is never removed; it is overwritten once the new list is ready.
                // NOTE(review): the result event is published before cache.put here as well.
                allRoutes.subscribe(list -> Flux.fromIterable(list).materialize().collect(Collectors.toList())
                        .subscribe(signals -> {
                            applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
                            cache.put(CACHE_KEY, signals);
                        }, this::handleRefreshError), this::handleRefreshError);
            }
        }
        catch (Throwable e) {
            handleRefreshError(e);
        }
    }

    /**
     * Returns the routes from {@link #getRoutes()} whose metadata does not match
     * the metadata of {@code scopedEvent}, as decided by {@code RouteLocator.matchMetadata}.
     */
    private Flux<Route> getNonScopedRoutes(RefreshRoutesEvent scopedEvent) {
        return this.getRoutes()
                .filter(route -> !RouteLocator.matchMetadata(route.getMetadata(), scopedEvent.getMetadata()));
    }

    /**
     * Logs the failure at error level (when enabled) and publishes a
     * {@link RefreshRoutesResultEvent} carrying {@code throwable}.
     */
    private void handleRefreshError(Throwable throwable) {
        if (log.isErrorEnabled()) {
            log.error("Refresh routes error !!!", throwable);
        }
        applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this, throwable));
    }

    /** Returns the fixed order value {@code 0}. */
    @Override
    public int getOrder() {
        return 0;
    }

    /** Stores the publisher used to emit {@link RefreshRoutesResultEvent}s. */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

}

参考资料

csdn Spring Cloud Gateway 之踩坑日记


https://www.xamrdz.com/backend/3cp1920983.html

相关文章: