openGauss: Overview of the SyncRepLock Synchronization Mechanism
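1. Background on SyncRepLock:
The concept of synchronous replication was covered earlier; almost all of its logic runs on the primary node. The actual streaming of WAL still takes place in the walreceiver/walsender modules. The core idea of this design is to isolate all wait/release logic on the primary. The primary defines which standbys it wants to wait for, and the standbys know nothing about the synchronization requirements of transactions on the primary, which reduces code complexity.
When data is inserted, writing the WAL to disk is handled mainly by RecordTransactionCommit(), which guarantees that committed data is not lost. The mechanism by which the primary then waits and is woken up is implemented by the core function SyncRepWaitForLSN().
The central operation here is managing SyncRepQueue. Once a backend thread has gone through RecordTransactionCommit, it must be inserted into this wait queue, ordered by the LSN it is waiting for, so that a global, ordered linked list is maintained. There is one such queue per wait mode, and the lsn mentioned here is the log synchronization position associated with the head of each queue, that is, the position the waiting backend threads are compared against.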
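Several helper functions therefore operate on the queue. The main ones are:
- SyncRepQueueInsert: called by SyncRepWaitForLSN on the primary; it inserts the calling process into the SyncRepQueue, after which the process starts waiting;
- SyncRepCancelWait: stops waiting and removes the process from the queue;
- SyncRepWakeQueue: wakes the waiting processes in the queue whose waitLSN has been reached (or all of them when its first argument is true) and removes them from the queue;
Note, however, that because the queue is global and must be kept strictly ordered, every access to it has to be made under a lock, namely SyncRepLock. This is the main use of SyncRepLock and also its main source of performance overhead.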
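The ordered-insert idea can be sketched in a few lines of C. This is only an illustration under stated assumptions: `Waiter`, `WaitQueue`, `queue_insert` and `queue_is_ordered` are hypothetical names, a C11 `atomic_flag` spin flag stands in for SyncRepLock (which the real code takes as an LWLock), and scanning backwards from the tail is an assumption of the sketch, not something the text above states about SyncRepQueueInsert.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for PGPROC and SyncRepQueue; not the openGauss types. */
typedef struct Waiter {
    uint64_t wait_lsn;          /* LSN this backend is waiting for */
    struct Waiter *prev;
    struct Waiter *next;
} Waiter;

typedef struct WaitQueue {
    Waiter head;                /* sentinel: head.next holds the smallest LSN */
    atomic_flag lock;           /* plays the role of SyncRepLock in this sketch */
} WaitQueue;

static void queue_init(WaitQueue *q)
{
    q->head.wait_lsn = 0;
    q->head.prev = &q->head;
    q->head.next = &q->head;
    atomic_flag_clear(&q->lock);
}

static void queue_lock(WaitQueue *q)
{
    while (atomic_flag_test_and_set(&q->lock)) {
        /* spin until the flag is released */
    }
}

static void queue_unlock(WaitQueue *q)
{
    atomic_flag_clear(&q->lock);
}

/* Insert w so that the queue stays sorted by ascending wait_lsn. */
static void queue_insert(WaitQueue *q, Waiter *w)
{
    queue_lock(q);
    /* Assumption: new commits usually carry the largest LSN, so scan from the tail. */
    Waiter *pos = q->head.prev;
    while (pos != &q->head && pos->wait_lsn > w->wait_lsn) {
        pos = pos->prev;
    }
    /* Link w directly after pos. */
    w->prev = pos;
    w->next = pos->next;
    pos->next->prev = w;
    pos->next = w;
    queue_unlock(q);
}

/* Returns 1 when the queue is sorted; same idea as the SyncRepQueueIsOrderedByLSN assertion. */
static int queue_is_ordered(WaitQueue *q)
{
    int ok = 1;
    queue_lock(q);
    for (Waiter *w = q->head.next; w != &q->head && w->next != &q->head; w = w->next) {
        if (w->wait_lsn > w->next->wait_lsn) {
            ok = 0;
            break;
        }
    }
    queue_unlock(q);
    return ok;
}
```

Because the insert position is found and the links are rewritten inside one critical section, no other thread can observe or create an out-of-order list, which is the property every later traversal relies on.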
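2. Where SyncRepLock is used:
- Insertion into and removal from the SyncRepQueue by backend threads
Insert: SyncRepWaitForLSN
Cancel and remove: SyncRepCancelWait
Remove on thread exit: SyncRepCleanupAtProcExit
Initialization: SyncRepInitConfig
- SyncRepReleaseWaiters/NotifySyncWaiters:
The sender thread updates the lsn array according to the lsn values it has received, then traverses the ordered SyncRepQueue list and wakes the sleeping backend threads;
- Detection of changes to the synchronous primary/standby configuration:
SyncRepUpdateSyncStandbysDefined;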
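3. Analysis of SyncRepLock on the SyncRepQueue:
walsndctl->lsn is a single copy shared by all threads. In the original code logic it is used in two places:
For the WalSender:
It records the LSN currently received. The WalSender updates the corresponding entry of the lsn array and then calls SyncRepWakeQueue to wake the backend wait queue: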
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_RECEIVE], receivePtr)) {
walsndctl->lsn[SYNC_REP_WAIT_RECEIVE] = receivePtr;
numreceive = SyncRepWakeQueue(false, SYNC_REP_WAIT_RECEIVE);
}
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], writePtr)) {
walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], flushPtr)) {
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_APPLY], replayPtr)) {
walsndctl->lsn[SYNC_REP_WAIT_APPLY] = replayPtr;
numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
}
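All of the above runs under the lock. This guarantees that each time an lsn entry is updated, every thread in the queue whose wait_lsn is less than or equal to the new lsn gets woken; the WalSender drives the wake-up of the backend threads.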
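The "only move forward, then wake" rule applied to each entry of the lsn array can be sketched as follows. The names `SharedLsn` and `advance_lsn` are hypothetical, plain 64-bit integers replace XLogRecPtr, and the caller is assumed to already hold the lock.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical wait modes mirroring SYNC_REP_WAIT_RECEIVE/WRITE/FLUSH/APPLY. */
enum { WAIT_RECEIVE, WAIT_WRITE, WAIT_FLUSH, WAIT_APPLY, NUM_WAIT_MODES };

typedef struct {
    uint64_t lsn[NUM_WAIT_MODES];   /* one confirmed position per wait mode */
} SharedLsn;

/*
 * Advance the position for one mode only if the standby reported something
 * newer. Returns 1 when the position moved, which is the only case in which
 * the queue for that mode needs to be scanned for waiters.
 */
static int advance_lsn(SharedLsn *ctl, int mode, uint64_t reported)
{
    if (ctl->lsn[mode] < reported) {
        ctl->lsn[mode] = reported;
        return 1;
    }
    return 0;
}
```

A stale or repeated report leaves the entry untouched, so the stored position never moves backwards and a waiter that was released can never become "unconfirmed" again.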
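Correspondingly, SyncRepWakeQueue walks the proc list from its head. Because the list is sorted in ascending order and the lock guarantees mutual exclusion, a single pass that stops at the first proc whose waitLSN is greater than lsn is enough to cover every proc with waitLSN <= lsn: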
while (proc != NULL) {
/*
* Assume the queue is ordered by LSN
*/
if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
break;
/*
* Move to next proc, so we can delete thisproc from the queue.
* thisproc is valid, proc may be NULL after this.
*/
thisproc = proc;
thisproc->syncRepInCompleteQueue = true;
#ifndef ENABLE_MULTIPLE_NODES
/*
* Set confirmed LSN at primary node during sync wait for LSN.
* Confirmed LSN is the start LSN of last xact of proc which all qurom stanby nodes had met with primary.
* With saving the confirmed LSN at primary node during sync wait process and validating it during build
* process, it can avoid the primary node lost data if it will be built as new standby while an async
* standby node is running as new primary.
*/
if (g_instance.attr.attr_storage.enable_save_confirmed_lsn &&
XLogRecPtrIsValid(thisproc->syncSetConfirmedLSN)) {
confirmedLSN =
XLByteLT(confirmedLSN, thisproc->syncSetConfirmedLSN) ? thisproc->syncSetConfirmedLSN : confirmedLSN;
thisproc->syncSetConfirmedLSN = InvalidXLogRecPtr;
}
#endif
proc = (PGPROC *)SHMQueueNext(&(t_thrd.walsender_cxt.WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
/* Refers to the last removable node */
pTail = &(thisproc->syncRepLinks);
numprocs++;
}
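This method keeps the head pointer, walks the list to find the last proc that satisfies the condition, cuts the list at that point, and finally wakes only the first proc of the detached segment, which then goes on to wake all the procs after it: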
/* Delete the finished segment from the list, and only notifies leader proc */
if (pTail != pHead) {
PGPROC* leaderProc = (PGPROC *) (((char *) pHead->next) - offsetof(PGPROC, syncRepLinks));
pHead->next->prev = NULL;
pHead->next = pTail->next;
pTail->next->prev = pHead;
pTail->next = NULL;
/*
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
* make sure that it sees the queue link being removed before the
* syncRepState change.
*/
pg_write_barrier();
/*
* Set state to complete; see SyncRepWaitForLSN() for discussion of
* the various states.
*/
leaderProc->syncRepState = SYNC_REP_WAIT_COMPLETE;
/*
* Wake only when we have set state and removed from queue.
*/
SetLatch(&(leaderProc->procLatch));
}
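The cut-and-hand-off scheme can be illustrated with a small, single-threaded sketch. `Node`, `list_append`, `detach_ready_prefix` and `notify_followers` are hypothetical names; the write barrier and SetLatch calls are left out, and what the leader does to its followers is an assumption, since the body of SyncRepNotifyComplete is not shown here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum { NOT_WAITING, WAITING, WAIT_COMPLETE } WaitState;

typedef struct Node {
    uint64_t wait_lsn;
    WaitState state;
    struct Node *prev;
    struct Node *next;
} Node;

/* Append n at the tail of a circular list with sentinel head (test helper). */
static void list_append(Node *head, Node *n)
{
    n->prev = head->prev;
    n->next = head;
    head->prev->next = n;
    head->prev = n;
}

/*
 * Detach every node with wait_lsn <= lsn from the sorted list and mark only
 * the first one (the leader) complete. Returns the leader, or NULL when no
 * node qualifies. The caller is assumed to hold the queue lock.
 */
static Node *detach_ready_prefix(Node *head, uint64_t lsn, int *count)
{
    Node *tail = head;
    *count = 0;
    for (Node *n = head->next; n != head && n->wait_lsn <= lsn; n = n->next) {
        tail = n;               /* last removable node so far */
        (*count)++;
    }
    if (tail == head) {
        return NULL;
    }
    Node *leader = head->next;
    leader->prev = NULL;
    head->next = tail->next;    /* queue now starts after the detached part */
    tail->next->prev = head;
    tail->next = NULL;          /* detached chain ends at tail */
    leader->state = WAIT_COMPLETE;
    return leader;
}

/* Assumed leader behaviour: walk the detached chain and complete each follower. */
static int notify_followers(Node *leader)
{
    int notified = 0;
    Node *n = leader->next;
    leader->next = NULL;
    while (n != NULL) {
        Node *next = n->next;
        n->prev = NULL;
        n->next = NULL;
        n->state = WAIT_COMPLETE;
        notified++;
        n = next;
    }
    return notified;
}
```

The point of the design is that the lock holder does a fixed amount of pointer work plus one wake-up, while the per-waiter notifications are pushed onto the leader, outside the critical section.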
For backend thread insertion:
A backend thread tries to insert its own PGPROC (or equivalent) entry into the ordered list:
(void)LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
Assert(t_thrd.proc->syncRepState == SYNC_REP_NOT_WAITING);
/*
* We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
* set. See SyncRepUpdateSyncStandbysDefined.
*
* Also check that the standby hasn't already replied. Unlikely race
* condition but we'll be fetching that cache line anyway so its likely to
* be a low cost check. We don't wait for sync rep if no sync standbys alive
*/
if (!t_thrd.walsender_cxt.WalSndCtl->sync_standbys_defined) {
LWLockRelease(SyncRepLock);
RESUME_INTERRUPTS();
return NOT_SET_STANDBY_DEFINED;
}
if (XLByteLE(XactCommitLSN, t_thrd.walsender_cxt.WalSndCtl->lsn[mode])) {
LWLockRelease(SyncRepLock);
RESUME_INTERRUPTS();
return REPSYNCED;
}
if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE &&
!DelayIntoMostAvaSync(false)) {
LWLockRelease(SyncRepLock);
RESUME_INTERRUPTS();
return STAND_ALONE;
}
if (!SynRepWaitCatchup(XactCommitLSN)) {
LWLockRelease(SyncRepLock);
RESUME_INTERRUPTS();
return NOT_WAIT_CATCHUP;
}
/*
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
t_thrd.proc->waitLSN = XactCommitLSN;
t_thrd.proc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
LWLockRelease(SyncRepLock);
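The order of the early exits before enqueueing can be condensed into a pure decision function. This is a simplified sketch: `SyncSnapshot`, `PreCheck` and `precheck_before_enqueue` are hypothetical, the catch-up check (SynRepWaitCatchup) and the shared-storage and DelayIntoMostAvaSync conditions are deliberately left out, and LSNs are plain integers.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical result codes. The quoted source returns
 * NOT_SET_STANDBY_DEFINED, REPSYNCED and STAND_ALONE for the matching cases.
 */
typedef enum {
    PRECHECK_MUST_WAIT,
    PRECHECK_NO_SYNC_STANDBY,
    PRECHECK_ALREADY_SYNCED,
    PRECHECK_STANDALONE
} PreCheck;

typedef struct {
    int sync_standbys_defined;  /* are any synchronous standbys configured? */
    int master_standalone;      /* primary currently running without standbys */
    uint64_t confirmed_lsn;     /* lsn[mode] as last updated by the WalSender */
} SyncSnapshot;

/* Decide, under the lock, whether the backend has to enqueue itself at all. */
static PreCheck precheck_before_enqueue(const SyncSnapshot *s, uint64_t commit_lsn)
{
    if (!s->sync_standbys_defined) {
        return PRECHECK_NO_SYNC_STANDBY;
    }
    if (commit_lsn <= s->confirmed_lsn) {
        return PRECHECK_ALREADY_SYNCED;     /* standby already replied */
    }
    if (s->master_standalone) {
        return PRECHECK_STANDALONE;
    }
    return PRECHECK_MUST_WAIT;
}
```

Doing these checks while holding the lock matters for the second case: the comparison against the confirmed position and the insertion into the queue must not be separated, otherwise a WalSender update could slip in between and the backend would miss its wake-up.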
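Here SyncRepQueueInsert(mode) selects the queue for the given mode and inserts the thread into it. Right after insertion the thread enters a wait loop, sleeping on its latch between checks: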
for (;;) {
/* Must reset the latch before testing state. */
ResetLatch(&t_thrd.proc->procLatch);
/*
* Acquiring the lock is not needed, the latch ensures proper barriers.
* If it looks like we're done, we must really be done, because once
* walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will never
* update it again, so we can't be seeing a stale value in that case.
*/
// Actual wake-up point: the loop exits once t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE;
if (t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE && !DelayIntoMostAvaSync(true)) {
waitStopRes = SYNC_COMPLETE;
break;
}
/*
* If a wait for synchronous replication is pending, we can neither
* acknowledge the commit nor raise ERROR or FATAL. The latter would
* lead the client to believe that the transaction aborted, which
* is not true: it's already committed locally. The former is no good
* either: the client has requested synchronous replication, and is
* entitled to assume that an acknowledged commit is also replicated,
* which might not be true. So in this case we issue a WARNING (which
* some clients may be able to interpret) and shut off further output.
* We do NOT reset t_thrd.int_cxt.ProcDiePending, so that the process will die after
* the commit is cleaned up.
*/
if (t_thrd.int_cxt.ProcDiePending || t_thrd.proc_cxt.proc_exit_inprogress) {
#ifndef ENABLE_MULTIPLE_NODES
if (g_instance.attr.attr_storage.enable_save_confirmed_lsn) {
t_thrd.postgres_cxt.whereToSendOutput = DestNone;
}
#endif
ereport(WARNING,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("canceling the wait for synchronous replication and terminating connection due to "
"administrator command"),
errdetail("The transaction has already committed locally, but might not have been replicated to "
"the standby.")));
t_thrd.postgres_cxt.whereToSendOutput = DestNone;
if (SyncRepCancelWait()) {
waitStopRes = STOP_WAIT;
break;
}
}
/*
* It's unclear what to do if a query cancel interrupt arrives. We
* can't actually abort at this point, but ignoring the interrupt
* altogether is not helpful, so we just terminate the wait with a
* suitable warning.
*/
if (enableHandleCancel && t_thrd.int_cxt.QueryCancelPending) {
/* reset query cancel signal after vacuum. */
if (!t_thrd.vacuum_cxt.in_vacuum) {
t_thrd.int_cxt.QueryCancelPending = false;
}
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to user request"),
errdetail("The transaction has already committed locally, but might not have been replicated to "
"the standby.")));
if (SyncRepCancelWait()) {
waitStopRes = STOP_WAIT;
break;
}
}
/*
* If the postmaster dies, we'll probably never get an
* acknowledgement, because all the wal sender processes will exit. So
* just bail out.
*/
if (!PostmasterIsAlive()) {
t_thrd.int_cxt.ProcDiePending = true;
t_thrd.postgres_cxt.whereToSendOutput = DestNone;
if (SyncRepCancelWait()) {
waitStopRes = STOP_WAIT;
break;
}
}
/*
* If we modify the syncmode dynamically, we'll stop wait.
* Reload config file here to update most_available_sync if it's modified
* dynamically.
*/
reload_configfile();
if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE &&
!DelayIntoMostAvaSync(false)) ||
u_sess->attr.attr_storage.guc_synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) {
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to syncmaster standalone."),
errdetail("The transaction has already committed locally, but might not have been replicated to "
"the standby.")));
if (SyncRepCancelWait()) {
waitStopRes = STOP_WAIT;
break;
}
}
/*
* For gs_rewind, if standby or secondary is not connected, we'll stop wait
*/
if (strcmp(u_sess->attr.attr_common.application_name, "gs_rewind") == 0) {
if (IS_DN_MULTI_STANDYS_MODE() ||
(IS_DN_DUMMY_STANDYS_MODE() &&
!(WalSndInProgress(SNDROLE_PRIMARY_STANDBY | SNDROLE_PRIMARY_DUMMYSTANDBY)))) {
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to client is gs_rewind and "
"secondary is not connected."),
errdetail("The transaction has already committed locally, but might not have been replicated "
"to the standby.")));
if (SyncRepCancelWait()) {
waitStopRes = STOP_WAIT;
break;
}
}
}
/*
* For case that query cancel pending or proc die pending signal not reached, if current
* session is set closed, we'll stop wait
*/
if (u_sess->status == KNL_SESS_CLOSE) {
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to session close."),
errdetail("The transaction has already committed locally, but might not have been replicated to "
"the standby.")));
if (SyncRepCancelWait()) {
waitStopRes = STOP_WAIT;
break;
}
}
/*
* Wait on latch. Any condition that should wake us up will set the
* latch, so no need for timeout.
*/
WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 3000L);
}
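The priority of the checks inside one iteration of the loop can be summarized with a small decision function. This is a reduced sketch, not the real control flow: `LoopInputs` and `wait_loop_step` are hypothetical, only three of the stop conditions are modelled, the DelayIntoMostAvaSync condition on completion is omitted, and `cancel_wait_ok` simply stands for whatever SyncRepCancelWait() would return.

```c
#include <assert.h>

typedef enum { STEP_KEEP_WAITING, STEP_SYNC_COMPLETE, STEP_STOP_WAIT } StepResult;

typedef struct {
    int state_complete;         /* syncRepState == SYNC_REP_WAIT_COMPLETE */
    int proc_die_pending;       /* termination requested */
    int query_cancel_pending;   /* user cancelled the query */
    int postmaster_alive;       /* 0 when the postmaster has died */
    int cancel_wait_ok;         /* assumed result of SyncRepCancelWait() */
} LoopInputs;

/* One pass over the checks; STEP_KEEP_WAITING means "sleep on the latch again". */
static StepResult wait_loop_step(const LoopInputs *in)
{
    /* Completion is tested first, so a finished wait is never reported as cancelled. */
    if (in->state_complete) {
        return STEP_SYNC_COMPLETE;
    }
    int wants_stop = in->proc_die_pending || in->query_cancel_pending || !in->postmaster_alive;
    /* The loop is only left early when the cancel call reports success. */
    if (wants_stop && in->cancel_wait_ok) {
        return STEP_STOP_WAIT;
    }
    return STEP_KEEP_WAITING;
}
```

In the quoted loop the latch is reset before the state is tested, so a wake-up that arrives between the test and the next WaitLatch call is not lost; the sketch leaves that part out and only captures the ordering of the decisions.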
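So after inserting itself, a backend thread keeps waiting until it is woken by the WalSender or by a leader backend thread. In line with the WalSender wake-up logic described earlier, a leader that has been woken then notifies the procs that follow it: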
/* Leader informs following procs */
if (t_thrd.proc->syncRepLinks.next != NULL) {
SyncRepNotifyComplete();
}
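Summary:
From this first analysis of how SyncRepLock is used with lsn and SyncRepQueue, the lock guarantees two things:
- When lsn is updated, all eligible backend threads in the queue can be woken in one pass, and no backend thread with a smaller lsn can be inserted while the wake-up is in progress. This ensures that every waiting process is woken by the WalSender according to the lsn array;
- When a backend thread inserts itself into the SyncRepQueue, holding the lock keeps the list ordered. All traversal and wake-up operations rest on the premise that the list is ordered;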