1、概述
本周又回顾了一下Spring Cloud Gateway的内容,主要参考《重新定义Spring Cloud实战》以及一些网络博客,虽然内容都差不多,但我还想照猫画虎总结一下子,加深印象。
SpringCloud gateway是Spring Cloud生态体系的第二代网关,是基于Spring5.0、Spring Boot 2.0、Reactor等技术开发的网关。底层主要是两大核心:Spring web Filter chain和Spring WebFlux。
对两大核心不了解的可以先补补知识,第二部分分析一下gateway中如何使用的filter,以及chain的实现:
Filter :Filter是一个Servlet规范组件;一个请求可以在Http请求到达Servlet前被一个或多个Filter处理,Servlet处理完后返回给Filter,最后返回给用户。
WebFlux:它是一个异步非阻塞式的web框架,它的作用不是提升接口请求时间,而是在一些阻塞的场景【例如请求DB,等待DB响应数据、打开大文件等】,可以把线程给其它请求使用,从而提升系统吞吐量。Gateway属于网络IO密集型【网关转发请求到下游服务】,通过WebFlux有效的提升网关转发的吞吐量。
Spring Cloud Gateway核心概念:
- 路由(Route.java):由一个id,一个目标uri,一组断言工厂和一组Filter组成
public class Route implements Ordered {
private final String id;
private final URI uri;
private final int order;
private final AsyncPredicate<ServerWebExchange> predicate;
private final List<GatewayFilter> gatewayFilters;
/**省略构造函数和get set方法/
}
- 断言(AsyncPredicate):java8中的断言函数,上述代码中的predicate属性
- 过滤器:标准的Spring web Filter;SpringCloud Gateway中包含GlobalFilter和GatewayFilter。
2、ServerWebExchange
SpringCloud gateway的上下文是ServerWebExchange,请求的信息都存储在ServerWebExchange中,在网关上的后续操作都是基于上下文操作的,在http请求到达网关之后,网关入口是ReactorHttpHandlerAdapter#apply方法,去获取请求的request和response,构建当次请求的上下文供后续filter使用:
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
//获取请求的Request,构建ReactorServerHttpRequest
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
//构建ServerHttpResponse
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
//交给HttpWebHandlerAdapter构建上下文ServerWebExchange
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
构建完request和response后,交给HttpWebHandlerAdapter构建上下文ServerWebExchange
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
//构建请求的上下文
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
3、Route和RouteDefinition
我们在配置文件中配置的一个路由规则,对应到Java类就是GatewayProperties,Spring boot会将配置文件映射为Java类,例如如下配置:
spring:
cloud:
gateway:
routes:
- id: add_request_header_route
uri: http://localhost:8080
predicates:
- Path=/test/**
filters:
- AddRequestHeader=NAME, test
RouteDefinition:路由定义,是GatewayProperties类中的一个属性,网关启动后,Springboot帮我们做了映射,上述配置的路由就设置到了 GatewayProperties对象中。
Route:是从路由定义
路由信息映射到GatewayProperties后如何获取其中的RouteDefinition?
答案是通过RouteDefinitionLocator,RouteDefinitionLocator有5个实现类
- PropertiesRouteDefinitionLocator:从Properties中读取
public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
private final GatewayProperties properties;
//构造函数设置properties
public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
this.properties = properties;
}
//从properties中读取RouteDefinition
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(this.properties.getRoutes());
}
}
- InMemoryRouteDefinitionRepository:对RouteDefinition进行增、删、查操作,基于内存存储
- CompositeRouteDefinitionLocator:组合的Locator,在构造函数中设置委托,将PropertiesRouteDefinitionLocator和InMemoryRouteDefinitionRepository组合。
public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {
private final Flux<RouteDefinitionLocator> delegates;
//将PropertiesRouteDefinitionLocator和InMemoryRouteDefinitionRepository组合
public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
this.delegates = delegates;
}
//委托给PropertiesRouteDefinitionLocator或InMemoryRouteDefinitionRepository执行读取
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return this.delegates.flatMap(RouteDefinitionLocator::getRouteDefinitions);
}
}
4、GlobalFilter和GatewayFilter
两种Filter都是拦截到http请求后做一些处理,gateway为什么提供两种Filter?
总结:GlobalFilter是所有被gateway拦截的http请求都要做的处理;GatewayFilter是根据路由配置匹配predicate的http请求才会做的处理。
4.1、GlobalFilter
GlobalFilter:全局拦截器,是所有被拦截到的http请求都要去做的处理;例如拿到一个http请求后,我们的目的是转发到下游服务,请求结果并返回,那么所有被拦截到的http请求都需要做下列几件事:
- 按照predicate把符合规则的url转换为真正要去请求的url
- 调用真正的下游服务【SpringCloud Gateway是基于netty实现的http调用,具体代码在NettyRoutingFilter类中】
- 拿到response,返回给调用方
像这种每个被拦截到的http请求都要去做的处理抽象出来就是一个个的GlobalFilter
接口定义
public interface GlobalFilter {
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
接口中只有一个filter方法,实现类实现该接口后在filter中去做具体拦截逻辑,这些Filter都实现了GlobalFilter接口,一起来看一下实现该接口的类都做了什么操作:
- AdaptCachedBodyGlobalFilter:优先级最高的Filter,请求到gateway后,将上下文【ServerWebExchange】中已有的缓存删除【请求信息】,将此次的请求信息缓存到上下文中。
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 获取上下文中已有的请求缓存
Flux<DataBuffer> body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_KEY,
null);
if (body != null) {
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return body;
}
};
// 删除上下文中的请求缓存
exchange.getAttributes().remove(CACHED_REQUEST_BODY_KEY);
// 将此次请求信息添加到上下文缓存中
return chain.filter(exchange.mutate().request(decorator).build());
}
return chain.filter(exchange);
}
- GatewayMetricsFilter:统计网关的性能指标
private void endTimerInner(ServerWebExchange exchange, Sample sample) {
// 此处省略一部分代码
// ...
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// 统计网关的性能指标
Tags tags = Tags.of("outcome", outcome, "status", status, "routeId",
route.getId(), "routeUri", route.getUri().toString());
if (log.isTraceEnabled()) {
log.trace("Stopping timer 'gateway.requests' with tags " + tags);
}
sample.stop(meterRegistry.timer("gateway.requests", tags));
}
- NettyWriteResponseFilter:基于Web Flux,若上下文中存在CLIENT_RESPONSE_CONN_ATTR,将响应数据返回。
- ForwardPathFilter:如果该请求还未被路由或scheme【URI对象的属性】不是forward,则将该请求对应配置的Route信息中uri的path设置到上下文ServerWebExchange中。
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
URI routeUri = route.getUri();
String scheme = routeUri.getScheme();
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
// 将该请求对应配置的Route信息中uri的path设置到上下文ServerWebExchange中
exchange = exchange.mutate()
.request(exchange.getRequest().mutate().path(routeUri.getPath()).build())
.build();
return chain.filter(exchange);
}
- RouteToRequestUrlFilter:将此次请求的uri和配置的Route规则做merged处理,拿到真正代理的下游服务的地址,将得到的url放到上下文中,key为GATEWAY_REQUEST_URL_ATTR
- NoLoadBalancerClientFilter:没有负载均衡的拦截器
- WebsocketRoutingFilter:路由WebSocket请求,校验逻辑在WebsocketRoutingFilter#changeSchemeIfIsWebSocketUpgrade中。
- NettyRoutingFilter:网关的http是基于netty实现的,若此次请求scheme是http或https则使用基于netty的httpClient执行调用,将返回结果写入上下文中。
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
- ForwardRoutingFilter:设置此次请求已被路由
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
// 设置此次请求已被路由
setAlreadyRouted(exchange);
if (log.isTraceEnabled()) {
log.trace("Forwarding to URI: " + requestUrl);
}
return this.getDispatcherHandler().handle(exchange);
}
- WebClientHttpRoutingFilter:作用同NettyRoutingFilter,方式同LoadBalancerClientFilter
- WebClientWriteResponseFilter:作用同NettyWriteResponseFilter
- LoadBalancerClientFilter:网关提供了负载均衡的Filter,具体负载规则可以自己实现
public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
// 可以自己去实现
protected final LoadBalancerClient loadBalancer;
}
4.2、GatewayFilter
GatewayFilter是面向开发人员的,因需适配,当我们需要给符合predicate的url做一些处理时通过配置就可添加,例如,我们想给path匹配上/test/**的url添加header,通过下列配置就可添加,这类配置是根据业务需求进行的特殊配置。工厂较多,也比较简单,具体配置方法可参考官网 :传送带
接口定义
public interface GatewayFilter extends ShortcutConfigurable {
/**
* Name key.
*/
String NAME_KEY = "name";
/**
* Value key.
*/
String VALUE_KEY = "value";
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
4.3、GatewayFilter对比GlobalFilter
前者接口定义中多了NAME_KEY和VALUE_KEY,原因是GatewayFilter是面向开发人员的,例如我们需要配置给path符合/test/**的请求添加header时,header是key-value形式,这时候就用到了
spring:
cloud:
gateway:
routes:
- id: add_request_header_route
uri: http://localhost:8080
predicates:
- Path=/test/**
filters:
- AddRequestHeader=NAME, test
public class AddRequestHeaderGatewayFilterFactory
extends AbstractNameValueGatewayFilterFactory {
@Override
public GatewayFilter apply(NameValueConfig config) {
return (exchange, chain) -> {
// 将要添加的key-value添加到上下文的header中
ServerHttpRequest request = exchange.getRequest().mutate()
.header(config.getName(), config.getValue()).build();
return chain.filter(exchange.mutate().request(request).build());
};
}
}
4.4、应用
上述两种Filter作用不同,是在什么时候整合的?优先级如何处理?
每个Filter中都有一个Order属性,在执行时是在FilteringWebHandler#handle方法中对GlobalFilter和GatewayFilter进行的整合和排序,具体执行在FilteringWebHandler#filter方法
/**
* 整合Filter
*/
public Mono<Void> handle(ServerWebExchange exchange) {
// 根据Route信息取出配置的GatewayFilter集合
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
// 取出globalFilters
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
// 将GatewayFilter添加到combined
combined.addAll(gatewayFilters);
// combined根据Order排优先级
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
/**
* 执行Filter
*/
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
执行过程中,可以看到排完优先级的Filter,其中红框圈出来的是我写的一个Demo,设置的一个GatewayFilter。
5、自定义Filter
5.1、自定义GlobalFilter
GlobalFilter具体的实现方式是实现接口,每个filter都实现了GlobalFilter接口
public class GlobalTestFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if("符合业务逻辑,处理完业务逻辑,继续执行下一个filter"){
return chain.filter(exchange);
}
//不符合业务逻辑,直接返回
return "按照不符合业务逻辑处理";
}
}
5.2、自定义GatewayFilter
GatewayFilter具体的实现方式是工厂,每个工厂都继承了AbstractGatewayFilterFactory
public class TestGatewayFilterFactory extends AbstractGatewayFilterFactory<TestGatewayFilterFactory.Config> {
public TestGatewayFilterFactory() {
super(TestGatewayFilterFactory.Config.class);
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
if("符合条件,处理业务逻辑,继续执行下一个Filter"){
return chain.filter(exchange);
}
// 不符合条件,直接返回
return "false";
};
}
public static class Config {
private String businessAttributes;
public String getBusinessAttributes() {
return businessAttributes;
}
public void setBusinessAttributes(String businessAttributes) {
this.businessAttributes = businessAttributes;
}
}
}
6、源码解析
6.1、网关启动阶段
①:yaml文件和GatewayProperties文件映射,映射处理源码在JavaBeanBinder.BeanProperty#getValue–>CollectionBinder#merge—>Binder#bindBean
②:加载Locator Bean,为后续读取RouteDefinition做准备【GatewayAutoConfiguration】,下述代码中的Locator就是本文的第二大节介绍的
public class GatewayAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(
GatewayProperties properties) {
return new PropertiesRouteDefinitionLocator(properties);
}
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
List<RouteDefinitionLocator> routeDefinitionLocators) {
return new CompositeRouteDefinitionLocator(
Flux.fromIterable(routeDefinitionLocators));
}
@Bean
@Primary
// TODO: property to disable composite?
public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
return new CachingRouteLocator(
new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}
}
③:初始化GlobalFilters【FilteringWebHandler】
public class GatewayAutoConfiguration {
@Bean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
}
public class FilteringWebHandler implements WebHandler {
private final List<GatewayFilter> globalFilters;
//构造函数中设置globalFiltersglobalFilters
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}
//设置globalFilters
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
}
④:初始化predicates,gatewayFilters,getRoutes【GatewayAutoConfiguration–>RouteDefinitionRouteLocator】
public class RouteDefinitionRouteLocator
implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
//构造函数中初始化
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
List<RoutePredicateFactory> predicates,
List<GatewayFilterFactory> gatewayFilterFactories,
GatewayProperties gatewayProperties, ConversionService conversionService) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.conversionService = conversionService;
initFactories(predicates);
gatewayFilterFactories.forEach(
factory -> this.gatewayFilterFactories.put(factory.name(), factory));
this.gatewayProperties = gatewayProperties;
}
//设置predicate工厂
private void initFactories(List<RoutePredicateFactory> predicates) {
predicates.forEach(factory -> {
String key = factory.name();
if (this.predicates.containsKey(key)) {
this.logger.warn("A RoutePredicateFactory named " + key
+ " already exists, class: " + this.predicates.get(key)
+ ". It will be overwritten.");
}
this.predicates.put(key, factory);
if (logger.isInfoEnabled()) {
logger.info("Loaded RoutePredicateFactory [" + key + "]");
}
});
}
public Flux<Route> getRoutes() {
//从RouteDefinitions转换为Route,转换过程在convertToRoute方法中实现
return this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
//RouteDefinition到Route的转换
private Route convertToRoute(RouteDefinition routeDefinition) {
//从routeDefinition获取predicate
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
//从routeDefinition获取gatewayFilters
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
//构造Route
return Route.async(routeDefinition).asyncPredicate(predicate)
.replaceFilters(gatewayFilters).build();
}
//获取GatewayFilters
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList<>();
//如果默认filter不为空,则去加载
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
this.gatewayProperties.getDefaultFilters()));
}
//如果Filter不为空,则
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(routeDefinition.getId(),
routeDefinition.getFilters()));
}
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
@SuppressWarnings("unchecked")
private List<GatewayFilter> loadGatewayFilters(String id,
List<FilterDefinition> filterDefinitions) {
List<GatewayFilter> filters = filterDefinitions.stream().map(definition -> {
//从gatewayFilterFactories中根据key获取factory
GatewayFilterFactory factory = this.gatewayFilterFactories
.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find GatewayFilterFactory with name "
+ definition.getName());
}
//获取definition设置的Filter值
Map<String, String> args = definition.getArgs();
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + id + " applying filter " + args + " to "
+ definition.getName());
}
Map<String, Object> properties = factory.shortcutType().normalize(args,
factory, this.parser, this.beanFactory);
//每一个工厂中都有一个静态内部类Config,目的是存储我们设置的Filter值
Object configuration = factory.newConfig();
//将后几个参数的信息绑定到configuration
ConfigurationUtils.bind(configuration, properties,
factory.shortcutFieldPrefix(), definition.getName(), validator,
conversionService);
//获得GatewayFilter
GatewayFilter gatewayFilter = factory.apply(configuration);
if (this.publisher != null) {
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
}
return gatewayFilter;
}).collect(Collectors.toList());
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
GatewayFilter gatewayFilter = filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
}
6.2、请求处理阶段
①ReactorHttpHandlerAdapter#apply方法是请求到网关执行的入口
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
//获取请求的request和response
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
//给到HttpWebHandlerAdapter执行构建
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
②HttpWebHandlerAdapter#handle构建网关上下文ServerWebExchange
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
//根据请求的request、response构建网关上下文
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}
③DispatcherHandler用于Http请求处理器/控制器的中央分发处理器,把请求分发给已经注册的处理程序处理,DispatcherHandler遍历Mapping获取对应的handler,网关一共有6个handlerMapping【此处会找到RoutePredicateHandlerMapping,通过RoutePredicateHandlerMapping获取FilteringWebHandler,通过FilteringWebHandler获取】
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
//遍历mapping获取handler
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
}
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
private final FilteringWebHandler webHandler;
private final RouteLocator routeLocator;
private final Integer managementPort;
private final ManagementPortType managementPortType;
//网关启动时进行了初始化
public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
Environment environment) {
this.webHandler = webHandler;
this.routeLocator = routeLocator;
this.managementPort = getPortProperty(environment, "management.server.");
this.managementPortType = getManagementPortType(environment);
setOrder(1);
setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
}
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug(
"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
//返回FilteringWebHandler
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}
}
④ RoutePredicateHandlerMapping#lookupRoute匹配路由,根据routeLocator获取我们在配置我文件中配置的Route,和当前请求的路由做匹配
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
//routeLocator获取我们在配置我文件中配置的Route
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
//当前请求的路由做匹配
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
}
}
⑤FilteringWebHandler创建过滤器链,执行过滤器
public class FilteringWebHandler implements WebHandler {
//创建过滤器链
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
//调用过滤器
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
//执行调用
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
}
版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有