JobContainer
的start方法,最后一步调用的是invokeHooks
,这个方法就是datax的自定义hook被调用的地方。datax的hook提供了一种机制,可以让开发者再任务执行完成后做一些定制化的事情,比如给任务的负责人发送一条短信提醒之类的。
我们顺着invokeHooks
方法来分析下,
private void invokeHooks() {
Communication comm = super.getContainerCommunicator().collect();
//配置也要传过去,因为你实现的hook可能需要一些参数,比如accessKey等,配置的格式是可以自己定义的
HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter());
invoker.invokeAll();
}
HookInvoker是datax实现hook机制的一个管理类,我们看到它接受三个参数,一个是目录,这里存放的是datax主目录+/hook。第二个参数是配置,因为你自己实现的hook可能需要一些外部配置的参数,比如你要实现发送短信功能,可能会把一些api密钥之类的东西放在配置里传进去。第三个参数是用来统计的。
接着看下invokeAll
方法,
public void invokeAll() {
if (!baseDir.exists() || baseDir.isFile()) {
LOG.info("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath());
return;
}
//过滤子目录保存在subDirs
String[] subDirs = baseDir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return new File(dir, name).isDirectory();
}
});
//没有子目录,不符合hook目录要求
if (subDirs == null) {
throw DataXException.asDataXException(FrameworkErrorCode.HOOK_LOAD_ERROR, "获取HOOK子目录返回null");
}
/**
* 子目录的格式示例:
* META-INF/services/com.example.CodecSet
*/
for (String subDir : subDirs) {
doInvoke(new File(baseDir, subDir).getAbsolutePath());
}
}
首先是做一些基本的检查,确保目录结构是合法的。接着扫描给定目录的所有一级子目录,每个子目录当作一个Hook的目录。对于每个子目录,必须符合ServiceLoader
的标准目录格式
hook的目录结构看起来类似这个样子:
简单来讲,ServiceLoader实现了一种机制,可以动态加载指定目录的实现类并且实例化,它是java SPI机制的重要组成部分。
Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,提供了通过interface寻找implement的方法。类似于IOC的思想,将装配的控制权移到程序之外,从而实现解耦。如下图所示:
继续看doInvoke
方法,
private void doInvoke(String path) {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
try {
JarLoader jarLoader = new JarLoader(new String[]{path});
Thread.currentThread().setContextClassLoader(jarLoader);
//这里通过 ServiceLoader 机制加载Hook的实现类
Iterator<Hook> hookIt = ServiceLoader.load(Hook.class).iterator();
if (!hookIt.hasNext()) {
LOG.warn("No hook defined under path: " + path);
} else {
Hook hook = hookIt.next();
LOG.info("Invoke hook [{}], path: {}", hook.getName(), path);
hook.invoke(conf, msg);//调用实际的hook实现
}
} catch (Exception e) {
LOG.error("Exception when invoke hook", e);
throw DataXException.asDataXException(
CommonErrorCode.HOOK_INTERNAL_ERROR, "Exception when invoke hook", e);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
这里首先是把类加载器设置成jar包的加载器,这是继承的应用加载器,然后使用ServiceLoader
加载用户自己是实现的Hook接口的实现类并进行实例化。实例化后调用invoke方法执行自定义的逻辑。Hook
接口的定义如下:
/**
* Created by xiafei.qiuxf on 14/12/17.
* DataX 的 Hook 机制,这里定义了开放的接口
* https://xie.infoq.cn/article/68102f356019f52560f4b8c70
*/
public interface Hook {
/**
* 返回名字
*
* @return
*/
public String getName();
/**
* TODO 文档
*
* @param jobConf
* @param msg
*/
public void invoke(Configuration jobConf, Map<String, Number> msg);
}
我们自己的实现类示例:
public class SMSReport implements Hook {
@Override
public String getName() {
return "SMSReportHook";
}
@Override
public void invoke(Configuration configuration, Map<String, Number> map) {
//调用第三方的短信api发送短信
....
}
实现类写完后我们打成jar包,在 DataX 根目录下,新建hook文件夹,然后在其下创建sms文件夹,将jar包放在这里就可以了。
我们来总结下:
datax提供了一种Hook
机制,可以在执行完核心逻辑后触发一个开发者自己定义的逻辑。实现的原理是利用了java SPI机制,datax定义了一个Hook
接口,开发者实现这个接口。通过interface寻找implement的方法。
参考: