孤独雪鹰 发表于 2016-11-21 07:49:43

PostgreSQL启动过程中的那些事七:初始化共享内存和信号六:shmem中初始化twophase

       pg初始化shmem,给其加上索引"ShmemIndex"后,接着就在shmem里初始化xlog。然后依次初始化clog、subtrans、twophase、multixact。安排按clog、subtrans、multixact、twophase的顺序写,把twophase放到multixact之后是因为前面三个用了相同的算法和数据结构,连起来写可以加深印象和归类记忆。这一篇讨论twophase的初始化。
       分布式事务在pg里用两阶段提交(twophase)支持,发起者给一个全局事务ID(GID)标识该事务,同时涉及到的数据库有一个本地事务,pg里是prepare事务。
       每一个全局事务(globaltransaction,简写为gxact)都和一个全局事务ID(GID)相关。每个客户端用PREPARE TRANSACTION命令分配一个GID给一个postgres事务,做两阶段提交准备。
    pg在共享内存数组里保持所有活跃全局事务。当PREPARE TRANSACTION命令发起后,这个全局事务的GID被保存在数组里。这些发生在WAL日志记录之前,因为如果已经有一个相同GID的已处于prepared状态全局事务时,可以检查重复GID和退出事务。
    一个全局事务(gxact)还有一个辅助的PGPROC放在ProcArray数组里。这方便了PGPROC勾住这个全局事务(gxact)的锁。
    为了从崩溃/关闭后再启动时一切正常,所有准备好的事务必须存储在永久存储中。这包括锁信息,等待的通知(pending notifications)等。所有状态信息写入到data/pg_towphase文件夹里的每事务状态文件。
 
  
1先上个图,看一下函数调用过程梗概,中间略过部分细节
  
 
 

初始化twophase方法调用流程图
 
  
 
  
2初始化xlog相关结构
  
话说main()->…->PostmasterMain()->…->reset_shared() -> CreateSharedMemoryAndSemaphores()>…->TwoPhaseShmemInit(),初始化支持分布式事务的两阶段提交/TwoPhase相关数据结构TwoPhaseState等,用作内存里管理和缓存两阶段提交/TwoPhase(对应于存放在"data/ pg_twophase"文件夹里的文件)。
  
TwoPhaseShmemInit()->ShmemInitStruct(),在其中调用hash_search()在哈希表索引"ShmemIndex"中查找"Prepared Transaction Table",如果没有,就在shmemIndex中给"Prepared Transaction Table"分一个HashElement和ShmemIndexEnt(entry),在其中的Entry中写上"Prepared Transaction Table"。返回ShmemInitStruct(),再调用ShmemAlloc()在共享内存上给"Prepared Transaction Table"相关结构(见下面“TwoPhase相关结构图”)分配空间,设置entry(在这儿即ShmemIndexEnt类型变量)的成员location指向该空间,size成员记录该空间大小,最后返回ShmemInitStruct(),让TwoPhaseStateData *类型静态全局变量TwoPhaseState指向TwoPhaseStateData结构,TwoPhaseStateData的起始地址就是在shmem里给"Prepared Transaction Table"相关结构分配的内存起始地址,设置其中TwoPhaseStateData结构类型的成员值。
  
相关变量、结构定义和初始化完成后数据结构图在下面。
  
typedef structTwoPhaseStateData
  
{
  
    /* Head of linked list of free GlobalTransactionDatastructs */
  
    GlobalTransactionfreeGXacts;
  
 
  
    /* Number of valid prepXacts entries. */
  
    int         numPrepXacts;
  
 
  
    /*
  
     * There are max_prepared_xacts items in thisarray, but C wants a
  
     * fixed-size array.
  
     */
  
    GlobalTransactionprepXacts;     /*VARIABLE LENGTH ARRAY */
  
} TwoPhaseStateData;            /* VARIABLE LENGTH STRUCT */
  
 
  
static TwoPhaseStateData*TwoPhaseState;
  
 
  
/*
  
 *GlobalTransactionData is defined in twophase.c; other places have no
  
 * business knowingthe internal definition.
  
 */
  
typedef structGlobalTransactionData *GlobalTransaction;
  
 
  
typedef structGlobalTransactionData
  
{
  
    PGPROC      proc;           /* dummy proc */
  
    TimestampTzprepared_at;    /*time of preparation */
  
    XLogRecPtr  prepare_lsn;    /* XLOG offset of prepare record */
  
    Oid         owner;          /* ID of user that executed the xact */
  
    TransactionIdlocking_xid;  /*top-level XID of backend working on xact */
  
    bool        valid;          /* TRUE iffully prepared */
  
    char        gid;   /* The GID assignedto the prepared xact */
  
} GlobalTransactionData;
  
 
  
struct PGPROC
  
{
  
    /* proc->linksMUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
  
    SHM_QUEUE   links;          /*list link if process is in a list */
  
 
  
    PGSemaphoreData sem;        /* ONE semaphore to sleep on */
  
    int         waitStatus;     /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */
  
 
  
    LocalTransactionIdlxid;    /* local id of top-leveltransaction currently
  
                                 * being executed by this proc, if running;
  
                                 * else InvalidLocalTransactionId */
  
 
  
    TransactionId xid;          /* id of top-level transactioncurrently being
  
                                 * executed by this proc, if running and XID
  
                                 * is assigned; else InvalidTransactionId */
  
 
  
    TransactionId xmin;         /* minimal running XID as it was whenwe were
  
                                 * starting our xact, excluding LAZY VACUUM:
  
                                 * vacuum must not remove tuples deleted by
  
                                 * xid >= xmin ! */
  
 
  
    int         pid;            /*Backend's process ID; 0 if prepared xact */
  
 
  
    /* These fields arezero while a backend is still starting up: */
  
    BackendId   backendId;      /*This backend's backend ID (if assigned) */
  
    Oid         databaseId;     /* OID of database this backend is using */
  
    Oid         roleId;         /*OID of role using this backend */
  
 
  
    bool        inCommit;       /* true if within commit critical section */
  
 
  
    uint8       vacuumFlags;    /* vacuum-related flags, see above */
  
 
  
    /*
  
     * While in hot standby mode, shows that aconflict signal has been sent
  
     * for the current transaction. Set/clearedwhile holding ProcArrayLock,
  
     * though not required. Accessed without lock,if needed.
  
     */
  
    bool        recoveryConflictPending;
  
 
  
    /* Info about LWLockthe process is currently waiting for, if any. */
  
    bool        lwWaiting;      /* true if waiting for an LW lock */
  
    bool        lwExclusive;    /* true if waiting for exclusive access */
  
    struct PGPROC*lwWaitLink;  /* next waiter for same LWlock */
  
 
  
    /* Info about lockthe process is currently waiting for, if any. */
  
    /* waitLock andwaitProcLock are NULL if not currently waiting. */
  
    LOCK      *waitLock;       /* Lock objectwe're sleeping on ... */
  
    PROCLOCK   *waitProcLock;   /* Per-holder info for awaited lock */
  
    LOCKMODE    waitLockMode;   /* type of lock we're waiting for */
  
    LOCKMASK    heldLocks;      /*bitmask for lock types already held on this
  
                                 * lock object by this backend */
  
 
  
    Latch       procLatch;      /* generic latch for process */
  
 
  
    /*
  
     * Info to allow us to wait for synchronousreplication, if needed.
  
     * waitLSN is InvalidXLogRecPtr if not waiting;set only by user backend.
  
     * syncRepState must not be touched except byowning process or WALSender.
  
     * syncRepLinks used only while holdingSyncRepLock.
  
     */
  
    XLogRecPtr  waitLSN;        /*waiting for this LSN or higher */
  
    int         syncRepState;   /* wait state for sync rep */
  
    SHM_QUEUE   syncRepLinks;   /*list link if process is in syncrep queue */
  
 
  
    /*
  
     * All PROCLOCK objects for locks held orawaited by this backend are
  
     * linked into one of these lists, according tothe partition number of
  
     * their lock.
  
     */
  
    SHM_QUEUE   myProcLocks;
  
 
  
    struct XidCachesubxids;    /* cache for subtransactionXIDs */
  
};
  
 
  
/* shmqueue.c */
  
typedef structSHM_QUEUE
  
{
  
    structSHM_QUEUE *prev;
  
    structSHM_QUEUE *next;
  
} SHM_QUEUE;
  
 
  
关于上面的PGPROC结构这里再说一下。
  
每一个后台进程(backend)在共享内存里有一个PGPROC结构。还有一个当前未使用的PGPROC结构列表可以从中给新的backend分配。
  
Links:任何PGPROC在列表里。当等待锁时,PGPROC被链接到锁的等待进程队列里(lock's waitProcs queue)。一个回收的PGPROC被链接到ProcGlobal的freeProcs列表里。
  
pg的两阶段提交事务管理器还为每一个当前prepared事务创建一个假PGPROC结构。这些PGPROC结构出现在ProcArray数据结构以至于/目的是prepared事务显示/看起来仍然在运行且正确的显示其持有锁。通过它的pid等于0这个事实,一个prepared事务PGPROC能够和一个真正的进程在需要的时候能够被区别。在一个prepared事务PGPROC里的信号和lock-activity字段是不用的,但它的myProcLocks[]数组列表是有效的。
  
 
  
下面看看初始化完"Prepared Transaction Table"相关结构后在内存中的结构图
 
 

 
初始化完twophase相关结构的内存结构图
  
       为了精简上图,把创建shmem的哈希表索引"ShmemIndex"时创建的HCTL结构删掉了,这个结构的作用是记录创建可扩展哈希表的相关信息。增加了左边灰色底的部分,描述共享内存/shmem里各变量物理布局概览,由下往上,由低地址到高地址。其中的"twophase"相关结构图在下面给出,要不上面的图太大太复杂了。
 

twophase相关结构图
页: [1]
查看完整版本: PostgreSQL启动过程中的那些事七:初始化共享内存和信号六:shmem中初始化twophase