Partial column update means updating only some of a table's column values rather than the whole row. This can be done with an UPDATE statement, but such a statement typically reads out the entire row, modifies some of its fields, and writes the row back; this read-write transaction is expensive and not suitable for writing data in large batches. For load-based updates on the primary key (Unique) model, Doris can insert or update just a subset of columns directly, without reading the whole row first, which greatly improves update efficiency. This article applies to Doris 2.0.2 and later; in it we will walk through best practices for partial column updates using INSERT, Stream Load, and Flink CDC.
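For contrast, a traditional whole-row update looks like the hedged sketch below (the orders table and its columns are hypothetical, not from this article); even though only one column changes, the storage engine reads the full row, merges the change, and writes the row back:
-- Hypothetical example: a regular UPDATE statement; under the hood this is a
-- read-modify-write of the entire row.
UPDATE orders SET status = 'paid' WHERE order_id = 10001;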
Typical scenarios for partial column updates include high-frequency updates to a small number of columns (for example, refreshing real-time metrics) and stitching data from several upstream source tables into one wide table. In these scenarios, partial column updates reduce unnecessary data writes and locking, improving overall system performance and responsiveness.
Notes
CREATE TABLE `test_partial_update` (
`id` INT(11) NULL,
`value` VARCHAR(20) NULL,
`date_time` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
`dt` DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'partial column update'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
on update current_timestamp: whether to update this column's value to the current time (current_timestamp) whenever any column in the row is updated. This feature can only be used on Unique tables with Merge-on-Write enabled. A column with this feature enabled must declare a default value, and that default value must be current_timestamp. If a timestamp precision is declared for the column, the precision of the timestamp in the default value must be the same.
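As a hedged illustration of these constraints (the table name ts_demo is made up for this sketch), the precision below is declared consistently as (6) in the column type, the DEFAULT, and the ON UPDATE clause, and Merge-on-Write is enabled explicitly for versions where it is not on by default:
-- Sketch only: a Merge-on-Write Unique table whose dt column satisfies the
-- precision rule described above.
CREATE TABLE `ts_demo` (
`id` INT NULL,
`dt` DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
Returning to the test_partial_update table created above, first insert a full row: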
mysql> insert into test_partial_update(id, value) values(1, "doris");
Query OK, 1 row affected (0.04 sec)
{'label':'label_3ed18eb2142c42a6_b02373286719ab46', 'status':'VISIBLE', 'txnId':'60072'}
mysql> select * from test_partial_update;
+------+-------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+-------+----------------------------+----------------------------+
| 1 | doris | 2024-10-13 10:45:22.000000 | 2024-10-13 10:45:22.257000 |
+------+-------+----------------------------+----------------------------+
1 row in set (0.01 sec)
If you only want to change the value column while keeping the row's original insert time, you can enable partial column updates for INSERT and specify the columns being inserted:
set enable_unique_key_partial_update=true;
mysql> insert into test_partial_update(id, value) values(1, "SelectDB");
Query OK, 1 row affected (0.03 sec)
{'label':'label_6a1e2c7306e84c81_a3ba841f7b302bb3', 'status':'VISIBLE', 'txnId':'60074'}
mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+----------+----------------------------+----------------------------+
| 1 | SelectDB | 2024-10-13 10:45:22.000000 | 2024-10-13 10:48:12.619000 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.01 sec)
As you can see, the date_time column keeps the time the row was inserted, while the dt column records the time it was last updated. The same partial column update can also be performed with Stream Load by adding the partial_columns header and listing the target columns. First prepare a CSV file, then load it:
vim test_partial_update.csv
1, "SelectDB"
curl --location-trusted -u root:123456 -T test_partial_update.csv -H "format:csv" -H "column_separator:," -H "partial_columns:true" -H "columns:id, value" http://10.16.10.6:8030/api/demo/test_partial_update/_stream_load
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+-------------+----------------------------+----------------------------+
| 1 | "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-13 11:11:51.795000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)
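For reference, here is a hedged sketch of the same partial-column Stream Load using line-delimited JSON instead of CSV (the file name and content are illustrative); this mirrors the json / read_json_by_line sink properties used in the Flink example later on:
vim test_partial_update.json
{"id": 1, "value": "SelectDB"}
curl --location-trusted -u root:123456 -T test_partial_update.json -H "format:json" -H "read_json_by_line:true" -H "partial_columns:true" -H "columns:id,value" http://10.16.10.6:8030/api/demo/test_partial_update/_stream_load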
Partial column updates also work when synchronizing data from MySQL with Flink CDC. First create a source table in MySQL (note: this is a MySQL table, distinct from the Doris table above) and insert a row:
CREATE TABLE `test_partial_update` (
`id` INT(11) NULL,
`value` VARCHAR(20) NULL,
`date_time` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
`dt` DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
);
mysql> insert into test_partial_update(id, value) values(1, "SelectDB");
Query OK, 1 row affected (0.00 sec)
mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+----------+----------------------------+----------------------------+
| 1 | SelectDB | 2024-10-14 16:08:14.058728 | 2024-10-14 16:08:14.058728 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.00 sec)
Start the Flink SQL client:
./bin/sql-client.sh embedded
The Flink job takes checkpoints periodically and records the binlog offset; when the job fails over, it resumes processing from the previously recorded offset, so enable checkpointing first:
Flink SQL> SET execution.checkpointing.interval = 3s;
CREATE TABLE cdc_mysql_source (
id INT,
`value` STRING,
date_time TIMESTAMP,
dt TIMESTAMP
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.16.10.6',
'port' = '3326',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'scan.incremental.snapshot.chunk.key-column'='id',
'table-name' = 'test_partial_update'
);
CREATE TABLE doris_sink (
id INT,
`value` STRING,
date_time TIMESTAMP,
dt TIMESTAMP
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'demo.test_partial_update',
'username' = 'root',
'password' = '123456',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,value',
'sink.properties.partial_columns' = 'true' -- enable partial column updates
);
insert into doris_sink(id, `value`) select id,`value` from cdc_mysql_source;
After the job is running, query the Doris table again; since only id and value are written, date_time keeps its original value while dt is refreshed:
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+-------------+----------------------------+----------------------------+
| 1 | "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-14 16:11:23.546000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)
Q1. With partial column updates enabled, an insert fails with "errCode = 2, detailMessage = Insert has filtered data in strict mode".
Solution: set enable_insert_strict=false;
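A hedged sketch of the session-variable setup used in this article, combined in one session before the partial-column INSERT:
-- Enable partial column updates for the current session.
SET enable_unique_key_partial_update = true;
-- Relax strict mode; otherwise rows whose key does not yet exist in the table
-- may be filtered out and the insert reports the error above.
SET enable_insert_strict = false;
INSERT INTO test_partial_update(id, `value`) VALUES (1, "SelectDB");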
Q2. MySQL CDC table data fails to synchronize.
Solution:
First, check whether binlog is enabled with the following command:
SHOW VARIABLES LIKE 'log_bin';
If the value of log_bin is OFF, enable binlog in the MySQL configuration file:
[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1
The change takes effect after the MySQL service is restarted.
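After the restart, both settings can be re-checked with queries like:
SHOW VARIABLES LIKE 'log_bin';       -- expected: ON
SHOW VARIABLES LIKE 'binlog_format'; -- expected: ROW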
Next, check whether the column types of test_partial_update match the Flink table DDL; mismatched types can cause problems when reading the data. For DATETIME/TIMESTAMP columns, also make sure the MySQL time zone is set correctly, for example:
set time_zone='+8:00';
# or persist it
set persist time_zone='+8:00';
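To confirm which time zone is actually in effect on the MySQL side, a check like the following can be used:
-- Show the global and session time zone settings.
SELECT @@global.time_zone, @@session.time_zone;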
In the next post, we will explore more interesting, useful, and valuable Doris topics. Stay tuned!