目录:
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。 由于官网的例子是基于python的例子,网上也很少可以找到java版本的,然后自己刚好做过,记录一下,我搜了一下,我应该是全网第一篇写的datax最详细的文章。
https://github.com/alibaba/Data
点击下载就好了
下载的压缩文件解压,在lib目录下将这两个依赖安装到本地
将这个两个依赖安装到本地maven仓库
在项目引入这两个依赖
<dependency>
<groupId>com.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.datax</groupId>
<artifactId>datax-common</artifactId>
<version>0.0.1</version>
</dependency>
同时也需要引入下面这几个依赖,否则会报错
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
在resource目录下新建一个datax目录,在datax目录下新建test.json文件。
test.json:
{
"job": {
"setting": {
"speed": {
"channel": 4
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
"querySql": ["select t.id,t.name,t.status from users t"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"writeMode": "insert",
"column": ["id","name","status"],
"connection": [
{
"table": [
"temp_users"
],
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test"
}
]
}
}
}
]
}
}
我这是自己本地的mysql数据库进行数据同步的测试
public class TestMain {
public static String getCurrentClasspath(){
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
String currentClasspath = classLoader.getResource("").getPath();
// 当前操作系统
String osName = System.getProperty("os.name");
if (osName.startsWith("Win")) {
// 删除path中最前面的/
currentClasspath = currentClasspath.substring(1, currentClasspath.length()-1);
}
return currentClasspath;
}
public static void main(String[] args) {
System.setProperty("datax.home","D:\\datax\\datax");
String[] datxArgs2 = {"-job", getCurrentClasspath()+"/datax/test.json", "-mode", "standalone", "-jobid", "-1"};
try {
Engine.entry(datxArgs2);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
运行结果:
数据同步成功。
相信大家在做数据同步的时候,肯定不是简单的sql,一般还有条件的,也就是参数,那参数要怎么传进去呢?
test.json: 改成一个接收参数的方式
我是将id为多少的数据同步过去select t.id,t.name,t.status from users t where t.id=${id}
{
"job": {
"setting": {
"speed": {
"channel": 4
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
"querySql": ["select t.id,t.name,t.status from users t where t.id=${id}"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"writeMode": "insert",
"column": ["id","name","status"],
"connection": [
{
"table": [
"temp_users"
],
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test"
}
]
}
}
}
]
}
}
测试类就应该这么写:
参数值已经成功的注入进来了
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有