前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >PostgreSQL逻辑复制槽功能

PostgreSQL逻辑复制槽功能

作者头像
AustinDatabases
发布2025-03-18 19:16:23
发布2025-03-18 19:16:23
6200
代码可运行
举报
文章被收录于专栏:AustinDatabasesAustinDatabases
运行总次数:0
代码可运行

PostgreSQL 数据库的逻辑复制槽是一项非常重要的功能,通过逻辑复制槽本身,可以提供多种多样的功能。这里我们画一个思维导图进行一个全方面的分析和梳理。

PostgreSQL 逻辑复制槽

逻辑复制槽的功能可以用到多种途径,满足多种业务的需求。 这里总结:

  1. 数据库升级 PostgreSQL数据库升级中,有如下的需求,则需要考虑使用逻辑复制槽的方式来进行数据库的升级。

  1. 系统不能停机太长时间,或根本无法停机,通过逻辑复制槽的方式来进行数据的同步到目的主机(已经升级的PostgreSQL系统)
  2. 系统升级有负担,需要进行验证,或灰度的升级,程序是两份需要连接到不同的数据库版本中,需要动态数据的支持。(需要注意,验证正能验证 Query SQL)

  1. 数据库备份

  1. 数据备份中,仅仅针对表级别进行实时的数据备份
  2. 数据备份中,需要动态的只读类的逻辑库备份
  3. 数据备份中,需要对表进行改造后,进行数据备份,或备份不在主机中尽显,而是在备库中进行。
  4. 数据异步同步

  1. 针对Saas类的需求,进行异步的部分数据同步到甲方
  2. 数据逻辑库异步同步到备库,已被进行非实时的查询
  3. 远程可接受的数据远程传输和移动,且保证数据的异步实时同步

  1. 异构数据库输出

  1. 通过不同的插件输出数据库中的CDC,来捕捉数据库中数据的变化,来进行预估触发事件
  2. 通过不同的插件来完成数据库审计操作的部分功能
  3. 通过不同的插件来进行异构数据的捕捉和清洗等,在将数据通过kafka等二次传输给异构的数据库产品,进行异构的数据同步功能。

  1. 不同的支持逻辑复制的插件

插件名称

输出格式

适用场景

默认安装

test_decoding

文本

测试和开发环境

✅ 是

pgoutput

内部格式

逻辑复制、主备同步

✅ 是

wal2json

JSON

Kafka、Debezium、数据湖

❌否

pg_recvlogical

文本

轻量级日志分析

✅ 是

decoderbufs

Protobuf

高性能数据流

❌ 否


下面是简单的实际操作案例 (版本PG16) 注意事项: 1.在数据复制中,需要保持复制中的所有表均有主键,或唯一索引,如表中无这两项,也可以进行逻辑复制但会严重影响性能,或导致逻辑复制报错

2.数据复制中需要源库需要保证有足够的空间存留WAL日志,避免数据延迟中WAL日志空间不足导致的数据库停机的风险

3.数据复制中需要严格监控复制槽的状态,避免订阅端出现问题,导致复制中断后,主库WAL不断挤压导致的磁盘空间异常问题。

4.逻辑复制槽最大的单位是针对逻辑库,而不是实例这点需要注意

psql

1 进行单逻辑库的数据库传输

在源库上创建逻辑复制槽,注意逻辑复制槽最好带有对应数据库的名字。

代码语言:javascript
代码运行次数:0
运行
复制
SELECT pg_create_logical_replication_slot('test_database', 'pgoutput');

//创建一个发布名字为 test_all_tables

CREATE PUBLICATION test_all_tables FOR ALL TABLES;
代码语言:javascript
代码运行次数:0
运行
复制
postgres=# create database test;
CREATE DATABASE
postgres=# \c test
You are now connected to database "test" as user "postgres".
test=# create table test(id int primary key,name varchar(20),age int);
CREATE TABLE

test=# insert into test(id,name,age) values (1,'Simon',33),(2,'Kim',23);
INSERT 0 2
test=# SELECT pg_create_logical_replication_slot('test_database', 'pgoutput');
 pg_create_logical_replication_slot 
------------------------------------
 (test_database,0/60DA2E0)
(1 row)

test=# CREATE PUBLICATION test_all_tables FOR ALL TABLES;
CREATE PUBLICATION

2 在目的库建立同名的test库,同时建立同名的表,同结构的表,然后创建订阅,最后查询当前的订阅情况

代码语言:javascript
代码运行次数:0
运行
复制
test=# CREATE SUBSCRIPTION test CONNECTION 'host=192.168.198.100 port=5432 dbname=test user=admin password=admin' PUBLICATION test_all_tables;
NOTICE:  created replication slot "test" on publisher
CREATE SUBSCRIPTION
test=# 
test=# select * from pg_subscription;
  oid  | subdbid | subskiplsn | subname | subowner | subenabled | subbinary | substream | subtwophasestate | subdisableonerr | subpasswordrequire
d | subrunasowner |                             subconninfo                              | subslotname | subsynccommit |  subpublications  | subo
rigin 
-------+---------+------------+---------+----------+------------+-----------+-----------+------------------+-----------------+-------------------
--+---------------+----------------------------------------------------------------------+-------------+---------------+-------------------+-----
------
 16421 |   16414 | 0/0        | test    |    16387 | t          | f         | f         | d                | f               | t                 
  | f             | host=192.168.198.100 port=5432 dbname=test user=admin password=admin | test        | off           | {test_all_tables} | any
(1 row)

test=# select * from test;
 id | name  | age 
----+-------+-----
  1 | Simon |  33
  2 | Kim   |  23
(2 rows)


下面我们换另一个实例,我们创建一个不完全同步的表,只同步部分的字段,这里我们在源库创建新的逻辑复制槽,test_id_name

代码语言:javascript
代码运行次数:0
运行
复制
test=# create publication test_id_name for table test (id,name);
CREATE PUBLICATION
test=# select * from pg_publication;
  oid  |     pubname     | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot 
-------+-----------------+----------+--------------+-----------+-----------+-----------+-------------+------------
 16860 | test_all_tables |       10 | t            | t         | t         | t         | t           | f
 16862 | test_id_name    |       10 | f            | t         | t         | t         | t           | f
(2 rows)

test=# 

然后我们在目的库的另一逻辑库引用这个新的publication

代码语言:javascript
代码运行次数:0
运行
复制
test=# create database test2;
CREATE DATABASE
test=# \c test2
You are now connected to database "test2" as user "admin".
test2=# 
test2=# create table test(id int primary key,name varchar(20));
CREATE TABLE

test2=# CREATE SUBSCRIPTION test_id_name_s CONNECTION 'host=192.168.198.100 port=5432 dbname=test user=admin password=admin' PUBLICATION test_id_name;
NOTICE:  created replication slot "test_id_name_s" on publisher
CREATE SUBSCRIPTION
test2=# 
test2=# 
test2=# select * from test;
 id | name  
----+-------
  1 | Simon
  2 | Kim
(2 rows)

test2=# 

通过以上的两个小的实验我们获得了2个逻辑复制槽进行PostgreSQL 逻辑库和单表(选择非全部列)的工作方式。

这里还需要注意,如果源库新加表等,都需要运行如下的命令来刷新订阅接受命令。 ALTER SUBSCRIPTION 订阅的名字 REFRESH PUBLICATION;


对于逻辑复制槽的监控需要注意

1 需要定期发现逻辑复制槽的状态 2 发现逻辑复制槽挤压WAL的文件的情况

::如果发现逻辑复制槽失效,或由于WAL源库的文件丢失等原因逻辑复制槽无法再消费,则必须马上删除订阅,删除逻辑复制槽。

通过如下的命令来针对逻辑复制槽进行监控

代码语言:javascript
代码运行次数:0
运行
复制
test2=# SELECT subname, pid, relid, received_lsn, last_msg_send_time, 
test2-#        last_msg_receipt_time, latest_end_lsn, latest_end_time
test2-# FROM pg_stat_subscription;
    subname     | pid  | relid | received_lsn |      last_msg_send_time       |     last_msg_receipt_time     | latest_end_lsn |        latest_en
d_time        
----------------+------+-------+--------------+-------------------------------+-------------------------------+----------------+-----------------
--------------
 test           | 5478 |       | 0/60DF6F8    | 2024-11-29 12:16:11.152715-05 | 2024-11-29 12:16:11.152782-05 | 0/60DF6F8      | 2024-11-29 12:16
:11.152715-05
 test_id_name_s | 5635 |       | 0/60DF6F8    | 2024-11-29 12:16:10.476948-05 | 2024-11-29 12:16:10.47766-05  | 0/60DF6F8      | 2024-11-29 12:16
:10.476948-05
(2 rows)

下面的命令并不准确,尤其在主库并未有真实的数据变动或插入则这个语句就会产生问题,但如果能保证主库持续有数据,则这个语句可以观察逻辑复制的从库与主库的数据之间的延迟

代码语言:javascript
代码运行次数:0
运行
复制
test2=# SELECT subname, 
test2-#        EXTRACT(EPOCH FROM (now() - last_msg_receipt_time)) AS delay_seconds
test2-# FROM pg_stat_subscription;
    subname     | delay_seconds 
----------------+---------------
 test           |      4.555973
 test_id_name_s |      5.372016
(2 rows)

检查逻辑复制槽的状态,明显active=F 的逻辑复制出现问题,可能是网络,也可能是WAL丢失,或者是订阅端出现问题等等,都需要后续的分析和排除故障。

代码语言:javascript
代码运行次数:0
运行
复制
test=# SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn 
test-# FROM pg_replication_slots
test-# WHERE slot_type = 'logical';
   slot_name    |  plugin  | active | restart_lsn | confirmed_flush_lsn 
----------------+----------+--------+-------------+---------------------
 test_database  | pgoutput | f      | 0/60DA2A8   | 0/60DA2E0
 test           | pgoutput | t      | 0/60DF6C0   | 0/60DF6F8
 test_id_name_s | pgoutput | t      | 0/60DF6C0   | 0/60DF6F8
(3 rows)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-03-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AustinDatabases 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档