前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >【Apache Doris】Stream Load 最佳实践指南

【Apache Doris】Stream Load 最佳实践指南

作者头像
一臻数据
发布2024-12-24 16:17:24
发布2024-12-24 16:17:24
21510
代码可运行
举报
文章被收录于专栏:一臻数据一臻数据
运行总次数:0
代码可运行

Doris 的导入方式比较多,一般情况下每个场景都有对应的数据导入方式,比如Streamload、HdfsLoad(逐渐替换BrokerLoad)、RoutineLoad、MySqlLoad等。 其中大家用的最多的可能是StreamLoad的方式,因为一般用doris flink connector 、doris spark connector、datax等进行数据同步时,底层都是走streamload。由于 spark doris connnector/flink doris connnector/datax 底层都是走的streamload,所以遇到的导入报错情况也基本一致。 本文将一起学习Doris的Stream Load最佳实践指南。

导入报错梳理实践

1. 分区没有提前创建Schema

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE IF NOT EXISTS tb_dynamic_partition_test2(
    `sid` LARGEINT NOT NULL COMMENT "学生id",
    `name` VARCHAR(50) NOT NULL COMMENT "学生名字",
    `class` INT COMMENT "学生所在班级",
    `age` SMALLINT COMMENT "学生年龄",
    `sex` TINYINT COMMENT "学生性别",
    `phone` LARGEINT COMMENT "学生电话",
    `address` VARCHAR(500)  NOT NULL COMMENT "学生家庭地址",
    `date` DATETIME NOT NULL COMMENT "数据录入时间"
)
ENGINE=olap
DUPLICATE KEY (`sid`,`name`)
PARTITION BY RANGE(`date`)()
DISTRIBUTED BY HASH (`sid`) BUCKETS 4
PROPERTIES
(
"dynamic_partition.enable"="true", -- 开启动态分区
"dynamic_partition.start"="-3", -- 保留前三天的分区
"dynamic_partition.end"="1", -- 往后创建一个分区
"dynamic_partition.time_unit"="DAY", -- 按天分区
"dynamic_partition.prefix"="p_", -- 分区字段以p_开始
"dynamic_partition.replication_num"="1", -- 动态分区中的副本数指定为1
"dynamic_partition.buckets"="4" -- 动态分区中的分桶数量为 4 
);
代码语言:javascript
代码运行次数:0
运行
复制
-- data:
1,lisi,1001,18,1,1008610010,beijing,2024-04-26

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u root -H "column_separator:,"  -T /mnt/disk2/test.csv http://ip:8030/api/test/tb_dynamic_partition_test2/_stream_load

ERROR:

代码语言:javascript
代码运行次数:0
运行
复制
curl http://ip:8040/api/_load_error_log?file=__shard_6/error_log_insert_stmt_974c58560ccd1a48-f470414c963ae092_974c58560ccd1a48_f470414c963ae092

Reason: no partition for this tuple. tuple=+---------------+---------------+----------------------+--------------------+-------------------+-----------------------+---------------+----------------+
|sid(Int128)    |name(String)   |class(Nullable(Int32))|age(Nullable(Int16))|sex(Nullable(Int8))|phone(Nullable(Int128))|address(String)|date(DateTimeV2)|
+---------------+---------------+----------------------+--------------------+-------------------+-----------------------+---------------+----------------+
|              1|           lisi|                  1001|                  18|                  1|             1008610010|        beijing|2024-04-26 00...|
+---------------+---------------+----------------------+--------------------+-------------------+-----------------------+---------------+----------------+
8 rows in block, only show first 1 rows.

处理方式,添加对应分区:

代码语言:javascript
代码运行次数:0
运行
复制
// 关闭动态分区
ALTER TABLE tb_dynamic_partition_test2 SET ("dynamic_partition.enable" = "false");
// 添加分区
ALTER TABLE test.tb_dynamic_partition_test2
ADD PARTITION p_20240426 VALUES [("2024-04-26 00:00:00"), ("2024-04-27 00:00:00")) ("replication_num"="1");
// 打开动态分区
ALTER TABLE tb_dynamic_partition_test2 SET ("dynamic_partition.enable" = "true");]

正常导入数据:

总结

如果是数据找到不到对应分区,可以先排查分区是否创建,或者补分区、修改分区策略,保证数据在分区范围内。

2.数据和字段类型不匹配Schema

代码语言:javascript
代码运行次数:0
运行
复制
-- table schema:
CREATE TABLE IF NOT EXISTS test(
    `sid` LARGEINT NOT NULL COMMENT "学生id",
    `name` VARCHAR(5) NOT NULL COMMENT "学生名字",
    `class` INT COMMENT "学生所在班级",
    `age` SMALLINT COMMENT "学生年龄",
    `sex` TINYINT COMMENT "学生性别",
    `phone` LARGEINT COMMENT "学生电话",
    `address` VARCHAR(5)  NOT NULL COMMENT "学生家庭地址",
    `date` DATETIME NOT NULL COMMENT "数据录入时间"
)
ENGINE=olap
DUPLICATE KEY (`sid`,`name`)
DISTRIBUTED BY HASH (`sid`) BUCKETS 4
PROPERTIES
(
"replication_num"="1"
);
代码语言:javascript
代码运行次数:0
运行
复制
-- 数据:
1,lisixxxxxxxxxxxxxxxxxxxx,1001,18,1,1008610010,bj,2024-04-26

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u root -H "column_separator:,"  -T /mnt/disk2/liyuanyuan/data/test.csv http://10.16.10.6:18739/api/test/test/_stream_load

ERROR:

代码语言:javascript
代码运行次数:0
运行
复制
error:
curl http://10.16.10.6:18749/api/_load_error_log?file=__shard_0/error_log_insert_stmt_e743756cd8da0bf9-47820b98b0cae2b0_e743756cd8da0bf9_47820b98b0cae2b0
Reason: column_name[name], the length of input is too long than schema. first 32 bytes of input str: [lisixxxxxxxxxxxxxxxxxxxx] schema length: 5; actual length: 24; . src line [];

从报错来看,是 name 字段导入的数据长度大于字段类型的长度

处理方式,参考:

ALTER-TABLE-COLUMN - Apache Doris-- 修改列长度

代码语言:javascript
代码运行次数:0
运行
复制
ALTER TABLE test.test MODIFY COLUMN name VARCHAR(50) KEY ;

导入成功:

3. 导入列和schema 列不对应Schema

代码语言:javascript
代码运行次数:0
运行
复制
-- table schema
CREATE TABLE IF NOT EXISTS test2(
    `sid` LARGEINT NOT NULL COMMENT "学生id",
    `name` VARCHAR(50) NOT NULL COMMENT "学生名字",
    `class` INT COMMENT "学生所在班级",
    `age` SMALLINT COMMENT "学生年龄",
    `sex` TINYINT COMMENT "学生性别",
    `phone` LARGEINT COMMENT "学生电话",
    `address` VARCHAR(50)  NOT NULL COMMENT "学生家庭地址",
    `date` DATETIME NOT NULL COMMENT "数据录入时间"
)
ENGINE=olap
DUPLICATE KEY (`sid`,`name`)
DISTRIBUTED BY HASH (`sid`) BUCKETS 4
PROPERTIES
(
"replication_num"="1"
);
代码语言:javascript
代码运行次数:0
运行
复制
--data
1,xxxxxxxxxxxxxxxxxxxxxxx,1001,18,1,1008610010,beijing,2024-04-26,test_column

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u root -H "column_separator:,"  -T /mnt/disk2/liyuanyuan/data/test2.csv http://10.16.10.6:18739/api/test/test2/_stream_load

ERROR:

代码语言:javascript
代码运行次数:0
运行
复制
Reason: actual column number in csv file is  more than  schema column number.actual number: 9, schema column number: 8; line delimiter: [
], column separator: [,], result values:[1, xxxxxxxxxxxxxxxxxxxxxxx, 1001, 18, 1, 1008610010, beijing, 2024-04-26, test_column, ]. src line [1,xxxxxxxxxxxxxxxxxxxxxxx,1001,18,1,1008610010,beijing,2024-04-26,test_column];

处理方式,参考:

ALTER-TABLE-COLUMN - Apache Doris-- 添加列

代码语言:javascript
代码运行次数:0
运行
复制
ALTER TABLE test.test2 ADD COLUMN new_col varchar(50) ;

导入成功:

4.csv中含有特殊字符导入失败

比如:flink/spark to doris 使用csv举例子:以下图为例子,有时候在进行数据同步的时候会遇到一些问题,比如 表schema 的字段是固定的32个,但是实际列数小于schema列数,甚至有可能是变动的,这种情况一般是数据中有分隔符导致的,可以考虑换成json格式。

Flink Flink Doris Connector - Apache Doris

Spark Spark Doris Connector - Apache Doris

代码语言:javascript
代码运行次数:0
运行
复制
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");

含包围符数据导入

1. 包围符数据导入

Schema:

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE IF NOT EXISTS test3(
    `sid` LARGEINT NOT NULL COMMENT "学生id",
    `name` VARCHAR(50) NOT NULL COMMENT "学生名字",
    `class` INT COMMENT "学生所在班级",
    `age` SMALLINT COMMENT "学生年龄",
    `sex` TINYINT COMMENT "学生性别",
    `phone` LARGEINT COMMENT "学生电话",
    `address` VARCHAR(50)  NOT NULL COMMENT "学生家庭地址",
    `date` DATETIME NOT NULL COMMENT "数据录入时间"
)
ENGINE=olap
DUPLICATE KEY (`sid`,`name`)
DISTRIBUTED BY HASH (`sid`) BUCKETS 4
PROPERTIES
(
"replication_num"="1"
);
代码语言:javascript
代码运行次数:0
运行
复制
--data
"1","xxxxxxx","1001","18","1","1008610010","beijing","2024-04-26"

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u root -H "column_separator:,"  -H "enclose:\"" -H "trim_double_quotes:true"  -T /mnt/disk2/liyuanyuan/data/test3.csv http://10.16.10.6:18739/api/test/test3/_stream_load

参考Streamload Stream Load - Apache Doris:

代码语言:javascript
代码运行次数:0
运行
复制
enclose:指定包围符。
trim_double_quotes:为 true 时裁剪掉 CSV 文件每个字段最外层的双引号。

处理方式:

2. 部分数据有包围符

Schema:

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE IF NOT EXISTS test4(
    `sid` LARGEINT NOT NULL COMMENT "学生id",
    `name` VARCHAR(50) NOT NULL COMMENT "学生名字",
    `class` INT COMMENT "学生所在班级",
    `age` SMALLINT COMMENT "学生年龄",
    `sex` TINYINT COMMENT "学生性别",
    `phone` LARGEINT COMMENT "学生电话",
    `address` VARCHAR(50)  NOT NULL COMMENT "学生家庭地址",
    `date` DATETIME NOT NULL COMMENT "数据录入时间"
)
ENGINE=olap
DUPLICATE KEY (`sid`,`name`)
DISTRIBUTED BY HASH (`sid`) BUCKETS 4
PROPERTIES
(
"replication_num"="1"
);
代码语言:javascript
代码运行次数:0
运行
复制
--data 部分数据有包围符,包围符中的数据有和列分隔符相同的分隔符
"1","xx,x,x,xxx",1001,18,"1",1008610010,"bei,jing",2024-04-26

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u root -H "column_separator:,"  -H "enclose:\"" -H "trim_double_quotes:true"  -T /mnt/disk2/liyuanyuan/data/test4.csv http://10.16.10.6:18739/api/test/test4/_stream_load

处理方式参考:Streamload Stream Load - Apache Doris:

代码语言:javascript
代码运行次数:0
运行
复制
enclose:指定包围符。
trim_double_quotes:为 true 时裁剪掉 CSV 文件每个字段最外层的双引号。

列名含有特殊字符

Schema:

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE loadtest
(
    `@col` DATETIME NOT NULL COMMENT "时间",
    `colint` INT NOT NULL COMMENT "int",
    `colvar` INT COMMENT "字符串"
)
DUPLICATE KEY(`@col`, `colint`)
DISTRIBUTED BY HASH(`@col`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");data:2024-05-14 20:00:00,1,1

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u "root:" -T bb -H "format:csv" -H "column_separator:,"  -H 'columns:`@col`,colint,colvar' http://10.16.10.x:18739/api/test/loadtest/_stream_load

说明

需要将 -H "columns:@coltime,colint,colvar" 改成单引号 + 反引号就可以,因为双引号 curl 会转译

Windows 换行符问题

如果导入windows 数据后查询有问题,出现类似 select * from table where col = "xxx" 查不到数据,但实际上 col字段 xxx 数据是存在的,这种情况就要考虑是否是因为最后一列多出了 \r 。

排查方式

od -c test_data.csv 查看是否有\r\n 存在

处理方式:

导入数据时候指定换行符为 \r\n:-H "line_delimiter:\r\n"

Streamload 表达式写法

Demo 1:

Schema:

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE test.test_streamload(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
properties (
"replication_allocation" = "tag.location.default: 1"
);

-- data:1,xxx,18

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u "root:" -T aa -H "format:csv" -H "column_separator:," -H "columns:user_id,tmp,age,name=upper(tmp)" http://10.16.10.6:18739/api/test/loadtest/_stream_load

Demo2 :

Schema:

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE test.test_streamload2(
    c1            INT,
    c2               INT,
    c3               VARCHAR(20)
)
DUPLICATE KEY(c1)
DISTRIBUTED BY HASH(c1) BUCKETS 10
properties (
"replication_allocation" = "tag.location.default: 1"
);

-- data:1,2,ab,cd,ef

Streamload:

代码语言:javascript
代码运行次数:0
运行
复制
curl --location-trusted -u "root:" -T aa -H "format:csv" -H "column_separator:," -H "columns:c1,c2,A,B,C,c3=CONCAT(A,B,C)" http://127.0.0.1:8030/api/test/test_streamload2/_stream_load
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一臻数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导入报错梳理实践
    • 1. 分区没有提前创建Schema
    • 2.数据和字段类型不匹配Schema
    • 3. 导入列和schema 列不对应Schema
    • 4.csv中含有特殊字符导入失败
  • 含包围符数据导入
    • 1. 包围符数据导入
    • 2. 部分数据有包围符
  • 列名含有特殊字符
  • Windows 换行符问题
  • Streamload 表达式写法
    • Demo 1:
    • Demo2 :
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档