Transporter是一种用于在不同数据存储之间移动数据的开源工具。开发人员经常为诸如跨数据库移动数据,将数据从文件移动到数据库或反之亦然等任务编写一次性脚本,但使用像Transporter这样的工具有几个优点。
在Transporter中,您构建通道,这些通道定义从源(读取数据的位置)到接收器(写入数据的位置)的数据流。源和接收器可以是SQL或NoSQL数据库,flat 数据或其他数据。Transporter使用可插拔扩展的适配器与这些资源进行通信,默认情况下,该项目包括几个适用于常用数据库的适配器。
除了移动数据之外,Transporter还允许您在使用变换器通过通道时更改数据。与适配器一样,默认情况下包含多个变换器。您也可以编写自己的变换器来自定义数据修改。
在本教程中,我们将介绍使用Transporter的内置适配器和用JavaScript编写的自定义转换器将数据从MongoDB数据库移动和处理到Elasticsearch的示例。
要学习本教程,您需要:
sudo
命令的非root账户。Transporter为大多数常见操作系统提供二进制文件。Ubuntu的安装过程包括两个步骤:
首先,从GartHub上的Transporter项目页面获取最新版本的链接。复制以-linux-amd6
结尾的链接。本教程使用v0.5.2,这是编写本文时最新的版本。
将二进制文件下载到您的主目录中。
cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
将其移动到/usr/local/bin
或者您的安装目录中。
mv transporter-*-linux-amd64 /usr/local/bin/transporter
接下来赋予权限,让其可执行
chmod +x /usr/local/bin/transporter
您可以通过运行二进制文件来测试是否正确设置了Transporter
transporter
您会看到帮助输出和版本号:
USAGE
transporter <command> [flags]
COMMANDS
run run pipeline loaded from a file
. . .
VERSION
0.5.2
为了使用Transporter将数据从MongoDB移动到Elasticsearch,我们需要准备两个工作:MongoDB中有我们想要移动的数据和告诉Transporter如何移动它的通道。下一步创建一些示例数据,但如果您已经有一个想要移动的MongoDB数据库,则可以跳过下一步并直接进入步骤3。
在此步骤中,我们将在MongoDB中创建一个包含单个集合的示例数据库,并向该集合添加一些文档。然后,在本教程的其余部分中,我们将使用Transporter通道迁移和转换此示例数据。
首先,连接到MongoDB数据库。
mongo
这会将您的命令提示符会自动更改为mongo>
,表示您正在使用MongoDB shell。
从这里,选择要处理的数据库。我们为其命名为my_application
use my_application
在MongoDB
中,您不需要创建数据库或集合。一旦开始将数据添加到您按名称选择的数据库,就会自动创建该数据库。
因此,要创建数据库my\_application
,请将两个文档保存到users`集合中:一个代表Sammy Shark,一个代表Gilly Glowfish。这将是我们的测试数据。
db.users.save({"firstName": "Sammy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
添加文档后,您可以查询集合users
以查看记录。
db.users.find().pretty();
输出看起来类似于下面的输出,但_id
列是不同的。MongoDB自动添加对象ID以唯一标识集合中的文档。
{
"_id" : ObjectId("59299ac7f80b31254a916456"),
"firstName" : "Sammy",
"lastName" : "Shark"
}
{
"_id" : ObjectId("59299ac7f80b31254a916457"),
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
按CTRL+C
退出MongoDB shell。
接下来,让我们创建一个Transporter通道,将这些数据从MongoDB移动到Elasticsearch。
Transporter中的通道默认由命名为pipeline.js
的JavaScript文件来定义。在给定源和接收器的情况下,内置的init
命令在COR中创建基本配置文件。
使用MongoDB的pipeline.js
作为源,将Elasticsearch作为接收器。
transporter init mongodb elasticsearch
您将看到以下输出:
Writing pipeline.js...
这次您不需要修改pipeline.js
,但让我们看看它是如何工作的。
该文件看起来是这样,但你也可以通过cat pipeline.js,less pipeline.js
(按q
退出)命令来查看文件的内容)。
var source = mongodb({
"uri": "${MONGODB_URI}"
// "timeout": "30s",
// "tail": false,
// "ssl": false,
// "cacerts": ["/path/to/cert.pem"],
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}",
// "read_preference": "Primary"
})
var sink = elasticsearch({
"uri": "${ELASTICSEARCH_URI}"
// "timeout": "10s", // defaults to 30s
// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
// "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})
t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
分别以var source
和var sink
开头MongoDB定义JavaScript变量和Elasticsearch适配器。我们将定义MONGODB\_URI
和ELASTICSEARCH\_URI
环境变量,以便在这一步里面适配器后续来使用。
//
开头的行是注释行。它们突出显示了您可以为通道设置的一些常见配置选项,这次我们默认不打开。
最后一行连接源和接收器。变量transporter
或t
让我们访问我们的通道。我们使用.Source()
和.Save()
函数在文件中增加源和接收器,这些源和接收器是提前在文件中用source
和 sink
变量定义的。
SoCube()
和SaveE()
函数的第三个参数是namespace
。传递/.*/
最后一个参数意味着我们希望将所有数据从MangGDB传输,并将其保存在RealStCype中的同一命名空间中。
在我们运行此通道之前,我们需要为MongoDB URI和Elasticsearch URI设置环境变量。在我们使用的示例中,两者都使用默认设置在本地托管,但如果您使用的是现有MongoDB或Elasticsearch实例,请确保自定义这些选项。
export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'
现在我们准备好运行通道了。
transporter run pipeline.js
你会看到输出结束如下:
. . .
INFO[0001] metrics source records: 2 path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
在第二行和第三行到最后行中,该输出指示源中存在2条记录,并且2条记录被移动到接收器。
为了确认两个记录都被处理,您可以查询my_application
数据库的内容进行搜索,而MySQL应用程序数据库现在应该存在新的数据。
curl $ELASTICSEARCH_URI/_search?pretty=true
加?pretty=true参数使输出更易于阅读
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sammy",
"lastName" : "Shark"
}
}
]
}
}
MongoDB中的数据库和集合类似于Elasticsearch中的索引和类型。考虑到这一点,你应该看到:
_index
字段转向my\_application
MongoDB数据库的名称。_type
字段转向users
MongoDB集合的名称。firstName
和 lastName
字段分别填写了"Sammy","Shark"和"Gilly""Glowfish"。这证实了来自MongoDB的记录都通过Transporter成功处理并加载到Elasticsearch。为了构建这个基本通道,我们将添加一个可以转换输入数据的中间处理步骤。
顾名思义,变换器在将源数据加载到接收器之前修改源数据。例如,它们允许您添加新字段,删除字段或更改字段的数据。Transporter附带一些预定义的变换器以及对定制变换器的支持。
通常,自定义转换器编写为JavaScript函数并保存在单独的文件中。要使用它们,请在pipeline.js
中添加对变换器文件的引用。Transporter包括Otto和Goja JavaScript引擎。因为Goja更新快,我们将在这里使用它。唯一的差异是语法。
创建一个名为transform.js
的文件,我们将用它来编写转换函数。
nano transform.js
下面是我们将使用的函数,它将创建一个名为FulnNew
的新字段,其功能将是firstName
和lastName
字段连接在一起 ,然后用空格分割,代码如下:
function transform(msg) {
msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
return msg
}
让我们来看看这个代码:
function transform(msg)
,是函数定义。MSG
是一个JavaScript对象,包含源文档的详细信息。我们使用这个对象来访问通过通道的数据。fullName
字段。MSG
对象,以便使用其余的通道。保存并关闭文件。
接下来,我们需要修改通道以使用此转换器。打开pipeline.js文件进行编辑。
nano pipeline.js
最后,我们需要给转换函数添加一个调用Transform()
,以将转换器添加到Source()
和Save()
之间的通道中,像这样
. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")
传递给Transform()
的参数是转换的类型,在这种情况下是Goja。使用goja
函数,我们使用其相对路径指定变换器的文件名。
保存并关闭文件。在我们重新运行通道以测试变换器之前,让我们从之前的测试中清除Elasticsearch中的现有数据。
curl -XDELETE $ELASTICSEARCH_URI
您将看到正确的输出。
{"acknowledged":true}
现在重新运行通道。
transporter run pipeline.js
输出看起来与之前的测试非常相似,您可以在最后几行看到通道是否像以前一样成功完成。我们可以再次运行Elasticsearch查看数据是否是我们想要的格式。
curl $ELASTICSEARCH_URI/_search?pretty=true
您可以在新输出中看到fullName字段:
{
"took" : 9,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"fullName" : "Gilly Glowfish",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sammy",
"fullName" : "Sammy Shark",
"lastName" : "Shark"
}
}
]
}
}
fullName
已在两个文档中添加了正确设置值的字段。现在我们知道如何向Transporter管道添加自定义转换。
您已经构建了一个带有转换器的基本Transporter通道,用于将数据从MongoDB复制和修改到Elasticsearch。您可以以相同的方式应用更复杂的转换,在同一通道中链接多个转换等等。MongoDB和Elasticsearch只是Transporter支持的两个适配器。它还支持flat 数据或Postgres等SQL数据库以及许多其他数据源。
购买两台服务器试试吧:https://cloud.tencent.com/product/cvm,很简单哦~
参考文献:《How To Sync Transformed Data from MongoDB to Elasticsearch with Transporter on Ubuntu 16.04 》
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。