08.提供者如何注册服务 如何延迟暴露

provider的服务(机器ip、服务侦听端口等信息)如何让消费者找到,就是通过注册中心完成的。

          +---------------+
          |               |
    +----->Register Center<-----+
    |     |               |     |
    |     +---------------+     |
    |                           |
    |                           |
+---+----+                 +----+---+
|        |                 |        |
|consumer|                 |provider|
|        |                 |        |
+--------+                 +--------+

一些问题

  1. ServiceBean的onApplicationEvent对接spring容器refresh事件时调用了export方法,那么export方法里面干了什么呢?
  2. provider什么时候注册且怎么像注册中心注册?
  3. provider的server是什么时候启动并监听在相应端口上的?
  4. provider注册后在provider内部留下了怎样的数据结构?

下面会逐一解释。

注册与启动server

既然我们知道了provider需要向注册中心注册,provider也要启server侦听端口用来接收consumer的调用请求。那么我们先看下这个两个事情的调用堆栈吧。

向注册中心注册的调用堆栈

ZookeeperRegistry.doRegister(URL) line: 100	
ZookeeperRegistry(FailbackRegistry).register(URL) line: 130	
RegistryProtocol.export(Invoker<T>) line: 111	
ProtocolFilterWrapper.export(Invoker<T>) line: 53	
ProtocolListenerWrapper.export(Invoker<T>) line: 54	
Protocol$Adpative.export(Invoker) line: not available	
ServiceBean<T>(ServiceConfig<T>).doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) line: 485	
ServiceBean<T>(ServiceConfig<T>).doExportUrls() line: 281	
ServiceBean<T>(ServiceConfig<T>).doExport() line: 242	
ServiceBean<T>(ServiceConfig<T>).export() line: 143	
ServiceBean<T>.onApplicationEvent(ApplicationEvent) line: 109

ZookeeperRegistry.doRegister代码如下:

protected void doRegister(URL url) {
    try {
    	zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

在我们调试时url的值如下:

dubbo://192.168.2.3:20880/com.code260.ss.dubbo.demov.facade.service.HelloService?anyhost=true&application=hello-world-app&connections=50&dubbo=2.0.0&interface=com.code260.ss.dubbo.demov.facade.service.HelloService&methods=sayHello&pid=56078&service.filter=tpslimiter&side=provider&timestamp=1590823617615

启server侦听端口的调用堆栈

NettyServer.doOpen() line: 68	
NettyServer(AbstractServer).<init>(URL, ChannelHandler) line: 67	
NettyServer.<init>(URL, ChannelHandler) line: 63	
NettyTransporter.bind(URL, ChannelHandler) line: 33	
Transporter$Adpative.bind(URL, ChannelHandler) line: not available	
Transporters.bind(URL, ChannelHandler...) line: 48	
HeaderExchanger.bind(URL, ExchangeHandler) line: 41	
Exchangers.bind(URL, ExchangeHandler) line: 63	
DubboProtocol.createServer(URL) line: 287	
DubboProtocol.openServer(URL) line: 266	
DubboProtocol.export(Invoker<T>) line: 253	
ProtocolFilterWrapper.export(Invoker<T>) line: 55	
ProtocolListenerWrapper.export(Invoker<T>) line: 56	
Protocol$Adpative.export(Invoker) line: not available	
RegistryProtocol.doLocalExport(Invoker<T>) line: 153	
RegistryProtocol.export(Invoker<T>) line: 107	
ProtocolFilterWrapper.export(Invoker<T>) line: 53	
ProtocolListenerWrapper.export(Invoker<T>) line: 54	
Protocol$Adpative.export(Invoker) line: not available	
ServiceBean<T>(ServiceConfig<T>).doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) line: 485	
ServiceBean<T>(ServiceConfig<T>).doExportUrls() line: 281	
ServiceBean<T>(ServiceConfig<T>).doExport() line: 242	
ServiceBean<T>(ServiceConfig<T>).export() line: 143	
ServiceBean<T>.onApplicationEvent(ApplicationEvent) line: 109

NettyServer.doOpen()的代码如下:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

getBindAddress()的返回值就是这个provider监听的地址和端口。
如果同一个地址和端口上有多个provider服务,那么显然server只会open一次。这个逻辑在哪里做的呢?在DubboProtocol.openServer这里,会判断server是否能取到,取不到才会createServer:

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
    	ExchangeServer server = serverMap.get(key);
    	if (server == null) {
    		serverMap.put(key, createServer(url));
    	} else {
    		//server支持reset,配合override功能使用
    		server.reset(url);
    	}
    }
}

ServiceBean的export导出

为什么要服务导出?

因为作者的设计意图是:

Protocol 负责export出Exporter,Exporter 里面 包了Invoker;Protocol 也负责refer出Invoker。

上述设计正好对应了服务被调用与服务启动暴露的两个行为。

注册协议服务导出时会做服务注册的事情。dubbo协议服务导出时会做server创建与开启的事情。

Protocol实例(Protocol$Adaptive)会根据不同的Invoker实例来做不同的导出逻辑处理,具体看下面分析。

关于服务导出的配置

服务导出分为:

  1. null(默认缺省配置,既暴露本地也暴露远程)
  2. none(不暴露)
  3. remote(远程暴露)
  4. local(本地暴露)

远程暴露时两种协议导出过程的衔接与过程

导出的两个步骤

远程暴露时服务导出分两种或者说两个步骤(在走注册中心的情况下):

  1. 注册协议导出(RegistryProtocol )。注册协议导出完成服务注册,定制unexport逻辑等。
  2. dubbo协议(DubboProtocol)导出(DubboExporter)。dubbo协议导出完成server的创建与开启

两个过程的细节后面再描述,先看在哪衔接了这两个过程呢?要看明白衔接过程,先要看明白协议与导出逻辑的关系。

协议与导出的逻辑关系

整体原则是:Protocol实例根据不同的Invoker实例做export,可以参见Protocol接口的export方法签名 Exporter export(Invoker invoker) throws RpcException;。

但是根据dubbo的扩展点机制,这是一个@Adaptive的接口。实际运行时会adaptive上如下几个类的实例:

注册协议发起export的点及调用栈

// ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig, List<URL>)

Exporter<?> exporter = protocol.export(invoker);

调用栈:

这个栈非常重要,表示了dubbo内部协议转换的过程

ServiceConfig.doExportUrlsFor1Protocol 485行 (Protocol$Adaptive)
    ProtocolListenerWrapper
        ProtocolFilterWrapper
            RegistryProtocol

dubbo协议发起export的点及调用栈,两种协议导出的衔接点也在此,代码如下:

// com.alibaba.dubbo.registry.integration.RegistryProtocol.doLocalExport(Invoker<T>)
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));// RegistryProtocol 152行
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); // RegistryProtocol 153行(Protocol$Adaptive)

调用栈:

这个栈非常重要,表示了dubbo内部协议转换的过程

RegistryProtocol 153行(Protocol$Adaptive)
     ProtocolListenerWrapper
        ProtocolFilterWrapper       
               DubboProtocol

其中ProtocolListenerWrapperProtocolFilterWrapper会在代码中硬编码判断根据是否是注册协议来做不同的逻辑处理。

两种导出的衔接点

为啥说在此衔接两种协议,注意看上述代码中两个invoker对应的url的协议是:

这个originInvoker对应注册中心(registry)协议

这个invokerDelegete对应dubbo协议

正好对应了上面的根据不同的Invoker实例做export

RegistryProtocol 134行做了Invoker转换的动作,转换的逻辑来自于getProviderUrl方法,改方法会去取配置中的export字段,那么下面列出注册协议和dubbo协议对应的样例url。

注册协议url

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=hello-world-app&dubbo=2.0.0&export=dubbo://172.22.221.166:20880/org.simonme.dubbo.demo.provider.service.HelloService?anyhost=true&application=hello-world-app&dubbo=2.0.0&interface=org.simonme.dubbo.demo.provider.service.HelloService&loadbalance=leastactive&methods=sayHello&pid=7380&side=provider&timestamp=1584343891343&pid=7380&registry=zookeeper&timestamp=1584343891329

注意看上面有export=….这一段

dubbo协议url

dubbo://172.22.221.166:20880/org.simonme.dubbo.demo.provider.service.HelloService?anyhost=true&application=hello-world-app&dubbo=2.0.0&interface=org.simonme.dubbo.demo.provider.service.HelloService&loadbalance=leastactive&methods=sayHello&pid=7380&side=provider&timestamp=1584343891343

这就是上面注册协议中export的字段值…

那么再思考一个问题,这个export字段值是什么时候加上去的?看下面代码即可:

// com.alibaba.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) 483行
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

注册中心导出细节

导出器实例维护在哪里?

RegistryProtocol export出的Expoter实例维护在ServiceConfig.exporters中。

RegistryProtocol export出的Expoter实例是RegistryProtocol的Exporter实现的匿名内部类。 这个匿名内部类通过ExporterChangeableWrapper在doLocalExport中对接了ListenerExporterWrapper实例,ListenerExporterWrapper实例再对接了DubboProtocol的Exporter。这个地方的传递方式有点绕。

RegistryProtocol 在export的过程中已经完成DubboProtocol的export,即DubboExporter实例的构建。

RegistryProtocol export出的Expoter实例对应的invoker是JavassistProxyFactory$1,即AbstractProxyInvoker匿名内部类。该Invoker实例此时对应的协议是registry://…,可以观察其url字段得知。

详细的可以看下面这个调试信息表格:

val$exporter    RegistryProtocol$ExporterChangeableWrapper (id=112)    
    exporter    ListenerExporterWrapper (id=126)        
        exporter    DubboExporter (id=131)        
            invoker    ProtocolFilterWrapper$1 (id=143)        
    originInvoker    JavassistProxyFactory$1 (id=82)        

代码如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    registry.register(registedProviderUrl);
    // 订阅override数据
    // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保证每次export都返回一个新的exporter实例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
        	try {
        		exporter.unexport();
        	} catch (Throwable t) {
            	logger.warn(t.getMessage(), t);
            }
            try {
            	registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
            	logger.warn(t.getMessage(), t);
            }
            try {
            	overrideListeners.remove(overrideSubscribeUrl);
            	registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
            	logger.warn(t.getMessage(), t);
            }
        }
    };
}

dubbo协议导出细节

DubboExporter实例维护在DubboProtocol的exporterMap(声明在其父类AbstractProtocol中)

DubboExporter 实例对应的invoker是ProtocolFilterWrapper$1,即Invoker的匿名内部类,也即对应包装了过滤器链的调用链。该Invoker实例此时对应的协议是dubbo://…,可以观察其url字段得知。

Dubbo协议导出涉及调用链的构建,server的创建与开启,具体细节可以参见《调用链如何构建的?ProtocolFilterWrapper等分析》和com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.export(Invoker)

com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.openServer(URL)

com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL)

注意 dubbo协议对接的server 对接到 ExchangeServer这一层,并没有直接对接到Netty啥的。

延迟暴露

if (delay != null && delay > 0) {
           Thread thread = new Thread(new Runnable() {
               public void run() {
                   try {
                       Thread.sleep(delay);
                   } catch (Throwable e) {
                   }
                   doExport();
               }
           });
           thread.setDaemon(true);
           thread.setName("DelayExportServiceThread");
           thread.start();
       }

很简单,在ServiceBean的export中做了逻辑判断,发现是延迟暴露的,另起一个线程,并先sleep delay的时间在doExport。