先看下consumer端发起调用时的链路流程:
+---------------------------+ +---------------------------+ +---------------------------+
| helloService | | proxy | | InvokerInvocationHandler |
| sayHello +----------> | sayHello +----------> | invoke |
| | | | | proxy method args |
+---------------------------+ +---------------------------+ +-------------+-------------+
|
|
+---------------------------------+
| | |
| +------------v--------------+ |
| | MockClusterInvoker | |
| | invoke | |
| | | |
| +------------+--------------+ |
| | |
| | |
| | |
+---------------------------+ +---------------------------+ | +------------v--------------+ |
| Router | | RegistryDirectory | | | FailoverClusterInvoker | |
| route | <----------+ list | <-----------+ invoke | |
| MockInVokersSelector | | INVOCATION-->List INVOKER | | | | |
+------------+--------------+ +---------------------------+ | +---------------------------+ |
| | |
| +---------------------------------+
| cluster invoke,分布式调用容错机制也是在这做
|
|
|
|
|
+-------------v-------------+ +---------------------------+ +---------------------------+
| RandomLoadBalance | |InvokerDelegate | | ListenerInvokerWrap |
| select +-----------> |invoke +-----------> | invoke |
| List INVOKER-->INVOKER | | | | |
+---------------------------+ +---------------------------+ +---------------------------+
#1. 引入zookeeper作为注册中心后,服务查找过程
从建立spring到netty client建立连接的调用栈:
NettyClient.doOpen() line: 66
NettyClient(AbstractClient).
NettyClient.
NettyTransporter.connect(URL, ChannelHandler) line: 37
Transporter$Adpative.connect(URL, ChannelHandler) line: not available
Transporters.connect(URL, ChannelHandler…) line: 67
HeaderExchanger.connect(URL, ExchangeHandler) line: 37
Exchangers.connect(URL, ExchangeHandler) line: 102
DubboProtocol.initClient(URL) line: 378
DubboProtocol.getSharedClient(URL) line: 344
DubboProtocol.getClients(URL) line: 321
DubboProtocol.refer(Class
ProtocolListenerWrapper.refer(Class
ProtocolFilterWrapper.refer(Class
Protocol$Adpative.refer(Class, URL) line: not available
RegistryDirectory
RegistryDirectory
RegistryDirectory
ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List
ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List
ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List
ZookeeperRegistry.doSubscribe(URL, NotifyListener) line: 170
ZookeeperRegistry(FailbackRegistry).subscribe(URL, NotifyListener) line: 189
RegistryDirectory
RegistryProtocol.doRefer(Cluster, Registry, Class
RegistryProtocol.refer(Class
ProtocolListenerWrapper.refer(Class
ProtocolFilterWrapper.refer(Class
Protocol$Adpative.refer(Class, URL) line: not available
ReferenceBean
ReferenceBean
ReferenceBean
ReferenceBean
DefaultListableBeanFactory(FactoryBeanRegistrySupport).doGetObjectFromFactoryBean(FactoryBean, String, boolean) line: 142
整体来说: 先由注册中心的协议处理器处理注册中心的地址,找到所有provider的地址,创建所有invoker,然后再由invoker在真正调用时发起调用。
注册中心的这个也抽象一种协议,由注册中心结合提供者的协议推导出提供者的协议地址,也就是目标端的地址与端口得知了。
每一个接口在zookeeper上都有节点,节点下面是provider,再下面是所有provider的具体地址。
#2. netty client的基本步骤
创建channelPipelineFactory,并在这个factory中返回加工好的ChannelPipeline,加工过程包括加入编解码器,连接事件处理组成的netty handler实现
(包括连接建立,断开,请求写出去,)
writeRequested(netty的)–>调用编码器(编码的这个对象中包括了 需要调用的目标接口名 方法名等信息)
(继承SimpleChannelHandler重写逻辑,可以定制channel的读取与写出逻辑)
#3. 在使用zookeeper作为注册中心时,如果有provider服务停掉, consumer端如何感知?再次启动刚停掉的provider呢?
provider停掉会触发zk客户端的监听,监听对客户端的invoker列表进行刷新。
再次启动会触发 zk的监听,代码在ZkclientZookeeperClient
public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
return new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
listener.childChanged(parentPath, currentChilds);
}
};
}
然后再触发 com.alibaba.dubbo.registry.support.FailbackRegistry.doNotify(URL, NotifyListener, List
com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(List
如果有provider停掉了 走一样的监听逻辑
同时,dubbo支持 定时检查provider的状态并进行重连,具体参见
com.alibaba.dubbo.remoting.transport.AbstractClient.initConnectStatusCheckCommand()
reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
#4. 如果正在发服务的时候,provider停掉了,dubbo是如何处理的?
如果在发服务时,provider停掉了,那么此时会抛出异常,并在FailoverClusterInvoker doInvoke中捕获,
FailoverClusterInvoker支持调用失败时重试(可配置),此时达到再次重试的目的。
#5. client在多次调用时,与provider端的连接是建立几次,在prodvider端服务状态有变化时呢?
NettyClient 的doOpen doConnect均在初始化的时候调用,有几个provider就调用几次,真正rpc调用服务的时候是不会再调用open与connect的。
上面这个说法不严格,因为看他发送消息的代码就知道了,每次发消息时还会检查下:
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()){
connect();
}
Channel channel = getChannel();
//TODO getChannel返回的状态是否包含null需要改进
if (channel == null || ! channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
#6. 对于多个provider,dubbo默认在哪里选择了一个invoker进行调用的
com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.select(LoadBalance, Invocation, List<Invoker
com.alibaba.dubbo.rpc.filter.EchoFilter@1fd14d74
com.alibaba.dubbo.rpc.filter.ClassLoaderFilter@563e4951
com.alibaba.dubbo.rpc.filter.GenericFilter@4066c471
com.alibaba.dubbo.rpc.filter.ContextFilter@2b175c00
com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter@3eb81efb
com.alibaba.dubbo.rpc.filter.TimeoutFilter@1ae8bcbc
com.alibaba.dubbo.monitor.support.MonitorFilter@6cdba6dc
com.alibaba.dubbo.rpc.filter.ExceptionFilter@2609b277
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!