15.多个provider如何在客户端体现,即集群如何实现

多provider的test case

为了分析多个provider,我们在demo中再加一个node,代码如下:

package com.code260.ss.dubbo.demov.server.registercenter.zookeeper;

import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/com/code260/ss/dubbo/demov/server/conf/registercenter/zookeeper/applicationContextNode2.xml")
public class HelloServiceTestNode2
{

    @Test
    public void testSayHello()
    {
        try
        {
            System.out.println("zookeeper注册中心demo:I am node2:-)");
            System.in.read();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

applicationContextNode2.xml中关键配置如下:

<import resource="classpath:com/code260/ss/dubbo/demov/server/conf/registercenter/zookeeper/applicationContext.xml" />


<dubbo:registry protocol="zookeeper" address="${zookeeper_rigister_center}" />
<dubbo:monitor protocol="registry" />
   <dubbo:protocol name="dubbo" port="20890"/>

监听在20890端口

我们跑一下client demo侧的HelloClientTest。

消费端发现多个provider

回顾消费者如何发现服务一文中

注册协议到Invoker创建的堆栈:

DubboInvoker<T>.<init>(Class<T>, URL, ExchangeClient[], Set<Invoker<?>>) line: 62	
DubboProtocol.refer(Class<T>, URL) line: 303	
ProtocolFilterWrapper.refer(Class<T>, URL) line: 62	
ProtocolListenerWrapper.refer(Class<T>, URL) line: 65	
Protocol$Adpative.refer(Class, URL) line: not available	
RegistryDirectory<T>.toInvokers(List<URL>) line: 395	
RegistryDirectory<T>.refreshInvoker(List<URL>) line: 224	
RegistryDirectory<T>.notify(List<URL>) line: 195	
ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>) line: 449	
ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List<URL>) line: 273	
ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List<URL>) line: 259	
ZookeeperRegistry.doSubscribe(URL, NotifyListener) line: 170

ZookeeperRegistry.doSubscribe的部分代码:

164                    zkClient.create(path, false);
165                    List<String> children = zkClient.addChildListener(path, zkListener);
166                    if (children != null) {
167                    	urls.addAll(toUrlsWithEmpty(url, path, children));
168                    }
169                }
170               notify(url, listener, urls);

当有多个provider时,165行能查到多个children,表示获取到了多个provider,然后在RegistryDirectory.toInvokers(List) line: 395 会转换成多个invoker。当然,这种描述的是先启动了多个provider,再启动消费者的case。对于那种后启动加进来的provider,消费者也是能感知到的。

选择一个Invoker进行调用并封装了集群语义

既然能在消费端发现了多个Invoker,那么在调用时就要选择一个。在哪里选择的呢?代码如下:

FailoverClusterInvoker.doInvoke(Invocation, List<Invoker>, LoadBalance) line: 78

53    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	// ......
73           Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
74            invoked.add(invoker);
75            RpcContext.getContext().setInvokers((List)invoked);
76            try {
77                Result result = invoker.invoke(invocation);

此时的入参invokers示例值是:[com.alibaba.dubbo.registry.integration.RegistryDirectory$InvokerDelegete@c65a5ef, com.alibaba.dubbo.registry.integration.RegistryDirectory$InvokerDelegete@6b5176f2]

也就是会将发现的invoker列表传进来,然后选一个Invoker(73行)进行调用(77行)。

FailoverClusterInvoker这个类名已经发现,此处封装了集群语义,也实现了容错。容错具体细节在后面文章阐述。此处先描述一路到底的调用过程。当然FailoverClusterInvoker是默认的实现选择,还有其他策略,诸如:FailfastClusterInvoker,FailbackClusterInvoker等实现。

相关调用堆栈如下:

FailoverClusterInvoker<T>.doInvoke(Invocation, List<Invoker<T>>, LoadBalance) line: 78	
FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).invoke(Invocation) line: 227	
MockClusterInvoker<T>.invoke(Invocation) line: 72	
InvokerInvocationHandler.invoke(Object, Method, Object[]) line: 52	
proxy0.sayHello(String) line: not available	
HelloClientTest.testSayHello() line: 37

proxy0就是前面 消费者consumer侧 reference bean生成逻辑 文章中讲到的生成的代理。