23.IO线程与业务线程的合与分,以及提供的策略选择实现分析
io线程与业务线程合在一个线程上,还是把业务线程拆成单独的线程(线程池)去处理,一般来讲,取决于业务线程是否比较耗时。
如果业务线程比较耗时,就可以拆成单独的线程池处理,这样可以提升io吞吐。
如果业务线程不怎么耗时,处理非常快,那么就可以不拆成单独线程,因为这样可以降低上下文切换cs带来的开销。
这种方式在多个中间件都会有涉及。
dubbo中给予了多种选择,由使用方自行决定配置。默认是与io线程分开。
下面我们将分析dubbo中业务线程池的处理及对ChannelHandler与Dispatcher的处理。
我们先分析下HeaderExchangeXXX与上下层的交互。
HeaderExchangeXXX与上下层的交互示意图
① +-------------------------+
| DubboProtocol |
+------------^------------+
|
|
|
+-------------------------------+ |
|DecodeHandler | |
+---------> +-------------------------+ | |
| |HeaderExchangeHandler | | |
② | | +--------------------+ | | |
| | | DubboProtocol$1 | | | |
+--------+ | | requestHandler | | | |
| | | +----------------------+ | +-----------+------------+
| | +---------------------------+ | HeaderExchangeSerVer +<--+
| +-------------------------------+ +------------------------+ |
| |
| |
| |
| |
| |
| |
| ChannelHandlers.wrapInternal |
| +----------------------------------+ |
| |MultiMessageHandler +---------------------------+ |
| | +----------------------------+ | | |
| ③ | |HeartbeatHandler | | +----------------------v----+-+
| | | +-----------------------+ | | | new NettySerVer(xxHandler) |
| | | | Dispatcher.dispatch | | | +-----------------------------+
| | | | AllChannelHandler | | |
| | | +---------------------^-----+
| | +------------------------------|
| +----------------------------------+
| |
+-------------------------------------+
①是Protocol层。代码参见com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL) 287行。
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // 287行
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
②是Exchange层。代码参见com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(URL, ExchangeHandler)
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
③是Transport层。代码参见com.alibaba.dubbo.remoting.transport.netty.NettyServer.NettyServer(URL, ChannelHandler)
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
整体概述
使用的地方在ChannelHandlers中(注意不是ChannelHandler):
// ChannelHandlers
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
handler的多层叠加形式,依赖委托的玩法,即每次创建实例的时候将被委托的handler通过构造参数送进去传递给super的handler字段。
MultiMessageHandler –> HeartbeatHandler –> 通过扩展点拿到的Dispatcher的具体扩展,在根据这个Dispatcher实例dispatch出具体的handler实例。
每次handle时,按上面顺序进行。
MultiMessageHandler 负责如果是多消息时,挨个调用上层委托去handler
HeartbeatHandler 负责如果是心跳消息由他自己处理,否则调用上层委托去处理。并设置读写时间戳。读写时间戳对应Channel上的一对attr。
Dispatcher
Dispatcher的具体扩展有:
- AllDispatcher [默认实现] 对应AllChannelHandler实例,与io线程隔离。
- ConnectionOrderedDispatcher 对应ConnectionOrderedChannelHandler实例
- DirectDispatcher 将原handler直接返回,即不创建线程池
- ExecutionDispatcher 对应ExecutionChannelHandler实例
- MessageOnlyDispatcher 对应MessageOnlyChannelHandler
ChannelHandler
AllChannelHandler
整体逻辑比较简单:当有连接、断开、接收等事件时构造对应的ChannelEventRunnable实例丢进线程池执行。至于线程池的选用有点文章。这个地方也是netty的io线程跟dubbo server handler线程切换的地方。也是ChannelEventRunnable实例创建的地方(在我们业务方法调用栈上可以看到)。
对于dubbo server handler的线程池是否隔离的策略分析可以参见上文 业务侧共享线程池还是隔离线程池区别与实现分析。
ConnectionOrderedChannelHandler
连接和断开交给改handler定制的connectionExecutor来完成,其余的接收等事件与AllChannelhandler逻辑相同使用全局共享的或者实例级别的线程池。
connectionExecutor是用的LinkedBlockingQueue队列做的线程池,连接容量通过connect.queue.capacity
配置项指定,队列满后打日志并抛异常。
ExecutionChannelHandler
这个handler是不走全局共享的线程,所有连接、断开、接收等事件全部使用实例级别的线程池。
MessageOnlyChannelHandler
只有接收事件走线程池,其他事件全部使用io线程。接收事件走线程池的策略与AllChannelHandler相同。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!