我们在上篇文章【Trino源码学习】Trino源码剖析之plugin加载中,跟着代码一步步分析,梳理了trino在加载plugin的过程中,主要做了哪些事情。归纳总结起来就是,创建了每个plugin对应的PluginClassLoader以及InternalConnectorFactory,这个InternalConnectorFactory封装了每个plugin的ConnectorFactory。本文我们将继续跟着代码进行分析,看看trino在加载catalog的时候又做了哪些事情。
和上篇文章一样,为了防止阅读起来比较混乱,这里先用一个流程图对整个catalog的加载做一个整体的归纳总结:
在正式分析代码流程之前,先简单看下Connector相关的类图,如下所示:
我们在上篇文章中提到,Trino在加载plugin的时候,会对每个plugin创建一个ConnectorFactory,这个factory就会用于后续创建Connector。后续与配置的各个catalog进行交互的时候,就可以通过这个Connector进行。Connector可以分为两种:InternalConnector和其他的具体Connector,例如JdbcConnector、KuduConnector等,而Mysql、Oracle、Clickhouse等plugin使用的都是JdbcConnector;InternalConnector分为三种,分别是InformationSchemaConnector、GlobalSystemConnector和SystemConnector。关于这些Connector,笔者会在后续的文章中专门介绍,本文主要聚焦于catalog的加载流程。
在加载完成plugin之后,trino就会开始加载catalog。相关的函数堆栈如下所示:
doStart(Server.java):128
-loadCatalogs(StaticCatalogStore.java):68
--loadCatalog(StaticCatalogStore.java):88
---createCatalog(ConnectorManager.java):227
----createCatalog(ConnectorManager.java):241
-----createCatalog(ConnectorManager.java)
首先,会遍历etc/catalog目录下所有“.properties”结尾的文件,每一个文件代表一个catalog的配置文件,trino将文件的前缀作为catalog name,例如ch1.properties,对应的catalog name就是ch1,这与plugin是不一样的。接着读取配置文件中的内容,以“connector.name”这个配置项的值作为connector name。然后我们就可以使用该name去上一篇文章中提到的map中,获取指定的InternalConnectorFactory,如下所示:
//ConnectorManager.createCatalog():225
InternalConnectorFactory connectorFactory = connectorFactories.get(connectorName);
然后使用catalog name构造一个CatalogName,这个变量其实主要就是包含了catalog name:
public final class CatalogName {
private static final int INSTANCE_SIZE = ClassLayout.parseClass(CatalogName.class).instanceSize();
private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";
private final String catalogName;
//省略其余代码
}
这里有两个带通配符的字符串:info_schema@和 system@,关于这两个字符串,与上面的几个InternalConnector也有关系,这里也先不展开描述,然后用这个CatalogName和InternalConnectorFactory来进行catalog的创建。
在最终的createCatalog函数中,首先会构造一个CatalogClassLoaderSupplier变量,该类主要包含以下成员:
private static class CatalogClassLoaderSupplier implements Supplier<ClassLoader> {
private final CatalogName catalogName;
private final Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory;
private final HandleResolver handleResolver;
//省略其余代码
}
其中,catalogName就是上面创建的CatalogName变量;duplicatePluginClassLoaderFactory是我们在上篇文章中提到的lambda表达式,展开之后如下所示:
//演示代码,实际原代码中不存在
Function<CatalogName, ClassLoader> function = (CatalogName name) -> pluginClassLoader.duplicate(name);
这个duplicate函数的主要作用就是复制plugin的PluginClassLoader,但是与原先的PluginClassLoader不同的是,这里会传入新的CatalogName和URLs,CatalogName就是我们上面创建的变量,URLs其实就是plugin目录下的所有jar的路径;HandleResolver也是我们在上面文章中提到的,保存所有已创建的PluginClassLoader。
除了这三个成员变量之外,还有一个方法需要关注:
//ConnectorManager.CatalogClassLoaderSupplier
public ClassLoader get() {
ClassLoader classLoader = duplicatePluginClassLoaderFactory.apply(catalogName);
synchronized (this) {
this.classLoader = classLoader;
}
if (classLoader instanceof PluginClassLoader) {
handleResolver.registerClassLoader((PluginClassLoader) classLoader);
}
return classLoader;
}
这个方法会调用传入的Function,也就是上面提到的绑定的duplicate函数,根据传入的CatalogName构造一个新的PluginClassLoader,然后注册到HandleResolver。接着,用CatalogName、对应的具体的ConnectorFactory以及CatalogClassLoaderSupplier来创建具体的Connector。
创建Connector的方法如下所示:
private Connector createConnector(CatalogName catalogName, ConnectorFactory connectorFactory,
Supplier<ClassLoader> duplicatePluginClassLoaderFactory, Map<String, String> properties) {
ConnectorContext context = new ConnectorContextInstance(xx, duplicatePluginClassLoaderFactory);
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName.getCatalogName(), properties, context);
}
}
首先创建一个ConnectorContext变量,ConnectorContextInstance是它的一个实现类。然后将当前线程的ClassLoader设置为ConnectorFactory的class loader,也就是plugin的class loader。最后再调用每个ConnectorFactory的create方法来创建具体的Connector。关于ConnectorContextInstance我们这里需要关注一个方法:
//ConnectorContextInstance.java
private final Supplier<ClassLoader> duplicatePluginClassLoaderFactory;
public ClassLoader duplicatePluginClassLoader() {
return duplicatePluginClassLoaderFactory.get();
}
这个方法会调用Supplier的get方法,这个Supplier就是我们上面介绍的CatalogClassLoaderSupplier,它的get方法就是复制并创建一个带CatalogName的PluginClassLoader。因此,调用这个duplicatePluginClassLoader()方法,最终会创建,并返回一个新的带CatalogName的PluginClassLoader。
最后,调用ConnectorFactory的create方法时,会将ConnectorContext作为参数传入。在create方法中调用上面的ConnectorContextInstance.duplicatePluginClassLoader()就会得到一个新的catalog的class loader,这个与plugin的class loader是不一样的。也就是说,trino在这里用了两层的class loader。
不过目前只有hive和iceberg是这样设计的,其他的plugin并没有这样设计,例如:
//HiveConnectorFactory.java
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context) {
ClassLoader classLoader = context.duplicatePluginClassLoader();
//省略后续代码
不过,目前笔者还没弄清楚为什么要对hive和iceberg这样设计。Trino的代码注释和commit message信息相对比较少,无法直接从这些信息获取一些代码架构和设计的意图,只能自己阅读源码进行学习。
创建完成具体的Connector(例如HiveConnector、JdbcConnector等)之后,会将这个Connector封装成为一个MaterializedConnector,这个类包含了一个connector相关的各种信息,如下所示:
//ConnectorManager.java
private static class MaterializedConnector {
private final CatalogName catalogName;
private final Connector connector;
private final Runnable afterShutdown;
private final Set<SystemTable> systemTables;
private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> materializedViewProperties;
private final List<PropertyMetadata<?>> schemaProperties;
private final List<PropertyMetadata<?>> columnProperties;
private final List<PropertyMetadata<?>> analyzeProperties;
除了CatalogName和Connector之外,还有一些table、schema、column等信息。
接着,再使用这个CatalogName创建InformationSchemaConnector和SystemConnector,这两个Connector都是属于InternalConnector,我们在上面的类图中可以看到。关于这两个Connector的作用,我们再后续会专门提到,这里暂不展开。然后再将这两个InternalConnector也封装为MaterializedConnector。最后将这三个MaterializedConnector作为参数,构造一个Catalog,这个类与MaterializedConnector类似,也是只包含了一些成员变量信息:
public class Catalog {
private final String catalogName;
private final CatalogName connectorCatalogName;
private final String connectorName;
private final Connector connector;
private final SecurityManagement securityManagement;
private final CatalogName informationSchemaId;
private final Connector informationSchema;
private final CatalogName systemTablesId;
private final Connector systemTables;
到这里,trino的catalog加载就已经介绍完成了。可以看到,这里最终创建的Catalog,跟Impala中的catalog是完全不一样的东西。Trino的catalog更像是一个catalog的context,包含了三个相关的MaterializedConnector,通过这些Connector可以跟配置的各个catalog进行交互。而Impala的catalog则是指从HMS加载上来的元数据信息,例如db、table、partition等。
通过上面代码流程梳理不难发现,整个catalog的加载过程也只是完成了一些Connector的创建,并没有实际与各个catalog进行交互。对于Connector与catalog交互的部分、InternalConnector的作用等,由于篇幅原因,不在本文展开说明,后续会专门再对Connector相关的代码进行学习和研究。同时,由于缺乏相关的代码注释和commit message,文章中的所有观点,都是笔者本人基于代码分析得出来的,如有错误,欢迎指正。