前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo源码学习-服务发布

Dubbo源码学习-服务发布

作者头像
周同学
发布2020-03-20 11:07:50
9010
发布2020-03-20 11:07:50
举报
文章被收录于专栏:一块自留地

一、自己实现的思路

  • 解析配置文件
  • netty通信
  • 序列化、反序列化
  • 服务地址注册到 注册中心

二、如何解析spring的配置文件

我们一般会把服务的信息放在spring的配置文件中,供dubbo解析调用。那么这些配置文件是怎么起作用的呢?

2.1、首先定义spring能识别的自定义标签
  • spring提供NamespaceHandlerSupportBeanDefinitionParser这两个接口,给我们去扩展,可以实现自定义的标签
  • 通过.xsd文件声明自定义标签和属性
2.2、dubbo去解析自定义标签

接下来我们来看看dubbo是怎么解析它自定义标签的:

代码语言:javascript
复制
//DubboNamespaceHandler.java
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}
复制代码

可以看到该类继承了spring的NamespaceHandlerSupport,每个标签通过dubbo的解析类去解析,然后再把解析出来的数据放入各自的配置类中。 细心的同学应该发现了,其他的标签都是Config,但是service和reference的却是Bean,这是因为他们在解析完具体配置后,需要调用它们对应的方法进行初始化

如标签"service"会被DubboBeanDefinitionParser解析,然后把数据填充到ServiceBean中。

三、发布流程

ServiceBean中,有众多实现类,其中有ApplicationListener接口,根据spring的监听器,来触发服务发布。如下:

代码语言:javascript
复制
//ServiceBean.java

//监听 spring上下文刷新或者加载的时候
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            //服务发布
            export();
        }
    }
复制代码

接着追踪:

代码语言:javascript
复制
//ServiceBean.java

@Override
    public void export() {
        //调用父类方法
        super.export();
        // Publish ServiceBeanExportedEvent
        //发布监听事件
        publishExportEvent();
    }
复制代码
代码语言:javascript
复制
//ServiceConfig.java
public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        //是否发布
        if (export != null && !export) {
            return;
        }
        
        //是否延迟
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
复制代码

继续看doExport()方法

代码语言:javascript
复制
//ServiceConfig.java
protected synchronized void doExport() {
    ...
    checkApplication();
    checkRegistry();
    checkProtocol();
    appendProperties(this);
    checkStubAndMock(interfaceClass);
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    doExportUrls();
}
复制代码

继续看doExportUrls()方法

代码语言:javascript
复制
//ServiceConfig.java
 private void doExportUrls() {
        //加载注册中心,并且生成URL地址
        //例如:regitry://192.168.1.1:2080/org.apache.dubbo.registry.RegistryService/...
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //服务发布
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
复制代码

接着追踪doExportUrlsFor1Protocol()

代码语言:javascript
复制
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
        
        //1、生成map,填充参数
        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        
        ...省略
        
        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }

        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
        //2、从protocol标签中获取服务的host
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        //3、从protocol标签中获取服务的端口
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        //4、把map转为URL
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        //具体服务发布
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                        //将监控中心的 URL 作为 "monitor" 参数添加到服务提供者的 URL 中
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }
                        //invoker -> 代理类
                        //将服务提供者的 URL 作为 "export" 参数添加到注册中心的 URL 中
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }
复制代码

这段代码很长很多,不要慌,我们先看重点内容,大致知道每段代码的作用,上面的代码可以大致分为以下几个步骤:

  • 生成map,填充参数
  • 从protocol标签中获取服务的host
  • 从protocol标签中获取服务的port
  • 生成服务的URL
  • 将服务提供者的 URL 作为 "export" 参数添加到注册中心的 URL 中
  • 调用Protocol#export(invoker)方法
3.1、 第1步,是把服务发布相关的参数全部填充到map中,如下图:
3.2、第2步,获取host和port,最后把map转为URL,如下图:
3.3、第3步,将服务提供者的 URL 作为 "export" 参数添加到注册中心的 URL 中。
3.4、第4步,调用Protocol#export(invoker)方法。

此时 Dubbo SPI 自适应的特性的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,invoker 传入后,根据 invoker.url 自动获得对应 Protocol拓展实现为DubboProtocol那么具体是怎么实现的呢?

首先,通过Protocol$Adapter类中的export方法,如下图:

我们这里传入的url是registry://,所以这里会得到一个RegistryProtocol对象。 然后,RegistryProtocol调用export方法,我们来看下代码:

代码语言:javascript
复制
//RegistryProtocol.java

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        //1、暴露服务,本质上就是启动一个neety服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        //获取注册中心url registry:// -> zookeeper://
        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        //创建注册中心对象,与注册中心创建TCP连接。
        final Registry registry = getRegistry(originInvoker);
        // 获得服务提供者 URL dubbo://
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        boolean register = registeredProviderUrl.getParameter("register", true);

        // 向本地注册表,注册服务提供者
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        //2、 向注册中心注册服务提供者(自己)
        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // 订阅configurators节点,监听服务动态属性变更事件。
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }

复制代码

这里的代码我们也主要看大概功能,可以分成两部分:

  • 暴露服务,启动netty
  • 向注册中心注册服务提供者
3.4.1、我们先来看第一部分,暴露服务,doLocalExport点进去看:
代码语言:javascript
复制
//RegistryProtocol.java

@SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        // 获得在 `bounds` 中的缓存 Key
        String key = getCacheKey(originInvoker);
        //从 `bounds` 获取exporter
        //这里的`bounds`是一个map缓存,避免暴露过的服务重复暴露。
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                // 没有暴露过,则进行暴露服务
                if (exporter == null) {
                // 创建 Invoker Delegate 对象,继续对invoker进行包装
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    //Protocol$Adapter.export() -> DubbpProtocol.export()
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
复制代码

这里的逻辑很简单,首先判断是否暴露过,没有则进行暴露,这里的protocol还是Protocol$Adapter,调用的是Protocol$Adapter.export(),代码如下:

这里的extension是做了许多层包装的,最后在ProtocolFilterWrapper中调用了DubboProtocol.export(),如下图:

为什么会这样呢?因为originInvoker里面是服务提供者的url,我们这里是dubbo协议,所以会自适应成DubboProtocol

接下来看最重要的DubboProtocol.export()代码:

代码语言:javascript
复制
//DubboProtocol.java

/**
 * 服务器集合
 *
 * key: 服务器地址。格式为:host:port
 */
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>();

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        //从url中组装服务器地址 例如 120.0.0.1:8080
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //存入集合
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        //开启一个服务,暴露20880端口
        openServer(url);
        //优化序列化
        optimizeSerialization(url);
        return exporter;
    }
复制代码

接下来看具体启动服务的代码openServer(url)

代码语言:javascript
复制
//DubboProtocol.java

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        //判断是否是服务端
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //启动服务
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
复制代码

继续跟踪createServer(url):

代码语言:javascript
复制
//DubboProtocol.java

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        //默认开启心跳检测
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        //选择使用什么方式启动服务,netty3? netty4? mina?  默认是netty4
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        // 设置编解码器为 `"Dubbo"`
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            // 启动服务
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
复制代码

Exchanger是数据交换层的门面类,里面抽象了2个方法:

代码语言:javascript
复制
@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    /**
     * bind.
     * 绑定一个服务器
     * @param url
     * @param handler
     * @return message server
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    /**
     * connect.
     * 连接一个服务器
     * @param url
     * @param handler
     * @return message channel
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
复制代码

server = Exchangers.bind(url, requestHandler);这个方法最终会调用到netty的方法。

代码语言:javascript
复制
//NettyServer.java

protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // 启动netty服务 
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
复制代码

可以看的,最终是调用了netty的方法,构建了Socket连接。

总结一下dubbo服务发布的主要流程:

  • 解析spring配置文件
  • 组装url
  • 构建一个Invoker
  • RegistryProtocol.export()
  • DubboProtpcol.export()
  • 启动一个nettyServer
  • 把服务提供这注册到注册中心
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020年03月02日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、自己实现的思路
  • 二、如何解析spring的配置文件
    • 2.1、首先定义spring能识别的自定义标签
      • 2.2、dubbo去解析自定义标签
      • 三、发布流程
        • 3.1、 第1步,是把服务发布相关的参数全部填充到map中,如下图:
          • 3.2、第2步,获取host和port,最后把map转为URL,如下图:
            • 3.3、第3步,将服务提供者的 URL 作为 "export" 参数添加到注册中心的 URL 中。
              • 3.4、第4步,调用Protocol#export(invoker)方法。
                • 3.4.1、我们先来看第一部分,暴露服务,doLocalExport点进去看:
            相关产品与服务
            微服务引擎 TSE
            微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档