Notify和Listen是Postgresql提供的不同会话间异步消息通信功能,例子:
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.
LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.
功能使用PG的基础设施shm_mq + 信号机制拼装实现。
监听、通知的行为也兼容了数据库的事务的功能,事务回滚会删除监听、事务提交会触发通知。
本文对异步消息队列与事务的联动机制做一些分析。
NOTIFY的功能必须等到事务提交才会触发:
postgres=# listen a1;
LISTEN
postgres=# begin;
BEGIN
postgres=*# notify a1;
NOTIFY
postgres=*# notify a1;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "a1" received from server process with PID 17111.
流程比较简单,先从pendingActions中注册监听。再发信号触发异步notify。
void
AtCommit_Notify(void)
{
...
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
Exec_ListenCommit(actrec->channel);
break;
case LISTEN_UNLISTEN:
Exec_UnlistenCommit(actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
Exec_UnlistenAllCommit();
break;
}
}
}
...
if (pendingNotifies != NULL)
SignalBackends();
...
}
回滚后监听和通知都会清理:
postgres=# begin;
BEGIN
postgres=*# listen k123;
LISTEN
postgres=*# notify k123;
NOTIFY
postgres=*# abort;
ROLLBACK
postgres=# notify k123;
NOTIFY
postgres=#
事务回滚时执行清理动作:
void
AtAbort_Notify(void)
{
if (amRegisteredListener && listenChannels == NIL)
asyncQueueUnregister();
pendingActions = NULL;
pendingNotifies = NULL;
}
全部清理干净。
提交的子事务将notify交接给上一层事务。
postgres=# listen k000;
LISTEN
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# notify k000;
NOTIFY
postgres=*# release sp2;
RELEASE
postgres=*# commit;
COMMIT
Asynchronous notification "k000" received from server process with PID 18902.
实现:
void
AtSubCommit_Notify(void)
{
int my_level = GetCurrentTransactionNestLevel();
if (pendingActions != NULL &&
pendingActions->nestingLevel >= my_level)
{
if (pendingActions->upper == NULL ||
pendingActions->upper->nestingLevel < my_level - 1)
{
--pendingActions->nestingLevel;
}
else
{
ActionList *childPendingActions = pendingActions;
pendingActions = pendingActions->upper;
pendingActions->actions =
list_concat(pendingActions->actions,
childPendingActions->actions);
pfree(childPendingActions);
}
}
if (pendingNotifies != NULL &&
pendingNotifies->nestingLevel >= my_level)
{
Assert(pendingNotifies->nestingLevel == my_level);
if (pendingNotifies->upper == NULL ||
pendingNotifies->upper->nestingLevel < my_level - 1)
{
--pendingNotifies->nestingLevel;
}
else
{
NotificationList *childPendingNotifies = pendingNotifies;
ListCell *l;
pendingNotifies = pendingNotifies->upper;
foreach(l, childPendingNotifies->events)
{
Notification *childn = (Notification *) lfirst(l);
if (!AsyncExistsPendingNotify(childn))
AddEventToPendingNotifies(childn);
}
pfree(childPendingNotifies);
}
}
}
子事务提交时,notify并不会真正触发,也是和其他资源一样,将自己绑定的nestingLevel转移到上一层(注意这里是绑的nestingLevel不是xid比较合理)。
整体上会有两种情况:
情况一:子事务有间隔,走这个分支pendingActions->upper->nestingLevel < my_level - 1
begin;
savepoint sp1;
notify ch123;
savepoint sp2;
savepoint sp3;
notify ch789;
release sp3;
情况二:子事务无间隔,走else
分支
begin;
savepoint sp1;
notify ch123;
savepoint sp2;
notify ch456;
savepoint sp3;
notify ch789;
release sp3;
pendingActions和pendingNotifies都有自己的upper指针形成链式结构,两种数据结构在子事务提交时的行为都是将信息转移到上一层中,区别是pendingActions直接挂到上一层的actions链表;pendingNotifies调用AddEventToPendingNotifies接口完成同样的动作。
回滚的子事务会删除监听。
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# listen k123;
LISTEN
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# listen k000;
LISTEN
postgres=*# rollback to sp2;
ROLLBACK
postgres=*# notify k123;
NOTIFY
postgres=*# notify k000;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "k123" received from server process with PID 18098.
postgres=#
void
AtSubAbort_Notify(void)
{
int my_level = GetCurrentTransactionNestLevel();
...
while (pendingActions != NULL &&
pendingActions->nestingLevel >= my_level)
{
ActionList *childPendingActions = pendingActions;
pendingActions = pendingActions->upper;
pfree(childPendingActions);
}
while (pendingNotifies != NULL &&
pendingNotifies->nestingLevel >= my_level)
{
NotificationList *childPendingNotifies = pendingNotifies;
pendingNotifies = pendingNotifies->upper;
pfree(childPendingNotifies);
}
}
子事务回滚的话,全部是直接删除,不在做向上归属。
(This content is a summary derived from code comments.)
一个在相同频道上监听的应用程序将会收到自己发送的NOTIFY消息。如果这些消息对应用程序没有用处,可以通过将NOTIFY消息中的be_pid与应用程序自身后端的PID进行比较来忽略它们。(从FE/BE协议2.0开始,在启动期间,后端的PID会提供给前端。)上述设计确保通过忽略自我通知,不会错过来自其他后端的通知。用于通知管理的共享内存使用量(NUM_NOTIFY_BUFFERS)可以根据需要进行调整,而不会影响除性能之外的任何内容。可以同时排队的通知数据的最大量由max_notify_queue_pages GUC确定。