"main@1" prio=5 tid=0x1 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.dubbo.remoting.transport.netty4.NettyServer.doOpen(NettyServer.java:97) at org.apache.dubbo.remoting.transport.AbstractServer.<init>(AbstractServer.java:63) at org.apache.dubbo.remoting.transport.netty4.NettyServer.<init>(NettyServer.java:65) at org.apache.dubbo.remoting.transport.netty4.NettyTransporter.bind(NettyTransporter.java:32) at org.apache.dubbo.remoting.Transporter$Adaptive.bind(Transporter$Adaptive.java:-1) at org.apache.dubbo.remoting.Transporters.bind(Transporters.java:56) at org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(HeaderExchanger.java:44) at org.apache.dubbo.remoting.exchange.Exchangers.bind(Exchangers.java:70) at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(DubboProtocol.java:306) at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.openServer(DubboProtocol.java:283) - locked <0x9ee> (a org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol) at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.export(DubboProtocol.java:267) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:57) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:63) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:100) at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1) at org.apache.dubbo.registry.integration.RegistryProtocol.doLocalExport(RegistryProtocol.java:170) - locked <0x9e2> (a java.util.concurrent.ConcurrentHashMap) at org.apache.dubbo.registry.integration.RegistryProtocol.export(RegistryProtocol.java:133) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:55) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:61) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:98) at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1) at org.apache.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ServiceConfig.java:513) at org.apache.dubbo.config.ServiceConfig.doExportUrls(ServiceConfig.java:358) at org.apache.dubbo.config.ServiceConfig.doExport(ServiceConfig.java:317) - locked <0x848> (a org.apache.dubbo.config.spring.ServiceBean) at org.apache.dubbo.config.ServiceConfig.export(ServiceConfig.java:216) at org.apache.dubbo.config.spring.ServiceBean.onApplicationEvent(ServiceBean.java:123) at org.apache.dubbo.config.spring.ServiceBean.onApplicationEvent(ServiceBean.java:49) at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393) at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:347) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:883) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) - locked <0xab7> (a java.lang.Object) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83) at cn.shuaijunlan.dubbo.learning.main.Main.main(Main.java:13)
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote) if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { //本地暴露(分析1) exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) //如果配置不是local则暴露为远程服务 if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { //多个注册中心 for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); }
// For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } //根据Java SPI机制得到ProxyFactory$Adaptive类的实例proxyFactory Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //获取包装类?? DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //调用Protocol$Adaptive的export()方法 Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { //没有注册中心,只在本机IP打开服务端口生成服务代理,并不注册到注册中心 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export //保证每次export都返回一个新的exporter实例 returnnew DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
因此有如下的调用栈:
1 2 3 4 5
at org.apache.dubbo.registry.integration.RegistryProtocol.export(RegistryProtocol.java:133) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:55) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:61) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:98) at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1)
//export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据url开启一个服务,比如绑定端口,开始接受请求 openServer(url); optimizeSerialization(url); return exporter; }
到这里就可以对应下面的调用栈:
1 2 3 4 5 6
at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.export(DubboProtocol.java:267) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:57) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:63) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:100) at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1) at org.apache.dubbo.registry.integration.RegistryProtocol.doLocalExport(RegistryProtocol.java:170)
privatevoidopenServer(URL url){ // find server. //key=host:port 用于定位server String key = url.getAddress(); //client can export a service which's only for server to invoke //client也可以暴露一个只有server可以调用的服务 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { //服务实例放到serverMap中,key是host:port //这里serverMap也是单例的 ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //通过createServer(url)方法获取server serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override //server支持reset,配合override使用 server.reset(url); } } }
@Override protectedvoiddoOpen()throws Throwable { bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels();
at org.apache.dubbo.remoting.transport.netty4.NettyServer.doOpen(NettyServer.java:97) at org.apache.dubbo.remoting.transport.AbstractServer.<init>(AbstractServer.java:63) at org.apache.dubbo.remoting.transport.netty4.NettyServer.<init>(NettyServer.java:65) at org.apache.dubbo.remoting.transport.netty4.NettyTransporter.bind(NettyTransporter.java:32) at org.apache.dubbo.remoting.Transporter$Adaptive.bind(Transporter$Adaptive.java:-1) at org.apache.dubbo.remoting.Transporters.bind(Transporters.java:56) at org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(HeaderExchanger.java:44) at org.apache.dubbo.remoting.exchange.Exchangers.bind(Exchangers.java:70) at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(DubboProtocol.java:306) at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.openServer(DubboProtocol.java:283)
if (register) { //注册服务从这里开始 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); }
// Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export returnnew DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
@Override publicvoidregister(URL url){ super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a registration request to the server side doRegister(url); } catch (Exception e) { Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } thrownew IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); }
// Record a failed registration request to a failed list, retry regularly failedRegistered.add(url); } }
"main@1" prio=5 tid=0x1 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister(ZookeeperRegistry.java:114) at org.apache.dubbo.registry.support.FailbackRegistry.register(FailbackRegistry.java:137) at org.apache.dubbo.registry.integration.RegistryProtocol.register(RegistryProtocol.java:127) at org.apache.dubbo.registry.integration.RegistryProtocol.export(RegistryProtocol.java:147) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:55) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:61) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:98) at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1) at org.apache.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ServiceConfig.java:513) at org.apache.dubbo.config.ServiceConfig.doExportUrls(ServiceConfig.java:358) at org.apache.dubbo.config.ServiceConfig.doExport(ServiceConfig.java:317) - locked <0x9bb> (a org.apache.dubbo.config.spring.ServiceBean) at org.apache.dubbo.config.ServiceConfig.export(ServiceConfig.java:216) at org.apache.dubbo.config.spring.ServiceBean.onApplicationEvent(ServiceBean.java:123) at org.apache.dubbo.config.spring.ServiceBean.onApplicationEvent(ServiceBean.java:49)