先看下Dubbo源码中demo启动类
@SpringBootApplication
@EnableDubbo(scanBasePackages = {"org.apache.dubbo.springboot.demo.provider"})
public class ProviderApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(ProviderApplication.class, args);
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
通过@EnableDubbo注解启动,该注解整合了@EnableDubboConfig和@DubboCompoentScan,这两个注解分别import了DubboConfigConfigurationRegistrar和DubboComponentScanRegistrar
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {......}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {......}
DubboConfigConfigurationRegistrar和DubboComponentScanRegistrar都是在BeanDefinitionRegistry加载BeanDefinition的过程中向BeanDefinitionRegistry注册自定义的BeanDefinition,代码如下,都调用了DubboSpringInitializer.initialize方法初始化DubboSpringInitContext(如果存在就不再初始化),而DubboComponentScanRegistrar多了一个registerServiceAnnotationPostProcessor步骤,向BeanDefinitionRegistry中注册了一个ServiceAnnotationPostProcessor
public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// initialize dubbo beans
DubboSpringInitializer.initialize(registry);BeanDefinitionRegistry
}
}
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// initialize dubbo beans
DubboSpringInitializer.initialize(registry);
Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
registerServiceAnnotationPostProcessor(packagesToScan, registry);
}
}
DubboSpringInitializer.initialize最终调用DubboBeanUtils.registerCommonBeans注册一系列Dubbo的基础服务类,如下
static void registerCommonBeans(BeanDefinitionRegistry registry) {
registerInfrastructureBean(registry, ServicePackagesHolder.BEAN_NAME, ServicePackagesHolder.class);
registerInfrastructureBean(registry, ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);
// Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);
// TODO Whether DubboConfigAliasPostProcessor can be removed ?
// Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
DubboConfigAliasPostProcessor.class);
// register ApplicationListeners
registerInfrastructureBean(registry, DubboDeployApplicationListener.class.getName(), DubboDeployApplicationListener.class);
registerInfrastructureBean(registry, DubboConfigApplicationListener.class.getName(), DubboConfigApplicationListener.class);
// Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
DubboConfigDefaultPropertyValueBeanPostProcessor.class);
// Dubbo config initializer
registerInfrastructureBean(registry, DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
// register infra bean if not exists later
registerInfrastructureBean(registry, DubboInfraBeanRegisterPostProcessor.BEAN_NAME, DubboInfraBeanRegisterPostProcessor.class);
}
这里主要先提下这两个类,后面详细分析
ReferenceAnnotationBeanPostProcessor:处理服务消费者Processor
DubboDeployApplicationListener:Provider服务发布的Listener
ServiceAnnotationPostProcessor的主要作用就是扫描@EnableDubbo中配置的scanBasePackages路径下需要暴露Dubbo服务的Service,ServiceAnnotationPostProcessor实现了BeanDefinitionRegistryPostProcessor接口,BeanDefinitionRegistryPostProcessor接口是BeanFactoryPostProcessor接口的子类。
BeanFactoryPostProcessor和BeanDefinitionRegistryPostProcessor分别定义了postProcessBeanFactory和postProcessBeanDefinitionRegistry方法,postProcessBeanFactory在Bean实例化前执行,而postProcessBeanDefinitionRegistry在postProcessBeanFactory之前执行,看下ServiceAnnotationPostProcessor的实现
// postProcessBeanDefinitionRegistry方法比较简单,就是使用DubboClassPathBeanDefinitionScanner进行扫描
// DubboClassPathBeanDefinitionScanner继承了Spring的ClassPathBeanDefinitionScanner
// 所以本质上也是利用Spring自带的工具类扫描
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
this.registry = registry;
scanServiceBeans(resolvedPackagesToScan, registry);
}
// postProcessBeanFactory首先处理@Bean和@DubboService一起使用的场景,这种场景DubboClassPathBeanDefinitionScanner
// 扫描不到,需要特殊处理,然后通过processAnnotatedBeanDefinition方法注册BeanDefinition,最后再判断需不需要扫描,
// 低版本的Spring可能不会调用postProcessBeanDefinitionRegistry,所以需要在此方法中进行扫描
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
if (this.registry == null) {
// In spring 3.x, may be not call postProcessBeanDefinitionRegistry()
this.registry = (BeanDefinitionRegistry) beanFactory;
}
// scan bean definitions
String[] beanNames = beanFactory.getBeanDefinitionNames();
for (String beanName : beanNames) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
Map<String, Object> annotationAttributes = getServiceAnnotationAttributes(beanDefinition);
if (annotationAttributes != null) {
// process @DubboService at java-config @bean method
processAnnotatedBeanDefinition(beanName, (AnnotatedBeanDefinition) beanDefinition, annotationAttributes);
}
}
if (!scanned) {
// In spring 3.x, may be not call postProcessBeanDefinitionRegistry(), so scan service class here
scanServiceBeans(resolvedPackagesToScan, registry);
}
}
看看scanServiceBeans方法的代码,只保留了核心逻辑
private void scanServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
scanned = true;
......
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
......
for (String packageToScan : packagesToScan) {
// Registers @Service Bean first
scanner.scan(packageToScan);
// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
......
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
processScannedBeanDefinition(beanDefinitionHolder);
......
}
} else {
......
}
......
}
}
主要就是通过DubboClassPathBeanDefinitionScanner扫描对应的包,然后调用processScannedBeanDefinition方法进行注册,和processAnnotatedBeanDefinition方法一样,最后都是通过buildServiceBeanDefinition先构建一个ServiceBean的BeanDefinition,然后将其注册到registry中,核心代码如下
private AbstractBeanDefinition buildServiceBeanDefinition(Map<String, Object> serviceAnnotationAttributes,
String serviceInterface,
String refServiceBeanName) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
// 下面都是一些重复的属性设置,省略
......
}
private void registerServiceBeanDefinition(String serviceBeanName, AbstractBeanDefinition serviceBeanDefinition, String serviceInterface) {
// check service bean
if (registry.containsBeanDefinition(serviceBeanName)) {
BeanDefinition existingDefinition = registry.getBeanDefinition(serviceBeanName);
if (existingDefinition.equals(serviceBeanDefinition)) {
// exist equipment bean definition
return;
}
......
}
registry.registerBeanDefinition(serviceBeanName, serviceBeanDefinition);
......
}
ServiceBean实现了InitializingBean接口,实例初始化完之后会执行afterPropertiesSet方法
public void afterPropertiesSet() throws Exception {
if (StringUtils.isEmpty(getPath())) {
if (StringUtils.isNotEmpty(getInterface())) {
setPath(getInterface());
}
}
// 从applicationContext中获取ModuleModel实例,这个类主要用来处理服务发布
// 后续发布服务都要通过DubboBeanUtils.getModuleModel(applicationContext)来获取
ModuleModel moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
moduleModel.getConfigManager().addService(this);
moduleModel.getDeployer().setPending();
}
主要就是将该实例添加到了ModuleModel的ConfigManager中,至此Service的扫描注册就完成了
DubboDeployApplicationListener主要用来处理Provider的服务发布(上面提到过),其实现了ApplicationListener<ApplicationContextEvent>接口,主要监听ApplicationContext相关的各种事件,比如Spring启动完成后会发布一个ContextRefreshedEvent事件
public void onApplicationEvent(ApplicationContextEvent event) {
if (nullSafeEquals(applicationContext, event.getSource())) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
}
同样DubboDeployApplicationListener也实现了ApplicationContextAware接口,实例化后通过setApplicationContext方法将ModuleModel设置到自己的属性当中用户后续服务发布
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
......
this.moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
......
}
DubboDeployApplicationListener监听到ContextRefreshedEvent事件之后,调用onContextRefreshedEvent方法进行服务发布
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 通过ModuleModel获取发布器
ModuleDeployer deployer = moduleModel.getDeployer();
Assert.notNull(deployer, "Module deployer is null");
// 开始服务发布的流程
Future future = deployer.start();
......
}
最终调用DefaultModuleDeployer.startSync方法处理
private synchronized Future startSync() throws IllegalStateException {
......
try {
// new了一个CompletableFuture来以获取start的结果,并且发布一个start事件
onModuleStarting();
......
// export services
exportServices();
// refer services
referServices();
......
// 下面都是根据不同的发布结果,发布不同的事件的处理
if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
onModuleStarted();
} else {
frameworkExecutorRepository.getSharedExecutor().submit(() -> {
try {
// wait for export finish
waitExportFinish();
// wait for refer finish
waitReferFinish();
} catch (Throwable e) {
logger.warn(CONFIG_FAILED_WAIT_EXPORT_REFER, "", "", "wait for export/refer services occurred an exception", e);
} finally {
onModuleStarted();
}
});
}
} catch (Throwable e) {
onModuleFailed(getIdentifier() + " start failed: " + e, e);
throw e;
}
return startFuture;
}
其中主要的步骤exportServices最终就是获取到所有的ServiceBean实例,调用其父类ServiceConfig.export()方法进行服务发布,可异步可同步
private void exportServiceInternal(ServiceConfigBase sc) {
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
if (sc.isExported()) {
return;
}
if (exportAsync || sc.shouldExportAsync()) {
ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
} catch (Throwable t) {
logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
}
}, executor);
asyncExportingFutures.add(future);
} else {
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
}
}
ServiceConfig.export()方法的代码就不仔细看了,最终就是调用下面的doExportUrl方法进行服务发布,如下
private void doExportUrl(URL url, boolean withMetaData) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.add(exporter);
}
先看下第一个入参URL的核心属性,这里先将注册模式设置成interface模式(2.x版本,3.x版本增加了instance模式,先把2.x的流程搞清楚,3.x不外乎就是往注册中心多写了些信息而已),如下
核心属性就是urlAddress和attributes中的export属性,分别是注册中心的地址和需要export的service,后续流程要使用proxyFactory和protocolSPI是代理和协议的适配工具类,都是动态生成的,主要作用就是根据具体的协议或者代理类型获取对应的Extension,生成的代码如下
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.ProxyFactory.class);
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.ProxyFactory.class);
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.ProxyFactory.class);
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
proxyFactory生成了最原始的invoker(默认使用javassist,只对ref做了一些简单的代理)
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
try {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
} catch (Throwable fromJavassist) {
// try fall back to JDK proxy factory
try {
Invoker<T> invoker = jdkProxyFactory.getInvoker(proxy, type, url);
logger.error(PROXY_FAILED, "", "", "Failed to generate invoker by Javassist failed. Fallback to use JDK proxy success. " +
"Interfaces: " + type, fromJavassist);
// log out error
return invoker;
} catch (Throwable fromJdk) {
logger.error(PROXY_FAILED, "", "", "Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +
"Interfaces: " + type + " Javassist Error.", fromJavassist);
logger.error(PROXY_FAILED, "", "", "Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +
"Interfaces: " + type + " JDK Error.", fromJdk);
throw fromJavassist;
}
}
}
protocolSPI第一步获取类型为registry的protocol,获取到的其实是一个链表,链表中的每个元素依次进行export
直接看最后一个元素InterfaceCompatibleRegistryProtocol的export方法,这个类继承自RegistryProtocol,其实是调用RegistryProtocol的export方法,
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心的地址和需要export的的service地址
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
......
// 发布本地服务,启动nettyserver
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 注册到注册中心
final Registry registry = getRegistry(registryUrl);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registry, registeredProviderUrl);
}
......
return new DestroyableExporter<>(exporter);
}
再来详细看看doLocalExport方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
调用protocol.export方法处理,这里的protocol其实是也是一个适配工具类,即上文中提到的protocolSPI,所有第一步也是去获取protocol的处理链表,只是这次获取的是类型为dubbo的protocol,如下
Filter链的构建在ProtocolFilterWrapper.export中构建
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 构建被Filter封装的invoker
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
最后到DubboProtocol的export方法处理,此方法处理时invoker已经变成了封装好的具备Filter的invoker,结构如下图所示
export方法主要就是生成了serviceKey,serviceKey的格式为 groupName/interfacePath:version:port,并以此作为key,DubboExporter作为value放到了exporterMap(该逻辑在DubboExporter的构造函数中),最后通过openServer方法启动nettyserver
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
......
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
openServer(url);
...
return exporter;
}
再来看看具体创建server的地方,最终通过createServer方法启动
private ProtocolServer createServer(URL url) {
......
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
......
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
loadServerProperties(protocolServer);
return protocolServer;
}
然后通过Exchangers.bind方法启动server,传入的requestHandler是请求的处理类,在DubboProtocol新建时初始化,用来处理消费端的请求
public DubboProtocol(FrameworkModel frameworkModel) {
requestHandler = new ExchangeHandlerAdapter(frameworkModel) {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
// switch TCCL
if (invoker.getUrl().getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
}
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isDebugEnabled()) {
logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
tryToGetStubService(channel, invocation);
}
received(channel, invocation);
} catch (Throwable t) {
logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private void tryToGetStubService(Channel channel, Invocation invocation) throws RemotingException {
try {
Invoker<?> invoker = getInvoker(channel, invocation);
} catch (RemotingException e) {
String serviceKey = serviceKey(
0,
(String) invocation.getObjectAttachmentWithoutConvert(PATH_KEY),
(String) invocation.getObjectAttachmentWithoutConvert(VERSION_KEY),
(String) invocation.getObjectAttachmentWithoutConvert(GROUP_KEY)
);
throw new RemotingException(channel, "The stub service[" + serviceKey + "] is not found, it may not be exported yet");
}
}
/**
* FIXME channel.getUrl() always binds to a fixed service, and this service is random.
* we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
* service this connection is binding to.
* @param channel
* @param url
* @param methodKey
* @return
*/
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getGroup());
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getVersion());
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
this.frameworkModel = frameworkModel;
}
最后通过NettyTransporter启动了一个nettyserver
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
}
这样整个provider的启动流程就完成了
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。