流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将您详细介绍如何将数据写入 MongoDB。
进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群。
进入 MongoDB 控制台,点击左上角【新建实例】创建实例,具体参考 创建 MongoDB 实例。作者这里使用 shell 的方式,下载 MongoDB 客户端的方式连接数据库,更多连接信息请参考 连接 MongoDB 实例
## 安装 MongoDB 客户端
wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-XX.XX.XX.tgz
## 解压
tar zxvf mongodb-linux-x86_64-rhel70-XX.XX.XX.tgz
## 进入目录
cd mongodb-linux-x86_64-rhel70-XX.XX.XX
## 连接客户端
./bin/mongo -umongouser -p***** 172.xx.xx.xx:27017/admin
请注意选择与云数据库 MongoDB 服务并与 CVM 操作系统相匹配的版本
CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);
CREATE TABLE mongodb (
id INT,
name STRING
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'testdb', --数据库名
'collection' = 'test1', --数据集合
'uri' = 'mongodb://mongouser:******@xx.xx.xx.xx:27017/admin', -- MongoDB连接串
'batchSize' = '5' -- 每次批量写入的条数
);
INSERT INTO mongodb
SELECT * from datagen_source_table;
本实例演示如何使用 Datagen 生成随机数据,然后使用 MongoDB Sink 连接器将数据写入 MongoDB。
目前仅 Flink 1.13 支持 Sink 端写入,其他版本暂不支持。undefinedMongoDB Sink 暂不支持 Upsert。undefinedMongoDB 的 User 必须拥有 database 的写权限。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。