PostgreSQL 数据库的逻辑复制槽是一项非常重要的功能,通过逻辑复制槽本身,可以提供多种多样的功能。这里我们画一个思维导图进行一个全方面的分析和梳理。
PostgreSQL 逻辑复制槽
逻辑复制槽的功能可以用到多种途径,满足多种业务的需求。 这里总结:
插件名称 | 输出格式 | 适用场景 | 默认安装 |
---|---|---|---|
test_decoding | 文本 | 测试和开发环境 | ✅ 是 |
pgoutput | 内部格式 | 逻辑复制、主备同步 | ✅ 是 |
wal2json | JSON | Kafka、Debezium、数据湖 | ❌否 |
pg_recvlogical | 文本 | 轻量级日志分析 | ✅ 是 |
decoderbufs | Protobuf | 高性能数据流 | ❌ 否 |
下面是简单的实际操作案例 (版本PG16) 注意事项: 1.在数据复制中,需要保持复制中的所有表均有主键,或唯一索引,如表中无这两项,也可以进行逻辑复制但会严重影响性能,或导致逻辑复制报错
2.数据复制中需要源库需要保证有足够的空间存留WAL日志,避免数据延迟中WAL日志空间不足导致的数据库停机的风险
3.数据复制中需要严格监控复制槽的状态,避免订阅端出现问题,导致复制中断后,主库WAL不断挤压导致的磁盘空间异常问题。
4.逻辑复制槽最大的单位是针对逻辑库,而不是实例这点需要注意
psql
1 进行单逻辑库的数据库传输
在源库上创建逻辑复制槽,注意逻辑复制槽最好带有对应数据库的名字。
SELECT pg_create_logical_replication_slot('test_database', 'pgoutput');
//创建一个发布名字为 test_all_tables
CREATE PUBLICATION test_all_tables FOR ALL TABLES;
postgres=# create database test;
CREATE DATABASE
postgres=# \c test
You are now connected to database "test" as user "postgres".
test=# create table test(id int primary key,name varchar(20),age int);
CREATE TABLE
test=# insert into test(id,name,age) values (1,'Simon',33),(2,'Kim',23);
INSERT 0 2
test=# SELECT pg_create_logical_replication_slot('test_database', 'pgoutput');
pg_create_logical_replication_slot
------------------------------------
(test_database,0/60DA2E0)
(1 row)
test=# CREATE PUBLICATION test_all_tables FOR ALL TABLES;
CREATE PUBLICATION
2 在目的库建立同名的test库,同时建立同名的表,同结构的表,然后创建订阅,最后查询当前的订阅情况
test=# CREATE SUBSCRIPTION test CONNECTION 'host=192.168.198.100 port=5432 dbname=test user=admin password=admin' PUBLICATION test_all_tables;
NOTICE: created replication slot "test" on publisher
CREATE SUBSCRIPTION
test=#
test=# select * from pg_subscription;
oid | subdbid | subskiplsn | subname | subowner | subenabled | subbinary | substream | subtwophasestate | subdisableonerr | subpasswordrequire
d | subrunasowner | subconninfo | subslotname | subsynccommit | subpublications | subo
rigin
-------+---------+------------+---------+----------+------------+-----------+-----------+------------------+-----------------+-------------------
--+---------------+----------------------------------------------------------------------+-------------+---------------+-------------------+-----
------
16421 | 16414 | 0/0 | test | 16387 | t | f | f | d | f | t
| f | host=192.168.198.100 port=5432 dbname=test user=admin password=admin | test | off | {test_all_tables} | any
(1 row)
test=# select * from test;
id | name | age
----+-------+-----
1 | Simon | 33
2 | Kim | 23
(2 rows)
下面我们换另一个实例,我们创建一个不完全同步的表,只同步部分的字段,这里我们在源库创建新的逻辑复制槽,test_id_name
test=# create publication test_id_name for table test (id,name);
CREATE PUBLICATION
test=# select * from pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+-----------------+----------+--------------+-----------+-----------+-----------+-------------+------------
16860 | test_all_tables | 10 | t | t | t | t | t | f
16862 | test_id_name | 10 | f | t | t | t | t | f
(2 rows)
test=#
然后我们在目的库的另一逻辑库引用这个新的publication
test=# create database test2;
CREATE DATABASE
test=# \c test2
You are now connected to database "test2" as user "admin".
test2=#
test2=# create table test(id int primary key,name varchar(20));
CREATE TABLE
test2=# CREATE SUBSCRIPTION test_id_name_s CONNECTION 'host=192.168.198.100 port=5432 dbname=test user=admin password=admin' PUBLICATION test_id_name;
NOTICE: created replication slot "test_id_name_s" on publisher
CREATE SUBSCRIPTION
test2=#
test2=#
test2=# select * from test;
id | name
----+-------
1 | Simon
2 | Kim
(2 rows)
test2=#
通过以上的两个小的实验我们获得了2个逻辑复制槽进行PostgreSQL 逻辑库和单表(选择非全部列)的工作方式。
这里还需要注意,如果源库新加表等,都需要运行如下的命令来刷新订阅接受命令。 ALTER SUBSCRIPTION 订阅的名字 REFRESH PUBLICATION;
对于逻辑复制槽的监控需要注意
1 需要定期发现逻辑复制槽的状态 2 发现逻辑复制槽挤压WAL的文件的情况
::如果发现逻辑复制槽失效,或由于WAL源库的文件丢失等原因逻辑复制槽无法再消费,则必须马上删除订阅,删除逻辑复制槽。
通过如下的命令来针对逻辑复制槽进行监控
test2=# SELECT subname, pid, relid, received_lsn, last_msg_send_time,
test2-# last_msg_receipt_time, latest_end_lsn, latest_end_time
test2-# FROM pg_stat_subscription;
subname | pid | relid | received_lsn | last_msg_send_time | last_msg_receipt_time | latest_end_lsn | latest_en
d_time
----------------+------+-------+--------------+-------------------------------+-------------------------------+----------------+-----------------
--------------
test | 5478 | | 0/60DF6F8 | 2024-11-29 12:16:11.152715-05 | 2024-11-29 12:16:11.152782-05 | 0/60DF6F8 | 2024-11-29 12:16
:11.152715-05
test_id_name_s | 5635 | | 0/60DF6F8 | 2024-11-29 12:16:10.476948-05 | 2024-11-29 12:16:10.47766-05 | 0/60DF6F8 | 2024-11-29 12:16
:10.476948-05
(2 rows)
下面的命令并不准确,尤其在主库并未有真实的数据变动或插入则这个语句就会产生问题,但如果能保证主库持续有数据,则这个语句可以观察逻辑复制的从库与主库的数据之间的延迟
test2=# SELECT subname,
test2-# EXTRACT(EPOCH FROM (now() - last_msg_receipt_time)) AS delay_seconds
test2-# FROM pg_stat_subscription;
subname | delay_seconds
----------------+---------------
test | 4.555973
test_id_name_s | 5.372016
(2 rows)
检查逻辑复制槽的状态,明显active=F 的逻辑复制出现问题,可能是网络,也可能是WAL丢失,或者是订阅端出现问题等等,都需要后续的分析和排除故障。
test=# SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn
test-# FROM pg_replication_slots
test-# WHERE slot_type = 'logical';
slot_name | plugin | active | restart_lsn | confirmed_flush_lsn
----------------+----------+--------+-------------+---------------------
test_database | pgoutput | f | 0/60DA2A8 | 0/60DA2E0
test | pgoutput | t | 0/60DF6C0 | 0/60DF6F8
test_id_name_s | pgoutput | t | 0/60DF6C0 | 0/60DF6F8
(3 rows)
本文分享自 AustinDatabases 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!