近期在工作中需要用到DataX去作为公司内部的数据同步引擎,特花了一些时间研究了DataX的整体架构和设计思想,从中吸收了很多优秀的设计思路,作为一款纯Java实现的数据同步工具,相对于市面上已存在的基于大数据框架为背景的数据同步工具有着易部署、易扩展的优点,但不足的地方是alibaba只是开源了DataX单机模式代码,并未开源分布式部分代码,目前在Github中的只是阉割版是DataX,对此我表示很遗憾。
DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能,项目地址:https://github.com/alibaba/DataX
❝以上摘抄自DataX项目首页README ❞
DataX作为一款高性能的异构数据源同步工具,要解决的第一个难点是异构数据基本单位的统一抽象,为此,DataX将每一条数据全部抽象为Record
,每一条Record
中包含了多个Column
,具体代码实现如下:
Record.java
// 数据抽象接口,一条数据将具有以下特性
public interface Record {
// 增加一列
public void addColumn(Column column);
// 设置某一列
public void setColumn(int i, final Column column);
// 获取某一列
public Column getColumn(int i);
public String toString();
// 获取列数
public int getColumnNumber();
// 获取数据占用字节数,单位为byte
public int getByteSize();
// 获取数据占用内存数,指的是对象头占用的内存,单位为byte
public int getMemorySize();
}
Column.java
// 列抽象类
public abstract class Column {
// 列类型
private Type type;
// 列值
private Object rawData;
// 列占用的字节数
private int byteSize;
public Column(final Object object, final Type type, int byteSize) {
this.rawData = object;
this.type = type;
this.byteSize = byteSize;
}
public Object getRawData() {
return this.rawData;
}
public Type getType() {
return this.type;
}
public int getByteSize() {
return this.byteSize;
}
protected void setType(Type type) {
this.type = type;
}
protected void setRawData(Object rawData) {
this.rawData = rawData;
}
protected void setByteSize(int byteSize) {
this.byteSize = byteSize;
}
public abstract Long asLong();
public abstract Double asDouble();
public abstract String asString();
public abstract Date asDate();
public abstract byte[] asBytes();
public abstract Boolean asBoolean();
public abstract BigDecimal asBigDecimal();
public abstract BigInteger asBigInteger();
@Override
public String toString() {
return JSON.toJSONString(this);
}
public enum Type {
BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
}
}
在基于这一层抽象的基础上,DataX对于Record
有三种不同的实现,分别是:
image-20220426150914034
image-20220426151041245
image-20220426151133210
以上便是DataX中基本的传输子单位,无论是reader插件和writer插件都是将原始数据源的每一条数据转换成为以上的对象。
在了解完DataX中基本的传输单位,接下来去看一看一个DataX任务究竟是怎样的一个流程被启动起来的,从官网得知,要启动一个简单的数据任务,要经历以下二步:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 1,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
python datax.py job.json
在扒了datax.py的源码之后,发现使用python去进行启动的作用是快速构建了DataX的启动命令,尤其是补充了一些有用的jvm参数,实际上最后python拼接出来的命令如下:
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax -Dlogback.configurationFile=/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/conf/logback.xml -classpath /home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/lib/*:. -Dlog.file.name=b_stream2stream_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job /home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/job/stream2stream.json
我们根据这条命令就会发现入口的主类:com.alibaba.datax.core.Engine
image-20220426160234337
由上图可知,一切的一切都是从Engine.entry(args)
这个方法开始,接下来分析一下这个方法究竟做了哪些工作:
image-20220426160144350
ConfigParser.parse(jobPath)
,往配置中加入系统默认项,用户只提供job
部分,补上其余common
core
entry
job
plugin
配置项 {
"common": {
"column": {
"dateFormat": "yyyy-MM-dd",
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "utf-8",
"extraFormats": [
"yyyyMMdd"
],
"timeFormat": "HH:mm:ss",
"timeZone": "GMT+8"
}
},
"core": {
"container": {
"job": {
"id": -1,
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"dataXServer": {
"address": "http://localhost:7001/api",
"reportDataxLog": false,
"reportPerfLog": false,
"timeout": 10000
},
"statistics": {
"collector": {
"plugin": {
"maxDirtyNumber": 10,
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
}
}
},
"transport": {
"channel": {
"byteCapacity": 67108864,
"capacity": 512,
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"flowControlInterval": 20,
"speed": {
"byte": -1,
"record": -1
}
},
"exchanger": {
"bufferSize": 32,
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
}
}
},
"entry": {
"jvm": "-Xms1G -Xmx1G"
},
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
},
"plugin": {
"reader": {
"streamreader": {
"class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
"description": {
"mechanism": "use datax framework to transport data from stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamreader",
"path": "/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/plugin/reader/streamreader"
}
},
"writer": {
"streamwriter": {
"class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
"description": {
"mechanism": "use datax framework to transport data to stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamwriter",
"path": "/home/tyrantlucifer/IdeaProjects/DataX/target/datax/datax/plugin/writer/streamwriter"
}
}
}
}
image-20220426160755100
image-20220426160809559
image-20220426161002583
image-20220426161140289
此时程序进入到了Engine.start(configuration)的执行流程,在这一步中经历以下环节:
image-20220426162444100
image-20220426162716387
core.container.model
是job还是jobGroup,如果是job,那么实例化JobContainer
,如果是jobGroup实例化TaskGroupContainer
image-20220426163207308
image-20220426163804421
image-20220426163829
image-20220426164056
任务容器被启动后,会执行任务生命周期的每一个阶段
job.setting.speed.byte
,如果是,根据字节限制配置计算channel数量job.setting.speed.record
,如果是,根据条数限制配置计算channel数量job.setting.speed.channel
来确定channel数量job.content
中实际上我认为如果是一个数据任务执行的话,1和6阶段完全不必要存在,但是如果是任务之间DAG调度,这两个阶段就可以派上用场,是否阿里在这里阉割了DAG调度的功能,咱也不知道,就在这瞎猜猜而已
image-20220426175346192
从代码逻辑上看,DataX的代码流程是比较清晰且设计思路明确的,之所以它能够实现高效的异构数据源同步工作,总共实现了以下这么几点:
下篇文章将对DataX的调度流程做一个详细的剖析,敬请期待,我们下期再见!
本文分享自 Tyrant Lucifer 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!