tomcat管道模式

img

管道与阀门

img

管道是就像一条管道把多个对象连接起来,整体看起来就像若干个阀门嵌套在管道中,而处理逻辑放在阀门上。

在一个比较复杂的大型系统中,如果一个对象或数据流需要进行繁杂的逻辑处理,我们可以选择在一个大的组件中直接处理这些繁杂的逻辑处理,这个方式虽然达到目的,但是拓展性和可重用性差。因为牵一发而动全身。

管道是就像一条管道把多个对象连接起来,整体看起来就像若干个阀门嵌套在管道中,而处理逻辑放在阀门上。

它的结构和实现是非常值得我们学习和借鉴的。

首先要了解的是每一种container都有一个自己的StandardValve

Pipeline就像一个工厂中的生产线,负责调配工人(valve)的位置,valve则是生产线上负责不同操作的工人。

tomcat中的管道

img

tomcat按照包含关系有4个级别的容器,标准实现分别是:

  • StandardEngine
  • StandardHost
  • StandardContext
  • StandardWrapper

img

请求对象和响应对象分别在这4个容器之间通过管道机制进行传递。

img

Pipeline-Valve 是责任链模式,责任链模式是指在一个请求处理的过程中有很多处理者依次对请求进行处理,每个处理者负责做自己相应的处理,处理完之后将再调用下一个处理者继续处理。

Valve 表示一个处理点,比如权限认证和记录日志。如果你还不太理解的话,可以来看看 Valve 和 Pipeline 接口中的关键方法。

public interface Valve {
  public Valve getNext();
  public void setNext(Valve valve);
  // 用来处理请求
  public void invoke(Request request, Response response)
}

由于 Valve 是一个处理点,因此 invoke 方法就是来处理请求的。注意到 Valve 中有 getNext 和 setNext 方法,因此我们大概可以猜到有一个链表将 Valve 链起来了。请你继续看 Pipeline 接口:

public interface Pipeline extends Contained {
  public void addValve(Valve valve);
  public Valve getBasic();
  public void setBasic(Valve valve);
  public Valve getFirst();
}

Pipeline 中有 addValve 方法。Pipeline 中维护了 Valve 链表,Valve 可以插入到 Pipeline 中,对请求做某些处理。我们还发现 Pipeline 中没有 invoke 方法,因为整个调用链的触发是 Valve 来完成的,Valve 完成自己的处理后,调用 getNext.invoke() 来触发下一个 Valve 调用。

每一个容器都有一个 Pipeline 对象,只要触发这个 Pipeline 的第一个 Valve,这个容器里 Pipeline 中的 Valve 就都会被调用到。但是,不同容器的 Pipeline 是怎么链式触发的呢,比如 Engine 中 Pipeline 需要调用下层容器 Host 中的 Pipeline。

这是因为 Pipeline 中还有个 getBasic 方法。这个 BasicValve 处于 Valve 链表的末端,它是 Pipeline 中必不可少的一个 Valve,负责调用下层容器的 Pipeline 里的第一个 Valve。我还是通过一张图来解释。

image-20220112130623845

整个调用过程由连接器中的 Adapter 触发的,它会调用 Engine 的第一个 Valve:

connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

Wrapper 容器的最后一个 Valve 会创建一个 Filter 链,并调用 doFilter() 方法,最终会调到 Servlet 的 service 方法。

源码分析

初始化

配置阀门

在 server.xml 中 配置 的默认 阀门:

<Engine name="Catalina" defaultHost="localhost">
    <Host name="localhost"  appBase="webapps"
          unpackWARs="true" autoDeploy="true">
        <!-- 默认 Valve -->
        <Valve className="org.apache.catalina.valves.AccessLogValve"
               directory="logs"
               prefix="localhost_access_log"
               suffix=".txt"
               <!-- maxDays="5" -->
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />
        <!-- 自定义 valve -->
        <Valve className="org.apache.catalina.valves.WswAccessValve"/>
    </Host>
</Engine>
自定义Valve
public class WswAccessValve extends ValveBase {

    @Override
    public void invoke(Request request, Response response) {
        String uri = request.getRequestURI();
        System.out.println("uri = " + uri);
        getNext().invoke(request, response);
    }
}
四个基础Valve
public StandardEngine() {
    super();
    pipeline.setBasic(new StandardEngineValve());
}

public StandardHost() {
    super();
    pipeline.setBasic(new StandardHostValve());
}

public StandardContext() {
    super();
    pipeline.setBasic(new StandardContextValve());
}

public StandardWrapper() {
    super();
    pipeline.setBasic(new StandardWrapperValve());
}
构造Valve链
// 在处理 server.xml 中配置的 Valve 时
// StandardPipeline
@Override
public void setBasic(Valve valve) {
    
    Valve oldBasic = this.basic;
    if (oldBasic == valve) { return; }

    // 不会走的
    if (oldBasic != null) {
        if (getState().isAvailable() && (oldBasic instanceof Lifecycle)) {
            ((Lifecycle) oldBasic).stop();
        }
        if (oldBasic instanceof Contained) {
            ((Contained) oldBasic).setContainer(null);
    }

    if (valve == null) { return; }
    if (valve instanceof Contained) {
        ((Contained) valve).setContainer(this.container);
    }
    // 未执行
    if (getState().isAvailable() && valve instanceof Lifecycle) {
        ((Lifecycle) valve).start();
    }
    Valve current = first;
    while (current != null) {
        if (current.getNext() == oldBasic) {
            current.setNext(valve);
            break;
        }
        current = current.getNext();
    }
    this.basic = valve;
}


@Override
public void addValve(Valve valve) {
    // 验证 是否可以 绑定 容器(Engine、Host等)
    if (valve instanceof Contained)
        ((Contained) valve).setContainer(this.container);
    // 是否有必要调用 start 方法【默认没有调用,不属于LifeCycle】
    if (getState().isAvailable()) {
        if (valve instanceof Lifecycle) {
            ((Lifecycle) valve).start();
        }
    }
    // first valve == null,就设置为第一个
    if (first == null) {
        first = valve;
        valve.setNext(basic);
    } else {
        Valve current = first;
        while (current != null) {
            if (current.getNext() == basic) {
                // 按照在xml中定义的顺序+basicValve
                // 组装成链式结构,first + 新增 valve + basicValve
                current.setNext(valve);
                valve.setNext(basic);
                break;
            }
            current = current.getNext();
        }
    }
    // [没有执行]
    container.fireContainerEvent(Container.ADD_VALVE_EVENT, valve);
}

执行流程

CoyoteAdapter#service

在收到请求后,调用”容器过滤器”

// org.apache.catalina.connector.CoyoteAdapter
@Override
public void service(org.apache.coyote.Request req,                                                  org.apache.coyote.Response res) {
        // 此处调用 Pipeline Value 的 invoke 方法。(Engine是最顶层容器)
        connector.getService()
            .getContainer()
            .getPipeline()
            .getFirst()
            .invoke(request, response);
    }
}
// StandardPipeline
@Override
public Valve getFirst() {
    // 如果有注册Valve,就返回
    if (first != null) {
        return first;
    }
    // 返回注册的 BasicValve
    return basic;
}
StandardEngineValve
@Override
public final void invoke(Request request, Response response) {
    // 根据 当前 request 找到合适的 host,通过 MappingData 
    Host host = request.getHost();
    // 没有找到 host,不走了
    if (host == null) {
        response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                 sm.getString("standardEngine.noHost"));
        return;
    }
    // 执行下一个 Valve
    host.getPipeline().getFirst().invoke(request, response);
}
StandardHostValve
@Override
public final void invoke(Request request, Response response) {
    Context context = request.getContext();
    // 未找到匹配的 项目
    if (context == null) {
        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                           sm.getString("standardHost.noContext"));
        return;
    }
    // debug 后 都为 false
    boolean asyncAtStart = request.isAsync();
    boolean asyncDispatching = request.isAsyncDispatching();
    try {
        context.bind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);
        // 见下面的说明
        if (!asyncAtStart && !context.fireRequestInitEvent(request.getRequest())) {
            return;
        }
        try {
            if (!asyncAtStart || asyncDispatching) {
                context.getPipeline().getFirst().invoke(request, response);
            } else {
                if (!response.isErrorReportRequired()) {
                    throw new IllegalStateException("standardHost.asyncStateError");
                }
            }
        } catch (Throwable t) {
            if (!response.isErrorReportRequired()) {
                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
                throwable(request, response, t);
            }
        }
        response.setSuspended(false);
        // 是否有错误信息
        Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
        // 判断 上下文状态是否可用
        if (!context.getState().isAvailable()) {
            return;
        }
        // 查找(如果找到则呈现)应用程序级别错误页面
        if (response.isErrorReportRequired()) {
            if (t != null) {
                throwable(request, response, t);
            } else {
                status(request, response);
            }
        }
        // 调用销毁 request 的方法,见说明
        if (!request.isAsync() && !asyncAtStart) {
            context.fireRequestDestroyEvent(request.getRequest());
        }
    } finally {
        if (ACCESS_SESSION) {
            request.getSession(false);
        }
        context.unbind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);
    }
}
fireRequestInitEvent
// 说明
@Override
public boolean fireRequestInitEvent(ServletRequest request) {
    Object instances[] = getApplicationEventListeners();
    if ((instances != null) && (instances.length > 0)) {
        ServletRequestEvent event =
            new ServletRequestEvent(getServletContext(), request);
        for (int i = 0; i < instances.length; i++) {
            if (instances[i] == null) { continue; }
            if (!(instances[i] instanceof ServletRequestListener))
                continue;
            ServletRequestListener listener =
                (ServletRequestListener) instances[i];
            try {
                listener.requestInitialized(event);
            } catch (Throwable t) {
                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
                return false;
            }
        }
    }
    return true;
}
// 获取所有的 applicationEventListener
// 判断是否是 ServletRequestListener 类型
// 调用 listener.requestInitialized(event); 完成初始化
fireRequestDestroyEvent
@Override
public boolean fireRequestDestroyEvent(ServletRequest request) {
    Object instances[] = getApplicationEventListeners();
    if ((instances != null) && (instances.length > 0)) {
        ServletRequestEvent event =
            new ServletRequestEvent(getServletContext(), request);
        for (int i = 0; i < instances.length; i++) {
            int j = (instances.length -1) -i;
            if (instances[j] == null) { continue; }
            if (!(instances[j] instanceof ServletRequestListener))
                continue;
            ServletRequestListener listener =
                (ServletRequestListener) instances[j];
            try {
                listener.requestDestroyed(event);
            } catch (Throwable t) {
                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
                return false;
            }
        }
    }
    return true;
}
// 和初始化方法极其相似
StandardContextValve
@Override
public final void invoke(Request request, Response response) {
    // 测试的请求路径【/index.html】
    MessageBytes requestPathMB = request.getRequestPathMB();
    // 不允许直接访问 WEB-INF or META-INF,在这里判断的
    if ((requestPathMB.startsWithIgnoreCase("/META-INF/", 0))
        || (requestPathMB.equalsIgnoreCase("/META-INF"))
        || (requestPathMB.startsWithIgnoreCase("/WEB-INF/", 0))
        || (requestPathMB.equalsIgnoreCase("/WEB-INF"))) {
        response.sendError(HttpServletResponse.SC_NOT_FOUND);
        return;
    }

    // 根据当前 request 选择合适的 wrapper【servlet】
    Wrapper wrapper = request.getWrapper();
    // 判断状态
    if (wrapper == null || wrapper.isUnavailable()) {
        response.sendError(HttpServletResponse.SC_NOT_FOUND);
        return;
    }
    
    // 发送确认请求,HTTP/1.1 100
    try {
        response.sendAcknowledgement();
    } catch (IOException ioe) {
        return;
    }
    // 判断是否支持异步
    if (request.isAsyncSupported()) {
            request.setAsyncSupported(wrapper.getPipeline().isAsyncSupported());
    }
    // 调用 Wrapper 的 pipeline 处理
    wrapper.getPipeline().getFirst().invoke(request, response);
}
StandardWrapperValve
@Override
public final void invoke(Request request, Response response)
    throws IOException, ServletException {

    boolean unavailable = false;
    requestCount.incrementAndGet();
    StandardWrapper wrapper = (StandardWrapper) getContainer();
    Servlet servlet = null;
    Context context = (Context) wrapper.getParent();
    // Check for the application being marked unavailable
    if (!context.getState().isAvailable()) {
        response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                           sm.getString("standardContext.isUnavailable"));
        unavailable = true;
    }
    // 分配 servlet 实例处理此次请求
    if (!unavailable) {
        // 会判断  Servlet 是否有  
        // ******* SingleThreadModel 接口 ******************
        servlet = wrapper.allocate();
    }

    // 创建 Filter
    ApplicationFilterChain filterChain =
        ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

    // Call the filter chain for this request
    if ((servlet != null) && (filterChain != null)) {
        if (request.isAsyncDispatching()) {
            request.getAsyncContextInternal().doInternalDispatch();
        } else {
            // 此处 调用 过滤器方法
            filterChain.doFilter(request.getRequest(),
                                 response.getResponse());
        }
    } else {
        if (request.isAsyncDispatching()) {
            request.getAsyncContextInternal().doInternalDispatch();
        } else {
            filterChain.doFilter
                (request.getRequest(), response.getResponse());
        }
    }

    // 释放 filterChain
    if (filterChain != null) {
        filterChain.release();
    }

    // 释放 instance
    if (servlet != null) {
        wrapper.deallocate(servlet);
    }

    // 如果不可用,卸载 & 释放 instance
    if ((servlet != null) &&
        (wrapper.getAvailable() == Long.MAX_VALUE)) {
        wrapper.unload();
    }
}

注意点

Filter似乎也有相似的功能,那 Valve 和 Filter 有什么区别吗?它们的区别是:

  • Valve 是 tomcat 的私有机制,与 tomcat 的基础架构 /API 是紧耦合的。Servlet API 是公有的标准,所有的 Web 容器包括 Jetty 都支持 Filter 机制。
  • 另一个重要的区别是 Valve 工作在 Web 容器级别,拦截所有应用的请求;而 Servlet Filter 工作在应用级别,只能拦截某个 Web 应用的所有请求。如果想做整个 Web 容器的拦截器,必须通过 Valve 来实现。