1. An overview of SyncRepLock:

The concept of synchronous replication has been covered before; essentially all of the operations involved run on the primary node. The actual streaming of WAL is still handled by the walreceiver/walsender modules. The core idea of the design is to isolate all of the wait/release logic on the primary: the primary defines which standbys it wants to wait for, and the standbys know nothing about the synchronization requirements of transactions on the primary, which keeps the code simple.
When data is inserted and WAL is written to disk, it is mainly RecordTransactionCommit() that guarantees committed data is not lost; the primary's wait/wakeup machinery is in turn built around the core function SyncRepWaitForLSN().
The central object in this machinery is the SyncRepQueue. Its purpose: once a backend thread has gone through RecordTransactionCommit, it inserts itself into this wait queue, sorted by the LSN it is waiting on, so that the queue remains a global, ordered linked list. The lsn here is the log-synchronization position awaited by the backend thread at the head of each of these queues.
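To make the shared state concrete, here is a simplified sketch of the control structure involved, reconstructed from the code excerpts later in this article and modeled on the upstream PostgreSQL walsender headers; the exact openGauss field layout and constant values may differ.

    /* One watermark and one wait queue per wait mode (values illustrative). */
    #define SYNC_REP_WAIT_RECEIVE   0   /* wait until standby has received the WAL */
    #define SYNC_REP_WAIT_WRITE     1   /* wait until standby has written the WAL */
    #define SYNC_REP_WAIT_FLUSH     2   /* wait until standby has flushed the WAL */
    #define SYNC_REP_WAIT_APPLY     3   /* wait until standby has replayed the WAL */
    #define NUM_SYNC_REP_WAIT_MODE  4

    typedef struct WalSndCtlData {
        /* Highest LSN confirmed by the standbys, per mode; protected by SyncRepLock. */
        XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];

        /*
         * Per-mode queues of waiting backend PGPROCs, kept sorted by waitLSN;
         * also protected by SyncRepLock.
         */
        SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];

        /* True iff synchronous standbys are configured; see SyncRepUpdateSyncStandbysDefined. */
        bool sync_standbys_defined;

        /* ... many other fields elided ... */
    } WalSndCtlData;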
A number of companion functions therefore operate on this queue; the main ones are:

  • SyncRepQueueInsert: called by SyncRepWaitForLSN on the primary; inserts the current process into the SyncRepQueue, after which the process starts waiting;
  • SyncRepCancelWait: stops the wait and removes the process from the queue;
  • SyncRepWakeQueue: wakes every process waiting in the queue and removes them all from it;

Note, however, that because this queue is global and must be kept strictly ordered, it has to be protected by a lock, namely SyncRepLock. This is therefore also the main place where SyncRepLock is used, and its main source of performance overhead.

2. Main application points of SyncRepLock:

  1. Backend insertion into / removal from the SyncRepQueue
    Insertion: SyncRepWaitForLSN
    Removal on cancel: SyncRepCancelWait
    Removal on thread exit: SyncRepCleanupAtProcExit
    Initialization: SyncRepInitConfig
  2. SyncRepReleaseWaiters/NotifySyncWaiters:
    the walsender thread updates the lsn array according to the LSN values it receives, then walks the sorted SyncRepQueue to wake the sleeping backend threads;
  3. Detecting changes to the synchronous standby configuration:
    SyncRepUpdateSyncStandbysDefined (see the sketch after this list);
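For point 3, the upstream PostgreSQL version of SyncRepUpdateSyncStandbysDefined illustrates the locking pattern: it takes SyncRepLock exclusively whenever the synchronous-standby setting flips, and drains every wait queue before switching synchronous replication off. The following is a sketch modeled on upstream; the openGauss variant carries extra state but follows the same shape.

    void SyncRepUpdateSyncStandbysDefined(void)
    {
        bool sync_standbys_defined = SyncStandbysDefined();

        if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) {
            LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

            /*
             * If synchronous replication is being turned off, wake and
             * release all current waiters first: nobody may be left
             * waiting for standbys that will never be consulted again.
             */
            if (!sync_standbys_defined) {
                for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
                    (void) SyncRepWakeQueue(true, i);
            }

            WalSndCtl->sync_standbys_defined = sync_standbys_defined;
            LWLockRelease(SyncRepLock);
        }
    }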

3. Analysis of SyncRepLock around the SyncRepQueue:

walsndctl->lsn is a single copy shared by all threads; in the original code logic it has two uses:

For the walsender:

It marks the LSNs currently acknowledged by the standby: the walsender updates the entries of the lsn array and calls SyncRepWakeQueue to wake the corresponding backend wait queue:

    if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_RECEIVE], receivePtr)) {
        walsndctl->lsn[SYNC_REP_WAIT_RECEIVE] = receivePtr;
        numreceive = SyncRepWakeQueue(false, SYNC_REP_WAIT_RECEIVE);
    }
    if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], writePtr)) {
        walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
        numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
    }
    if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], flushPtr)) {
        walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
        numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
    }
    if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_APPLY], replayPtr)) {
        walsndctl->lsn[SYNC_REP_WAIT_APPLY] = replayPtr;
        numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
    }

All of the above runs with the lock held, which guarantees that each time an LSN watermark is advanced, every thread in the queue whose waitLSN does not exceed the new value is woken in one pass; the walsender drives the wakeup of the backend threads. The lock bracket around this sequence is sketched below.
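For context, in upstream PostgreSQL these updates live in SyncRepReleaseWaiters, which brackets the whole update-and-wake sequence with an exclusive acquisition of SyncRepLock. The trimmed sketch below shows only that bracket, with two of the modes from the excerpt above; the openGauss version adds the receive/apply modes and quorum handling.

    static void SyncRepReleaseWaiters(void)
    {
        volatile WalSndCtlData *walsndctl = WalSndCtl;
        XLogRecPtr writePtr = MyWalSnd->write;   /* fetched under the walsender spinlock in the real code */
        XLogRecPtr flushPtr = MyWalSnd->flush;
        int numwrite = 0;
        int numflush = 0;

        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

        /* Watermarks only move forward; each advance wakes that mode's queue. */
        if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], writePtr)) {
            walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
            numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
        }
        if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], flushPtr)) {
            walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
            numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
        }

        LWLockRelease(SyncRepLock);

        elog(DEBUG3, "released %d procs up to write, %d procs up to flush", numwrite, numflush);
    }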

Correspondingly, SyncRepWakeQueue walks the whole proc list; because the list is sorted in ascending LSN order and the lock guarantees mutual exclusion, a single pass suffices to wake every proc with waitLSN <= lsn:

    while (proc != NULL) {
        /*
         * Assume the queue is ordered by LSN
         */
        if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
            break;

        /*
         * Move to next proc, so we can delete thisproc from the queue.
         * thisproc is valid, proc may be NULL after this.
         */
        thisproc = proc;
        thisproc->syncRepInCompleteQueue = true;
#ifndef ENABLE_MULTIPLE_NODES
        /*
         * Set confirmed LSN at primary node during sync wait for LSN.
         * Confirmed LSN is the start LSN of the last xact of proc which all quorum standby nodes had met with primary.
         * With saving the confirmed LSN at primary node during sync wait process and validating it during build 
         * process, it can avoid the primary node lost data if it will be built as new standby while an async 
         * standby node is running as new primary.
         */
        if (g_instance.attr.attr_storage.enable_save_confirmed_lsn &&
            XLogRecPtrIsValid(thisproc->syncSetConfirmedLSN)) {
            confirmedLSN =
                XLByteLT(confirmedLSN, thisproc->syncSetConfirmedLSN) ? thisproc->syncSetConfirmedLSN : confirmedLSN;
            thisproc->syncSetConfirmedLSN = InvalidXLogRecPtr;
        }
#endif
        proc = (PGPROC *)SHMQueueNext(&(t_thrd.walsender_cxt.WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks),
                                      offsetof(PGPROC, syncRepLinks));

        /* Refers to the last removable node */
        pTail = &(thisproc->syncRepLinks);
        numprocs++;
    }

This routine keeps the head pointer, walks the list to find the last proc that satisfies the wakeup condition, cuts that segment out of the list, and then wakes only the first proc of the segment (the leader), which in turn wakes all the procs behind it:

    /* Delete the finished segment from the list, and only notifies leader proc */
    if (pTail != pHead) {
        PGPROC* leaderProc = (PGPROC *) (((char *) pHead->next) - offsetof(PGPROC, syncRepLinks));
        pHead->next->prev = NULL;
        pHead->next = pTail->next;
        pTail->next->prev = pHead;
        pTail->next = NULL;

        /*
         * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
         * make sure that it sees the queue link being removed before the
         * syncRepState change.
         */
        pg_write_barrier();

        /*
         * Set state to complete; see SyncRepWaitForLSN() for discussion of
         * the various states.
         */
        leaderProc->syncRepState = SYNC_REP_WAIT_COMPLETE;

        /*
         * Wake only when we have set state and removed from queue.
         */
        SetLatch(&(leaderProc->procLatch));
    }
For backend insertion:

A backend thread tries to insert its own PGPROC into the sorted list:

    (void)LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    Assert(t_thrd.proc->syncRepState == SYNC_REP_NOT_WAITING);

    /*
     * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     * set.  See SyncRepUpdateSyncStandbysDefined.
     *
     * Also check that the standby hasn't already replied. Unlikely race
     * condition but we'll be fetching that cache line anyway so its likely to
     * be a low cost check. We don't wait for sync rep if no sync standbys alive
     */
    if (!t_thrd.walsender_cxt.WalSndCtl->sync_standbys_defined) {
        LWLockRelease(SyncRepLock);
        RESUME_INTERRUPTS();
        return NOT_SET_STANDBY_DEFINED;
    }
    if (XLByteLE(XactCommitLSN, t_thrd.walsender_cxt.WalSndCtl->lsn[mode])) {
        LWLockRelease(SyncRepLock);
        RESUME_INTERRUPTS();
        return REPSYNCED;
    }
    if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE &&
        !DelayIntoMostAvaSync(false)) {
        LWLockRelease(SyncRepLock);
        RESUME_INTERRUPTS();
        return STAND_ALONE;
    }
    
    if (!SynRepWaitCatchup(XactCommitLSN)) {
        LWLockRelease(SyncRepLock);
        RESUME_INTERRUPTS();
        return NOT_WAIT_CATCHUP;
    }

    /*
     * Set our waitLSN so WALSender will know when to wake us, and add
     * ourselves to the queue.
     */
    t_thrd.proc->waitLSN = XactCommitLSN;
    t_thrd.proc->syncRepState = SYNC_REP_WAITING;
    SyncRepQueueInsert(mode);
    Assert(SyncRepQueueIsOrderedByLSN(mode));
    LWLockRelease(SyncRepLock);

Here SyncRepQueueInsert(mode) picks the queue for the requested wait mode and performs an ordered insert (sketched just below); once inserted, the backend falls into the wait loop shown after the sketch.
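The ordered insert is short; since a new commit usually carries the largest LSN so far, scanning from the tail of the queue almost always stops immediately. The following is a minimal sketch, modeled on the upstream PostgreSQL implementation (the openGauss version reaches the queue through t_thrd and WalSndCtl as in the excerpts above, but the tail-walk is the same):

    static void SyncRepQueueInsert(int mode)
    {
        PGPROC *proc = NULL;

        Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
        proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(WalSndCtl->SyncRepQueue[mode]),
                                       offsetof(PGPROC, syncRepLinks));

        /*
         * Walk backwards from the tail: stop at the first proc whose waitLSN
         * is smaller than ours, so the queue stays sorted by LSN.  New
         * commits usually have the largest LSN, so this loop rarely iterates.
         */
        while (proc != NULL) {
            if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
                break;

            proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                           &(proc->syncRepLinks),
                                           offsetof(PGPROC, syncRepLinks));
        }

        if (proc != NULL)
            SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
        else
            SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
    }

After the insert (and after SyncRepLock is released), the backend parks on its latch in the following loop; note that this is a latch wait with a 3-second timeout rather than a spinning busy wait: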

    for (;;) {
        /* Must reset the latch before testing state. */
        ResetLatch(&t_thrd.proc->procLatch);

        /*
         * Acquiring the lock is not needed, the latch ensures proper barriers.
         * If it looks like we're done, we must really be done, because once
         * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will never
         * update it again, so we can't be seeing a stale value in that case.
         */
        /* The actual wakeup point: we leave the loop once syncRepState == SYNC_REP_WAIT_COMPLETE. */
        if (t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE && !DelayIntoMostAvaSync(true)) {
            waitStopRes = SYNC_COMPLETE;
            break;
        }

        /*
         * If a wait for synchronous replication is pending, we can neither
         * acknowledge the commit nor raise ERROR or FATAL.  The latter would
         * lead the client to believe that the transaction aborted, which
         * is not true: it's already committed locally. The former is no good
         * either: the client has requested synchronous replication, and is
         * entitled to assume that an acknowledged commit is also replicated,
         * which might not be true. So in this case we issue a WARNING (which
         * some clients may be able to interpret) and shut off further output.
         * We do NOT reset t_thrd.int_cxt.ProcDiePending, so that the process will die after
         * the commit is cleaned up.
         */
        if (t_thrd.int_cxt.ProcDiePending || t_thrd.proc_cxt.proc_exit_inprogress) {
#ifndef ENABLE_MULTIPLE_NODES
            if (g_instance.attr.attr_storage.enable_save_confirmed_lsn) {
                t_thrd.postgres_cxt.whereToSendOutput = DestNone;
            }
#endif
            ereport(WARNING,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("canceling the wait for synchronous replication and terminating connection due to "
                            "administrator command"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to "
                               "the standby.")));
            t_thrd.postgres_cxt.whereToSendOutput = DestNone;
            if (SyncRepCancelWait()) {
                waitStopRes = STOP_WAIT;
                break;
            }
        }

        /*
         * It's unclear what to do if a query cancel interrupt arrives.  We
         * can't actually abort at this point, but ignoring the interrupt
         * altogether is not helpful, so we just terminate the wait with a
         * suitable warning.
         */
        if (enableHandleCancel && t_thrd.int_cxt.QueryCancelPending) {
            /* reset query cancel signal after vacuum. */
            if (!t_thrd.vacuum_cxt.in_vacuum) {
                t_thrd.int_cxt.QueryCancelPending = false;
            }
            ereport(WARNING,
                    (errmsg("canceling wait for synchronous replication due to user request"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to "
                               "the standby.")));
            if (SyncRepCancelWait()) {
                waitStopRes = STOP_WAIT;
                break;
            }
        }

        /*
         * If the postmaster dies, we'll probably never get an
         * acknowledgement, because all the wal sender processes will exit. So
         * just bail out.
         */
        if (!PostmasterIsAlive()) {
            t_thrd.int_cxt.ProcDiePending = true;
            t_thrd.postgres_cxt.whereToSendOutput = DestNone;
            if (SyncRepCancelWait()) {
                waitStopRes = STOP_WAIT;
                break;
            }
        }

        /*
         * If we modify the syncmode dynamically, we'll stop wait.
         * Reload config file here to update most_available_sync if it's modified
         * dynamically.
         */
        reload_configfile();
        if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE &&
             !DelayIntoMostAvaSync(false)) ||
            u_sess->attr.attr_storage.guc_synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) {
            ereport(WARNING,
                    (errmsg("canceling wait for synchronous replication due to syncmaster standalone."),
                     errdetail("The transaction has already committed locally, but might not have been replicated to "
                               "the standby.")));
            if (SyncRepCancelWait()) {
                waitStopRes = STOP_WAIT;
                break;
            }
        }

        /*
         * For gs_rewind, if standby or secondary is not connected, we'll stop wait
         */
        if (strcmp(u_sess->attr.attr_common.application_name, "gs_rewind") == 0) {
            if (IS_DN_MULTI_STANDYS_MODE() ||
                (IS_DN_DUMMY_STANDYS_MODE() &&
                 !(WalSndInProgress(SNDROLE_PRIMARY_STANDBY | SNDROLE_PRIMARY_DUMMYSTANDBY)))) {
                ereport(WARNING,
                        (errmsg("canceling wait for synchronous replication due to client is gs_rewind and "
                                "secondary is not connected."),
                         errdetail("The transaction has already committed locally, but might not have been replicated "
                                   "to the standby.")));
                if (SyncRepCancelWait()) {
                    waitStopRes = STOP_WAIT;
                    break;
                }
            }
        }

        /*
         * For case that query cancel pending or proc die pending signal not reached, if current
         * session is set closed, we'll stop wait
         */
        if (u_sess->status == KNL_SESS_CLOSE) {
            ereport(WARNING,
                    (errmsg("canceling wait for synchronous replication due to session close."),
                     errdetail("The transaction has already committed locally, but might not have been replicated to "
                               "the standby.")));
            if (SyncRepCancelWait()) {
                waitStopRes = STOP_WAIT;
                break;
            }
        }

        /*
         * Wait on latch.  Any condition that should wake us up will set the
         * latch, so no need for timeout.
         */
        WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 3000L);
    }

So after inserting itself the backend waits indefinitely until the walsender, or a leader backend thread, wakes it up, per the walsender wakeup logic shown earlier:

    /* Leader informs following procs */
    if (t_thrd.proc->syncRepLinks.next != NULL) {
        SyncRepNotifyComplete();
    }
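SyncRepNotifyComplete is openGauss-specific, so the following is only a plausible sketch inferred from the detach logic above, where the leader's syncRepLinks.next still points at the remainder of the detached segment; it is not the actual implementation:

    /* Hypothetical sketch: the leader walks the detached segment and wakes each follower. */
    static void SyncRepNotifyComplete(void)
    {
        SHM_QUEUE *link = t_thrd.proc->syncRepLinks.next;

        while (link != NULL) {
            PGPROC *follower = (PGPROC *) (((char *) link) - offsetof(PGPROC, syncRepLinks));
            link = link->next;

            /* Make the queue unlink visible before the state change, as in SyncRepWakeQueue. */
            pg_write_barrier();
            follower->syncRepState = SYNC_REP_WAIT_COMPLETE;
            SetLatch(&(follower->procLatch));
        }
    }
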
Summary:

From the use of SyncRepLock around the lsn array and the SyncRepQueue, a first-pass analysis shows that the lock guarantees two things:

  1. When the lsn array is advanced, the whole backend wait queue can be woken in a single pass, so no backend with a smaller LSN can slip into the queue mid-wakeup; every waiting process is therefore reliably woken by the walsender according to the lsn array;
  2. Backends insert into the SyncRepQueue under the lock, which keeps the list sorted; all traversal and wakeup operations rest on the premise that the list is ordered (see the checker sketched below).
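That ordering premise is verified in assert-enabled builds: the Assert(SyncRepQueueIsOrderedByLSN(mode)) seen in the insert path walks the queue and checks monotonicity. A sketch modeled on the upstream PostgreSQL checker:

    #ifdef USE_ASSERT_CHECKING
    static bool SyncRepQueueIsOrderedByLSN(int mode)
    {
        XLogRecPtr lastLSN = InvalidXLogRecPtr;
        PGPROC *proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                               &(WalSndCtl->SyncRepQueue[mode]),
                                               offsetof(PGPROC, syncRepLinks));

        while (proc != NULL) {
            /* Every waitLSN must be at least its predecessor's. */
            if (XLByteLT(proc->waitLSN, lastLSN))
                return false;

            lastLSN = proc->waitLSN;
            proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                           &(proc->syncRepLinks),
                                           offsetof(PGPROC, syncRepLinks));
        }
        return true;
    }
    #endif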