08.提供者如何注册服务 如何延迟暴露
provider的服务(机器ip、服务侦听端口等信息)如何让消费者找到,就是通过注册中心完成的。
+---------------+
| |
+----->Register Center<-----+
| | | |
| +---------------+ |
| |
| |
+---+----+ +----+---+
| | | |
|consumer| |provider|
| | | |
+--------+ +--------+
一些问题
- ServiceBean的onApplicationEvent对接spring容器refresh事件时调用了export方法,那么export方法里面干了什么呢?
- provider什么时候注册且怎么像注册中心注册?
- provider的server是什么时候启动并监听在相应端口上的?
- provider注册后在provider内部留下了怎样的数据结构?
下面会逐一解释。
注册与启动server
既然我们知道了provider需要向注册中心注册,provider也要启server侦听端口用来接收consumer的调用请求。那么我们先看下这个两个事情的调用堆栈吧。
向注册中心注册的调用堆栈
ZookeeperRegistry.doRegister(URL) line: 100
ZookeeperRegistry(FailbackRegistry).register(URL) line: 130
RegistryProtocol.export(Invoker<T>) line: 111
ProtocolFilterWrapper.export(Invoker<T>) line: 53
ProtocolListenerWrapper.export(Invoker<T>) line: 54
Protocol$Adpative.export(Invoker) line: not available
ServiceBean<T>(ServiceConfig<T>).doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) line: 485
ServiceBean<T>(ServiceConfig<T>).doExportUrls() line: 281
ServiceBean<T>(ServiceConfig<T>).doExport() line: 242
ServiceBean<T>(ServiceConfig<T>).export() line: 143
ServiceBean<T>.onApplicationEvent(ApplicationEvent) line: 109
ZookeeperRegistry.doRegister代码如下:
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
在我们调试时url的值如下:
dubbo://192.168.2.3:20880/com.code260.ss.dubbo.demov.facade.service.HelloService?anyhost=true&application=hello-world-app&connections=50&dubbo=2.0.0&interface=com.code260.ss.dubbo.demov.facade.service.HelloService&methods=sayHello&pid=56078&service.filter=tpslimiter&side=provider×tamp=1590823617615
启server侦听端口的调用堆栈
NettyServer.doOpen() line: 68
NettyServer(AbstractServer).<init>(URL, ChannelHandler) line: 67
NettyServer.<init>(URL, ChannelHandler) line: 63
NettyTransporter.bind(URL, ChannelHandler) line: 33
Transporter$Adpative.bind(URL, ChannelHandler) line: not available
Transporters.bind(URL, ChannelHandler...) line: 48
HeaderExchanger.bind(URL, ExchangeHandler) line: 41
Exchangers.bind(URL, ExchangeHandler) line: 63
DubboProtocol.createServer(URL) line: 287
DubboProtocol.openServer(URL) line: 266
DubboProtocol.export(Invoker<T>) line: 253
ProtocolFilterWrapper.export(Invoker<T>) line: 55
ProtocolListenerWrapper.export(Invoker<T>) line: 56
Protocol$Adpative.export(Invoker) line: not available
RegistryProtocol.doLocalExport(Invoker<T>) line: 153
RegistryProtocol.export(Invoker<T>) line: 107
ProtocolFilterWrapper.export(Invoker<T>) line: 53
ProtocolListenerWrapper.export(Invoker<T>) line: 54
Protocol$Adpative.export(Invoker) line: not available
ServiceBean<T>(ServiceConfig<T>).doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) line: 485
ServiceBean<T>(ServiceConfig<T>).doExportUrls() line: 281
ServiceBean<T>(ServiceConfig<T>).doExport() line: 242
ServiceBean<T>(ServiceConfig<T>).export() line: 143
ServiceBean<T>.onApplicationEvent(ApplicationEvent) line: 109
NettyServer.doOpen()的代码如下:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
getBindAddress()的返回值就是这个provider监听的地址和端口。
如果同一个地址和端口上有多个provider服务,那么显然server只会open一次。这个逻辑在哪里做的呢?在DubboProtocol.openServer这里,会判断server是否能取到,取不到才会createServer:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
ServiceBean的export导出
为什么要服务导出?
因为作者的设计意图是:
Protocol 负责export出Exporter,Exporter 里面 包了Invoker;Protocol 也负责refer出Invoker。
上述设计正好对应了服务被调用与服务启动暴露的两个行为。
注册协议服务导出时会做服务注册的事情。dubbo协议服务导出时会做server创建与开启的事情。
Protocol实例(Protocol$Adaptive)会根据不同的Invoker实例来做不同的导出逻辑处理,具体看下面分析。
关于服务导出的配置
服务导出分为:
- null(默认缺省配置,既暴露本地也暴露远程)
- none(不暴露)
- remote(远程暴露)
- local(本地暴露)
远程暴露时两种协议导出过程的衔接与过程
导出的两个步骤
远程暴露时服务导出分两种或者说两个步骤(在走注册中心的情况下):
- 注册协议导出(RegistryProtocol )。注册协议导出完成服务注册,定制unexport逻辑等。
- dubbo协议(DubboProtocol)导出(DubboExporter)。dubbo协议导出完成server的创建与开启。
两个过程的细节后面再描述,先看在哪衔接了这两个过程呢?要看明白衔接过程,先要看明白协议与导出逻辑的关系。
协议与导出的逻辑关系
整体原则是:Protocol实例根据不同的Invoker实例做export,可以参见Protocol接口的export方法签名
但是根据dubbo的扩展点机制,这是一个@Adaptive的接口。实际运行时会adaptive上如下几个类的实例:
注册协议发起export的点及调用栈
// ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig, List<URL>)
Exporter<?> exporter = protocol.export(invoker);
调用栈:
这个栈非常重要,表示了dubbo内部协议转换的过程
ServiceConfig.doExportUrlsFor1Protocol 485行 (Protocol$Adaptive)
ProtocolListenerWrapper
ProtocolFilterWrapper
RegistryProtocol
dubbo协议发起export的点及调用栈,两种协议导出的衔接点也在此,代码如下:
// com.alibaba.dubbo.registry.integration.RegistryProtocol.doLocalExport(Invoker<T>)
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));// RegistryProtocol 152行
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); // RegistryProtocol 153行(Protocol$Adaptive)
调用栈:
这个栈非常重要,表示了dubbo内部协议转换的过程
RegistryProtocol 153行(Protocol$Adaptive)
ProtocolListenerWrapper
ProtocolFilterWrapper
DubboProtocol
其中ProtocolListenerWrapper和ProtocolFilterWrapper会在代码中硬编码判断根据是否是注册协议来做不同的逻辑处理。
两种导出的衔接点
为啥说在此衔接两种协议,注意看上述代码中两个invoker对应的url的协议是:
这个originInvoker对应注册中心(registry)协议。
这个invokerDelegete对应dubbo协议。
正好对应了上面的根据不同的Invoker实例做export
RegistryProtocol 134行做了Invoker转换的动作,转换的逻辑来自于getProviderUrl方法,改方法会去取配置中的export字段,那么下面列出注册协议和dubbo协议对应的样例url。
注册协议url
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=hello-world-app&dubbo=2.0.0&export=dubbo://172.22.221.166:20880/org.simonme.dubbo.demo.provider.service.HelloService?anyhost=true&application=hello-world-app&dubbo=2.0.0&interface=org.simonme.dubbo.demo.provider.service.HelloService&loadbalance=leastactive&methods=sayHello&pid=7380&side=provider×tamp=1584343891343&pid=7380®istry=zookeeper×tamp=1584343891329
注意看上面有export=….这一段。
dubbo协议url
dubbo://172.22.221.166:20880/org.simonme.dubbo.demo.provider.service.HelloService?anyhost=true&application=hello-world-app&dubbo=2.0.0&interface=org.simonme.dubbo.demo.provider.service.HelloService&loadbalance=leastactive&methods=sayHello&pid=7380&side=provider×tamp=1584343891343
这就是上面注册协议中export的字段值…
那么再思考一个问题,这个export字段值是什么时候加上去的?看下面代码即可:
// com.alibaba.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) 483行
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
注册中心导出细节
导出器实例维护在哪里?
RegistryProtocol export出的Expoter实例维护在ServiceConfig.exporters中。
RegistryProtocol export出的Expoter实例是RegistryProtocol的Exporter实现的匿名内部类。 这个匿名内部类通过ExporterChangeableWrapper在doLocalExport中对接了ListenerExporterWrapper实例,ListenerExporterWrapper实例再对接了DubboProtocol的Exporter。这个地方的传递方式有点绕。
RegistryProtocol 在export的过程中已经完成DubboProtocol的export,即DubboExporter实例的构建。
RegistryProtocol export出的Expoter实例对应的invoker是JavassistProxyFactory$1,即AbstractProxyInvoker匿名内部类。该Invoker实例此时对应的协议是registry://…,可以观察其url字段得知。
详细的可以看下面这个调试信息表格:
| val$exporter | RegistryProtocol$ExporterChangeableWrapper |
| exporter | ListenerExporterWrapper |
| exporter | DubboExporter |
| invoker | ProtocolFilterWrapper$1 (id=143) |
| originInvoker | JavassistProxyFactory$1 (id=82) |
代码如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
dubbo协议导出细节
DubboExporter实例维护在DubboProtocol的exporterMap(声明在其父类AbstractProtocol中)。
DubboExporter 实例对应的invoker是ProtocolFilterWrapper$1,即Invoker的匿名内部类,也即对应包装了过滤器链的调用链。该Invoker实例此时对应的协议是dubbo://…,可以观察其url字段得知。
Dubbo协议导出涉及调用链的构建,server的创建与开启,具体细节可以参见《调用链如何构建的?ProtocolFilterWrapper等分析》和com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.export(Invoker
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.openServer(URL)
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL)
注意 dubbo协议对接的server 对接到 ExchangeServer这一层,并没有直接对接到Netty啥的。
延迟暴露
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
}
很简单,在ServiceBean的export中做了逻辑判断,发现是延迟暴露的,另起一个线程,并先sleep delay的时间在doExport。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!