当前位置: 首页>编程语言>正文

Tomcat——Tomcat处理请求流程

前言

tomcat从架构上看,包含Service,Engine,Host,Context,Wrapper。那么,当用户发起一个请求时,tomcat是如何将url映射到具体的Wrapper上的呢?

Mapper组件机制

Tomcat 设计了 Mapper(映射)组件 完成 url和Host、Context、Wrapper等组件容器的映射。

Mapper组件的核心功能是提供请求路径的路由映射,根据某个请求路径通过计算得到相应的Servlet(Wrapper)。

与url到Wrapper映射相关的类位于org.apache.catalina.mapper包下,包含四个类:

  • Mapper:映射关系最核心的、最重要的类。完成url与Host,Context,Wrapper映射关系的初始化、变更、存储及映射
  • MapperListener:实现了ContainerListener与 LifecycleListener接口,监听tomcat组件的变化,当有Host,Context及Wrapper变更时,调用Mapper相关方法,增加或者删除Host,Context,Wrapper等。
  • MappingData:url映射后的数据,表示一个url具体映射到哪个host,哪个context,哪个wrapper上。
  • WrapperMappingInfo:表示一个Wrapper的信息,是一个普通的类,不太重要。

Mapper主要功能是完成url到Wrapper的映射,有三个主要的功能

  • 1、映射关系存储:存储所有的Host,context及Wrapper的对应关系;
  • 2、映射关系初始化及变更:当新增一个组件或者移除一个组件时,mapper如何维护url到Wrapper的映射关系;
  • 3、映射关系使用:根据url,映射到具体的host,context和wrapper。

具体流程

Tomcat——Tomcat处理请求流程,第1张

请求连接和协议解析

  • Connector组件的Endpoint中的Acceptor监听客户端套接字连接并接收Socket
  • 将连接交给线程池Exectuor处理,开始执行请求响应任务
  • Processor组件读取到消息报文,解析请求行、请求头、请求体,封装成Request对象

请求路由和处理

  • Mapper组件根据请求行的URL值和请求头的Host值匹配由那个host、context、wrapper容器进行处理
  • CoyoteAdaptor组件负责将Connector组件和Engine容器适配关联,把生成的Request对象和响应的Response对象传递给Engine容器,并调用Pipeline
  • Engine容器的管道开始处理,管道中包含若干个Valve,每个Valve负责部分处理逻辑,执行完Valve后会执行基础的Valve – StandardEngineValve,负责调用Host容器的Pipeline
  • Host容器的管道开始同上流程处理,最后执行Content容器的Pipeline
  • Content容器的管道开始同上流程处理,最后执行Wrapper容器的Pipeline
  • Wrapper容器的管道开始同上流程处理
  • 最后执行Wrapper容器对应的Servlet对象的处理方法

总体结论

一个Service有一个Engine,而一个Engine中有一个Mapper。根据Engine,Host,Context及Wrapper的对应关系,易得到以下的结论。

  • 一个Mapper中,应该保存有多个Host对象(的确是这样的,每个Host对象称之为MappedHost,多个MappedHost以数组形式组合,各元素通过其name进行排序)

  • 一个Host对象包含多个context(Context在Mapper中定义为MappedContext,也是通过数组形式存在,并且元素根据MappedContext的命名排序。但是与组件不同的是,每一个Context可以有多个版本,因此每一个MappedContext 包含了多个ContextVersion,每一个MappedContext下的多个ContextVersion表示同一个Context的多个版本)

  • 一个Context包含多个Wrapper(此处的Context在Mapper中为ContextVersion,包含多个Wrapper,这些Wrapper分成四类,精确匹配的Wrapper,前缀匹配的Wrapper,扩展名匹配的Wrapper,默认的Wrapper,在Mapper中,每一个Wrapper都通过一个MappedWrapper表示)

因此,Mapper的构成可以用下图表示 Tomcat——Tomcat处理请求流程,第2张

当一个请求到来时,Mapper组件通过解析请求URL里的域名和路径,再到自己保存的 Map里去查找,就能定位到一个Servlet。请你注意,一个请求URL最后只会定位到一个 Wrapper容器,也就是一个Servlet。

从Tomcat的设计架构层面来分析Tomcat的请求处理。

Tomcat——Tomcat处理请求流程,第3张

步骤如下:

  • 1、Connector组件Endpoint中的Acceptor监听客户端套接字连接并接收Socket。
  • 2、将连接交给线程池Executor处理,开始执行请求响应任务。
  • 3、Processor组件读取消息报文,解析请求行、请求体、请求头,封装成Request对象。
  • 4、Mapper组件根据请求行的URL值和请求头的Host值匹配由哪个Host容器、Context容器、Wrapper容器处理请求。
  • 5、CoyoteAdaptor组件负责将Connector组件和Engine容器关联起来,把生成的 Request对象和响应对象Response传递到Engine容器中,调用 Pipeline。
  • 6、Engine容器的管道开始处理,管道中包含若干个Valve、每个Valve负责部分处理逻 辑。执行完Valve后会执行基础的 Valve--StandardEngineValve,负责调用Host容器的 Pipeline。
  • 7、Host容器的管道开始处理,流程类似,最后执行 Context容器的Pipeline。
  • 8、Context容器的管道开始处理,流程类似,最后执行 Wrapper容器的Pipeline。
  • 9、Wrapper容器的管道开始处理,流程类似,最后执行 Wrapper容器对应的Servlet对象的处理方法。

Connector

<Connector port="8080" protocol="HTTP/1.1"
               connectionTimeout="20000"
               redirectPort="8443" />
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

Connector用于接收请求并将请求封装成Request和Response来具体处理。最底层使用的是 Socket。Request和Response封装后交给Container(Servlet的容器)处理请求,Container处理后返回给Connector,最后由Socket返回给客户端。

结构

Connector中具体是用 ProtocolHandler处理请求的,代表不同的连接类型。Http11Protocol使用普通Socket连接,Http11NioProtocol使用NioSocket连接。

ProtocolHandler中有三个重要的组件:

  • Endpoint:处理底层的Socket网络连接;
  • Processor:将Endpoint接收的Socket封装成Request;
  • Adapter:将封装后的Request交给Container处理;

Endpoint的抽象类 AbstractEndpoint定义了 Acceptor和 AsyncTimeout两个内部类 和 Handler接口。

  • Acceptor:监听请求;
  • AsyncTimeout:检查异步Request请求的超时;
  • Handler:处理接收的Socket,内部调用 Processor进行处理;
public class Connector extends LifecycleMBeanBase  {

    protected final String protocolHandlerClassName;

    public Connector() {
        this("org.apache.coyote.http11.Http11NioProtocol");
    }
	
    public Connector(String protocol) {
        boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
                AprLifecycleListener.getUseAprConnector();

		// 根据 server.xml 中 Connector 属性 protocol 的值找合适的 className
        // 此处返回  org.apache.coyote.http11.Http11NioProtocol
        // 赋值给  ProtocolHandler
        if ("HTTP/1.1".equals(protocol) || protocol == null) {
            if (aprConnector) {
                protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
            } else {
                protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
            }
        } else if ("AJP/1.3".equals(protocol)) {
            if (aprConnector) {
                protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
            } else {
                protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
            }
        } else {
            protocolHandlerClassName = protocol;
        }

        // Instantiate protocol handler
        ProtocolHandler p = null;
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
			// 反射生成 Http11NioProtocol 时,在构造函数中 生成了 NioEndpoint。
            p = (ProtocolHandler) clazz.getConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }

        // Default for Connector depends on this system property
        setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
    }
	

	// 初始化操作
    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();
		
        if (protocolHandler == null) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
        }

        // Initialize adapter
		// 初始化是配置  CoyoteAdapter(Connector 作为参数)
        adapter = new CoyoteAdapter(this);
		// 协议处理器绑定 适配器
        protocolHandler.setAdapter(adapter);
        if (service != null) {
            protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
        }

        // Make sure parseBodyMethodsSet has a default
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }

        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isInstanceCreated()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoAprListener",
                    getProtocolHandlerClassName()));
        }
        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoAprLibrary",
                    getProtocolHandlerClassName()));
        }
        if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                protocolHandler instanceof AbstractHttp11JsseProtocol) {
            AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                    (AbstractHttp11JsseProtocol<?>) protocolHandler;
            if (jsseProtocolHandler.isSSLEnabled() &&
                    jsseProtocolHandler.getSslImplementationName() == null) {
                // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
            }
        }

        try {
			// 执行 协议处理器初始化操作
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }
    }
	
	
	// 使用此 连接器处理 请求
    @Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
		// 验证端口
        if (getPortWithOffset() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
        }
		// 设置生命周期状态值
        setState(LifecycleState.STARTING);

        try {
			// 调用 协议处理器 start 方法 
            protocolHandler.start();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
        }
    }	
}	

Tomcat——Tomcat处理请求流程,第4张

Http11NioProtocol

主要包含 NioEndpoint组件和 Http11NioProcessor组件。启动时由NioEndpoint组件启动端口监听,连接到来被注册到NioChannel队列中,由Poller轮询器负责检测Channel的读写事件,并在创建任务后放入线程池中,线程池进行任务处理。

Tomcat——Tomcat处理请求流程,第5张

public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
    // 创建了  NioEndpoint 类
    public Http11NioProtocol() {
        // 最终调用 AbstractProtocol 类
        super(new NioEndpoint());
    }
}

// 父类 AbstractProtocol
public abstract class AbstractProtocol implements ProtocolHandler,MBeanRegistration {
    @Override
    public void init() throws Exception {
        // endpointName---"http-nio-8080": http-nio  +  8080 组成。
        String endpointName = getName();
        // 去掉 双引号
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        // domain ==  catalina
        endpoint.setDomain(domain);
        // Endpoint的初始化操作
        // serverSocketChannel 绑定端口
        endpoint.init();
    }
    @Override
    public void start() throws Exception {
        // Endpoint 的 start 方法,重点
        // 创建 Executor、最大连接、开启 Poller thread
        endpoint.start();
        // 异步超时 线程,
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }
}

// AsyncTimeout 实现 Runnable 接口
protected class AsyncTimeout implements Runnable {
    private volatile boolean asyncTimeoutRunning = true;
    // 后台线程检查异步请求并在没有活动时触发超时。
    @Override
    public void run() {
        // 死循环,直到接收到 shutdown 命令
        while (asyncTimeoutRunning) {
            Thread.sleep(1000);
            long now = System.currentTimeMillis();
            for (Processor processor : waitingProcessors) {
                processor.timeoutAsync(now);
            }
            // 循环,直到 Endpoint pause
            while (endpoint.isPaused() && asyncTimeoutRunning) {
                Thread.sleep(1000);
            }
        }
    }
}

请求流程源码解析

Tomcat——Tomcat处理请求流程,第6张

在Tomcat的整体架构中,我们发现Tomcat中的各个组件各司其职,组件之间松耦合,确保了整体架构的可伸缩性和可拓展性,那么在组件内部,如何增强组件的灵活性和拓展性呢? 在Tomcat中,每个Container组件采用责任链模式来完成具体的请求处理。

在Tomcat中定义了Pipeline 和 Valve 两个接口,Pipeline 用于构建责任链, 后者代表责 任链上的每个处理器。Pipeline 中维护了一个基础的Valve,它始终位于Pipeline的末端 (最后执行),封装了具体的请求处理和输出响应的过程。当然,我们也可以调用 addValve()方法, 为Pipeline 添加其他的Valve, 后添加的Valve 位于基础的Valve之 前,并按照添加顺序执行。Pipiline通过获得首个Valve来启动整合链条的执行 。

Tomcat——Tomcat处理请求流程,请求时序图,第7张

源码入口

  • org.apache.tomcat.util.net.NioEndpoint#startInternal (启动完成,接收请求)
  • org.apache.tomcat.util.net.NioEndpoint.Poller#run (处理接收的请求)
  • org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
  • org.apache.tomcat.util.net.AbstractEndpoint#processSocket

执行AbstractEndpoint.processSocket

  • org.apache.tomcat.util.net.AbstractEndpoint#processSocket
  • 由Executor线程池执行业务SocketWrapperBase.run
  • Connector组件的Endpoint中的Acceptor监听客户端套接字连接并接收Socket
public abstract class AbstractEndpoint<S,U> {

    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }
}

执行SocketWrapperBase.run

  • org.apache.tomcat.util.net.SocketProcessorBase#run
  • 调用doRun方法 ○ 执行3次握手 ○ 获取handler并进行请求处理
  • 将连接交给线程池Exectuor处理,开始执行请求响应任务
public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;
	
    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }

    protected abstract void doRun();
}	


public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {

    protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        @Override
        protected void doRun() {
            NioChannel socket = socketWrapper.getSocket();
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

            try {
				//执行3次握手
                int handshake = -1;

                try {
                    if (key != null) {
                        if (socket.isHandshakeComplete()) {
                            // No TLS handshaking required. Let the handler
                            // process this socket / event combination.
                            handshake = 0;
                        } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                                event == SocketEvent.ERROR) {
                            // Unable to complete the TLS handshake. Treat it as
                            // if the handshake failed.
                            handshake = -1;
                        } else {
                            handshake = socket.handshake(key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            event = SocketEvent.OPEN_READ;
                        }
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
						//获取handler并进行请求处理
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        close(socket, key);
                    }
                } else if (handshake == -1 ) {
                    close(socket, key);
                } else if (handshake == SelectionKey.OP_READ){
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE){
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error(sm.getString("endpoint.processing.fail"), t);
                socket.getPoller().cancelledKey(key);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && !paused) {
                    processorCache.push(this);
                }
            }
        }
    }
}

执行AbstractProtocol.process

  • 获取当前Processor
  • 调用processor的process方法(在此Endpoint将请求的信息交给Processor)
  • Processor组件读取到消息报文,解析请求行、请求头、请求体,封装成Request对象
public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
	public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

		S socket = wrapper.getSocket();
		//拿到当前Processor
		Processor processor = (Processor) wrapper.getCurrentProcessor();
		//....
		SocketState state = SocketState.CLOSED;
		//调用processor的process方法
		state = processor.process(wrapper, status);
		//...
	}
}

执行AbstractProcessorLight.process

  • 执行dispatch或执行service方法
public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
				//执行dispatch
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
				//执行dispatch
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
				 //执行service方法
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
				//执行dispatch
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
				//执行dispatch
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
				 //执行service方法
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}

执行Http11Processor.service

  • org.apache.coyote.http11.Http11Processor#service
  • 解析请求行、请求头、请求体,封装成Request对象
  • 获取adapter(CoyoteAdaptor),调用service方法
  • CoyoteAdaptor组件负责将Connector组件和Engine容器适配关联
public class Http11Processor extends AbstractProcessor {

	public SocketState service(SocketWrapperBase<?> socketWrapper)  throws IOException {

		//......

		//获取adapter(CoyoteAdaptor),调用service方法
		getAdapter().service(request, response);
		
		//......
	}
}

执行CoyoteAdaptor.service

  • 获取到Request和Response对象
  • 调用容器,把生成的Request对象和响应的Response对象传递给Engine容器
public class CoyoteAdapter implements Adapter {

    @Override
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
            throws Exception {
		//获取到Request和Response对象
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

		//......

   
		postParseSuccess = postParseRequest(req, request, res, response);
		if (postParseSuccess) {
			//check valves if we support async
			request.setAsyncSupported(
					connector.getService().getContainer().getPipeline().isAsyncSupported());
			// Calling the
			//调用容器,获取Engin管道阀Valve
			connector.getService().getContainer().getPipeline().getFirst().invoke(
					request, response);
		}
   
        //......
    }
}

执行责任链xxxValve.invoke

  • 执行StandardServiceValve.invoke
    • a. 获取Host,调用Host容器的Pipeline
    • b. Engine容器的管道开始处理,管道中包含若干个Valve,每个Valve负责部分处理逻辑,执行完Valve后会执行基础的Valve
  • 执行StandardHostValve.invoke
    • a. 获取Context,调用Context容器的Pipeline
  • 执行StandardContextValve.invoke
    • a. 获取Wrapper,调用Wrapper容器的Pipeline

获取Servlet – Wrapper.invoke

  • 从Container中获取Wrapper
  • 从Wrapper中获取到Servlet,至此拿到的就是具体业务Servlet
  • 将Servlet封装到构造的FilterChain过滤器链中
  • 执行过滤器链中的filter,并传入ServletRequest和ServletResponse
final class StandardWrapperValve extends ValveBase {
	
    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {	
		
		//......
		
	    //从Container中获取Wrapper
		StandardWrapper wrapper = (StandardWrapper) getContainer();
		Servlet servlet = null;
		Context context = (Context) wrapper.getParent();
		
		//......
		
		//获取到servlet
		servlet = wrapper.allocate();

		//将Servlet封装到FilterChain过滤器链中
		ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
		
		//......
		
		//执行过滤器链中的filter
		filterChain.doFilter(request.getRequest(),response.getResponse());
		
		//......
	}
}

执行过滤器链中的filter

  • 获取到ServletRequest、ServletResponse执行过滤器
  • 调用servlet.service方法
  • 调用HttpServlet.service ○ 执行doGet、doPost方法
public final class ApplicationFilterChain implements FilterChain {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response)
        throws IOException, ServletException {

		//......
		
        internalDoFilter(request,response);

    }
	
	private void internalDoFilter(ServletRequest request, ServletResponse response)
				throws IOException, ServletException {
				
		//......
		
		//执行过滤器
		//调用Servlet.service方法
		servlet.service(request, response);
	
		//......
	}
}

执行HttpServlet.service

  • 获取请求方式
  • 执行对应的请求方法,如:doPost、doGet等
public abstract class HttpServlet extends GenericServlet {

    protected void service(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
		//获取请求方式
        String method = req.getMethod();

		//执行对应的请求方法
        if (method.equals(METHOD_GET)) {
            long lastModified = getLastModified(req);
            if (lastModified == -1) {
                // servlet doesn't support if-modified-since, no reason
                // to go through further expensive logic
                doGet(req, resp);
            } else {
                long ifModifiedSince;
                try {
                    ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
                } catch (IllegalArgumentException iae) {
                    // Invalid date header - proceed as if none was set
                    ifModifiedSince = -1;
                }
                if (ifModifiedSince < (lastModified / 1000 * 1000)) {
                    // If the servlet mod time is later, call doGet()
                    // Round down to the nearest second for a proper compare
                    // A ifModifiedSince of -1 will always be less
                    maybeSetLastModified(resp, lastModified);
                    doGet(req, resp);
                } else {
                    resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                }
            }

        } else if (method.equals(METHOD_HEAD)) {
            long lastModified = getLastModified(req);
            maybeSetLastModified(resp, lastModified);
            doHead(req, resp);

        } else if (method.equals(METHOD_POST)) {
            doPost(req, resp);

        } else if (method.equals(METHOD_PUT)) {
            doPut(req, resp);

        } else if (method.equals(METHOD_DELETE)) {
            doDelete(req, resp);

        } else if (method.equals(METHOD_OPTIONS)) {
            doOptions(req,resp);

        } else if (method.equals(METHOD_TRACE)) {
            doTrace(req,resp);

        } else {
            //
            // Note that this means NO servlet supports whatever
            // method was requested, anywhere on this server.
            //

            String errMsg = lStrings.getString("http.method_not_implemented");
            Object[] errArgs = new Object[1];
            errArgs[0] = method;
            errMsg = MessageFormat.format(errMsg, errArgs);

            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg);
        }
    }
}

Http长连接的支持

  • 客户端请求会携带Connection: keep-alive
  • service方法会解析,并使用keepAlive参数记录是否满足长连接条件
  • 不满足长连接条件则返回的响应头携带Connection: close,表示关闭连接
  • 满足长连接条件则不返回Connection参数
public class Http11Processor extends AbstractProcessor {

    @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        inputBuffer.init(socketWrapper);
        outputBuffer.init(socketWrapper);

        // Flags
        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;

		//控制tomcat长连接原理 while循环  keepAlive
        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                sendfileState == SendfileState.DONE && !protocol.isPaused()) {

            // Parsing the request header
            try {
                if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                        protocol.getKeepAliveTimeout())) {
                    if (inputBuffer.getParsingRequestLinePhase() == -1) {
                        return SocketState.UPGRADING;
                    } else if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }

                if (protocol.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                    if (!inputBuffer.parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!protocol.getDisableUploadTimeout()) {
                        socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                    }
                }
            } catch (IOException e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), e);
                }
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                if (logMode != null) {
                    String message = sm.getString("http11processor.header.parse");
                    switch (logMode) {
                        case INFO_THEN_DEBUG:
                            message += sm.getString("http11processor.fallToDebug");
                            //$FALL-THROUGH$
                        case INFO:
                            log.info(message, t);
                            break;
                        case DEBUG:
                            log.debug(message, t);
                    }
                }
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }

            // Has an upgrade been requested?
            Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
            boolean foundUpgrade = false;
            while (connectionValues.hasMoreElements() && !foundUpgrade) {
                foundUpgrade = connectionValues.nextElement().toLowerCase(
                        Locale.ENGLISH).contains("upgrade");
            }

            if (foundUpgrade) {
                // Check the protocol
                String requestedProtocol = request.getHeader("Upgrade");

                UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
                if (upgradeProtocol != null) {
                    if (upgradeProtocol.accept(request)) {
                        response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                        response.setHeader("Connection", "Upgrade");
                        response.setHeader("Upgrade", requestedProtocol);
                        action(ActionCode.CLOSE,  null);
                        getAdapter().log(request, response, 0);

                        InternalHttpUpgradeHandler upgradeHandler =
                                upgradeProtocol.getInternalUpgradeHandler(
                                        socketWrapper, getAdapter(), cloneRequest(request));
                        UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                        action(ActionCode.UPGRADE, upgradeToken);
                        return SocketState.UPGRADING;
                    }
                }
            }

            if (getErrorState().isIoAllowed()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }
            }

			//最大活跃的http请求数量
            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // Process the request in the adapter
            if (getErrorState().isIoAllowed()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    getAdapter().service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !getErrorState().isError() && !isAsync() &&
                            statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                } catch (InterruptedIOException e) {
                    setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                } catch (HeadersTooLargeException e) {
                    log.error(sm.getString("http11processor.request.process"), e);
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (response.isCommitted()) {
                        setErrorState(ErrorState.CLOSE_NOW, e);
                    } else {
                        response.reset();
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, e);
                        response.setHeader("Connection", "close"); // TODO: Remove
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }

            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
            if (!isAsync()) {
                // If this is an async request then the request ends when it has
                // been completed. The AsyncContext is responsible for calling
                // endRequest() in that case.
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (getErrorState().isError()) {
                response.setStatus(500);
            }

            if (!isAsync() || getErrorState().isError()) {
                request.updateCounters();
                if (getErrorState().isIoAllowed()) {
                    inputBuffer.nextRequest();
                    outputBuffer.nextRequest();
                }
            }

            if (!protocol.getDisableUploadTimeout()) {
                int connectionTimeout = protocol.getConnectionTimeout();
                if(connectionTimeout > 0) {
                    socketWrapper.setReadTimeout(connectionTimeout);
                } else {
                    socketWrapper.setReadTimeout(0);
                }
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            sendfileState = processSendfile(socketWrapper);
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (getErrorState().isError() || protocol.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileState == SendfileState.PENDING) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) {
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }
}

参考: https://www.cnblogs.com/wansw/p/10244039.html

https://blog.csdn.net/nblife0000/article/details/60364847

https://blog.csdn.net/jiaomingliang/article/details/47414657

https://blog.csdn.net/nmjhehe/article/details/115533383

https://blog.csdn.net/qq_44377709/article/details/122833521

https://blog.csdn.net/weixin_42413454/article/details/125585199

https://blog.csdn.net/wr_java/article/details/116890919


https://www.xamrdz.com/lan/5w51967422.html

相关文章: