23.IO线程与业务线程的合与分,以及提供的策略选择实现分析

io线程与业务线程合在一个线程上,还是把业务线程拆成单独的线程(线程池)去处理,一般来讲,取决于业务线程是否比较耗时。

如果业务线程比较耗时,就可以拆成单独的线程池处理,这样可以提升io吞吐。

如果业务线程不怎么耗时,处理非常快,那么就可以不拆成单独线程,因为这样可以降低上下文切换cs带来的开销。

这种方式在多个中间件都会有涉及。

dubbo中给予了多种选择,由使用方自行决定配置。默认是与io线程分开。

下面我们将分析dubbo中业务线程池的处理及对ChannelHandler与Dispatcher的处理。

我们先分析下HeaderExchangeXXX与上下层的交互。

HeaderExchangeXXX与上下层的交互示意图

                                           ①    +-------------------------+
                                                |    DubboProtocol        |
                                                +------------^------------+
                                                             |
                                                             |
                                                             |
          +-------------------------------+                  |
          |DecodeHandler                  |                  |
+--------->   +-------------------------+ |                  |
          |   |HeaderExchangeHandler    | |                  |
   ②      |   |  +--------------------+ | |                  |
          |   |  |  DubboProtocol$1   | | |                  |
 +--------+   |  |   requestHandler   | | |                  |
 |        |   |  +----------------------+ |      +-----------+------------+
 |        |   +---------------------------+      |  HeaderExchangeSerVer  +<--+
 |        +-------------------------------+      +------------------------+   |
 |                                                                            |
 |                                                                            |
 |                                                                            |
 |                                                                            |
 |                                                                            |
 |                                                                            |
 |         ChannelHandlers.wrapInternal                                       |
 |        +----------------------------------+                                |
 |        |MultiMessageHandler               +---------------------------+    |
 |        |   +----------------------------+ |                           |    |
 |    ③   |   |HeartbeatHandler            | |    +----------------------v----+-+
 |        |   |  +-----------------------+ | |    |  new NettySerVer(xxHandler) |
 |        |   |  |  Dispatcher.dispatch  | | |    +-----------------------------+
 |        |   |  |   AllChannelHandler   | | |
 |        |   |  +---------------------^-----+
 |        |   +------------------------------|
 |        +----------------------------------+
 |                                     |
 +-------------------------------------+

①是Protocol层。代码参见com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL) 287行。

private ExchangeServer createServer(URL url) {
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler); // 287行
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

②是Exchange层。代码参见com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(URL, ExchangeHandler)

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

③是Transport层。代码参见com.alibaba.dubbo.remoting.transport.netty.NettyServer.NettyServer(URL, ChannelHandler)

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                    .getAdaptiveExtension().dispatch(handler, url)));
}

整体概述

使用的地方在ChannelHandlers中(注意不是ChannelHandler):

// ChannelHandlers
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                    .getAdaptiveExtension().dispatch(handler, url)));
}

handler的多层叠加形式,依赖委托的玩法,即每次创建实例的时候将被委托的handler通过构造参数送进去传递给super的handler字段。

MultiMessageHandler –> HeartbeatHandler –> 通过扩展点拿到的Dispatcher的具体扩展,在根据这个Dispatcher实例dispatch出具体的handler实例。

每次handle时,按上面顺序进行。

MultiMessageHandler 负责如果是多消息时,挨个调用上层委托去handler

HeartbeatHandler 负责如果是心跳消息由他自己处理,否则调用上层委托去处理。并设置读写时间戳。读写时间戳对应Channel上的一对attr。

Dispatcher

Dispatcher的具体扩展有:

  1. AllDispatcher [默认实现] 对应AllChannelHandler实例,与io线程隔离。
  2. ConnectionOrderedDispatcher 对应ConnectionOrderedChannelHandler实例
  3. DirectDispatcher 将原handler直接返回,即不创建线程池
  4. ExecutionDispatcher 对应ExecutionChannelHandler实例
  5. MessageOnlyDispatcher 对应MessageOnlyChannelHandler

ChannelHandler

AllChannelHandler

整体逻辑比较简单:当有连接、断开、接收等事件时构造对应的ChannelEventRunnable实例丢进线程池执行。至于线程池的选用有点文章。这个地方也是netty的io线程跟dubbo server handler线程切换的地方。也是ChannelEventRunnable实例创建的地方(在我们业务方法调用栈上可以看到)。

对于dubbo server handler的线程池是否隔离的策略分析可以参见上文 业务侧共享线程池还是隔离线程池区别与实现分析。

ConnectionOrderedChannelHandler

连接和断开交给改handler定制的connectionExecutor来完成,其余的接收等事件与AllChannelhandler逻辑相同使用全局共享的或者实例级别的线程池。

connectionExecutor是用的LinkedBlockingQueue队列做的线程池,连接容量通过connect.queue.capacity配置项指定,队列满后打日志并抛异常。

ExecutionChannelHandler

这个handler是不走全局共享的线程,所有连接、断开、接收等事件全部使用实例级别的线程池。

MessageOnlyChannelHandler

只有接收事件走线程池,其他事件全部使用io线程。接收事件走线程池的策略与AllChannelHandler相同。