设为首页 收藏本站
查看: 1730|回复: 0

[经验分享] pgpool-II的master-slave模式的分析

[复制链接]

尚未签到

发表于 2016-11-22 09:19:56 | 显示全部楼层 |阅读模式
  磨砺技术珠矶,践行数据之道,追求卓越价值
  回到上一级页面: PostgreSQL集群方案相关索引页     回到顶级页面:PostgreSQL索引页
  现象描述:
  客户来邮件,问:为何Pgpool-II在master-slave模式的时候,发生:
  pgpool-II的某子进程与slave db节点间的连接因为长时间无联系被L4SW切断,却不发生failover,而此时向master db节点的commit已经生效,但是马上返回出错信息?
  简单言之,那是因为,Pgpool-II开发的时候,没有考虑到这种进程的单独的连接被刻意切断的情形。
  此时,如果fail_over_on_backend_error为ture,那么也会激发failover过程。
  如果fail_over_on_backend_error为false,而pgpool-II的主进程此时还不断地进行healthcheck,可以正常检测到slave db节点,那么failover过程不会被激发。
  上代码:
  源代码概要A:



/*                                    
* child main loop                                    
*/                                    
void do_child(int unix_fd, int inet_fd)                                    
{                                    
…                                    
for (;;)                                
{                                
…                           
/* perform accept() */                           
frontend = do_accept(unix_fd, inet_fd, &timeout);                           
if (frontend =/=* N cUonLLn)ection request from frontend timed out */                           
{                           
/* check select() timeout */                        
if (connected && pool_config->child_life_time > 0 &&                        
timeout.tv_sec == 0 && timeout.tv_usec == 0)                    
{                        
pool_debug("child life %d seconds expired", pool_config->child_life_time);                    
/*                    
* Doesn't need to call this. child_exit() calls it.                    
* send_frontend_exits();                    
*/                    
child_exit(2);                    
}                        
continue;                        
}                           
…                           
/*                           
* Ok, negotiaton with frontend has been done. Let's go to the                           
* next step. Connect to backend if there's no existing                           
* connection which can be reused by this frontend.                           
* Authentication is also done in this step.                           
*/                           
…                           
/*                           
* if there's no connection associated with user and database,                           
* we need to connect to the backend and send the startup packet.                           
*/                           
/* look for existing connection */                           
found = 0;                           
backend = pool_get_cp(sp->user, sp->database, sp->major, 1);                           
…                           
/* Mark this connection pool is conncted from frontend */                           
pool_coninfo_set_frontend_connected(pool_get_process_context()->proc_id, pool_pool_index());                           
/* query process loop */                           
for (;;)                           
{                           
POOL_STATUS status;                        
status = pool_process_query(frontend, backend, 0);                        
sp = MASTER_CONNECTION(backend)->sp;                        
switch (status)                        
{                        
…                    
}                        
if (status != POOL_CONTINUE)                        
break;                    
}                           
…                           
}                                
child_exit(0);                                
}                                    
/*                                    
* Main module for query processing                                    
* reset_request: if non 0, call reset_backend to execute reset queries                                    
*/                                    
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,                                    
POOL_CONNECTION_POOL *backend,            
int reset_request)            
{                                    
…                                
for (;;)                                
{                                
…                           
/*                           
* If we are prcessing query, process it.                           
*/                           
if (pool_is_query_in_progress())                           
{                           
status = ProcessBackendResponse(frontend, backend, &state, &num_fields);                        
if (status != POOL_CONTINUE)                        
return status;                    
}                           
/*                           
* If frontend and all backends do not have any pending data in                           
* the receiving data cache, then issue select(2) to wait for new                           
* data arrival                           
*/                           
else if (is_cache_empty(frontend, backend))                           
{                           
bool cont = true;                        
① status = read_packets_and_process(frontend, backend, reset_request,                        
&state, &num_fields, &cont);
if (status != POOL_CONTINUE)                        
return status;                    
else if (!c/o*n Dt)etected admin shutdown */                        
return status;                    
}                           
else                           
{                           
…                        
}                           
…                           
}                                
return POOL_CONTINUE;                                
}                                    
/*                                    
* Read packet from either frontend or backend and process it.                                    
*/                                    
static POOL_STATUS read_packets_and_process(POOL_CONNECTION *frontend,                                    
POOL_CONNECTION_POOL *backend, int reset_request, int *state, short *num_fields, bool *cont)                                    
{                                    
…                                
if (!reset_request)                                
{                                
if (FD_ISSET(frontend->fd, &exceptmask))                           
return POOL_END;                        
else if (FD_ISSET(frontend->fd, &readmask))                           
{                           
② status = ProcessFrontendResponse(frontend, backend);                        
if (status != POOL_CONTINUE)                        
return status;                    
}                           
}                                
…                                
return POOL_CONTINUE;                                
}                                    
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,                                    
POOL_CONNECTION_POOL *backend)        
{                                    
…                                
switch (fkind)                                
{                                
…                           
case 'X': /* Terminate */                           
free(contents);                        
return POOL_END;                        
case 'Q': /* Query */                           
allow_close_transaction = 1;                        
③ status = SimpleQuery(frontend, backend, len, contents);                        
break;                        
…                           
default:                           
pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);                        
status = POOL_ERROR;                        
}                                
free(contents);                                
if (status != POOL_CONTINUE)                                
status = POOL_ERROR;                           
return status;                                
}                                    
/*                                    
* Process Query('Q') message                                    
* Query messages include an SQL string.                                    
*/                                    
POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,                                    
POOL_CONNECTION_POOL *backend, int len, char *contents)               
{                                    
…                                
/* log query to log file if necessary */                                
if (pool_config->log_statement)                                
{                                
pool_log("statement: %s", contents);                           
}                                
else                                
{                                
pool_debug("statement2: %s", contents);                           
}                                
…                                
if (parse_tree_list != NIL)                                
{                                
…                           
/*                           
* Decide where to send query                           
*/                           
④ pool_where_to_send(query_context, query_context->original_query,                           
query_context->parse_tree);            
…                           
}                                
…                                
/* switch memory context */                                
pool_memory_context_switch_to(old_context);                                
return POOL_CONTINUE;                                
}                                    

/*                                    
* Decide where to send queries(thus expecting response)                                    
*/                                    
void pool_where_to_send(POOL_QUERY_CONTEXT *query_context, char *query, Node *node)                                    
{                                    
…                                
/*                                
* In raw mode, we send only to master node. Simple enough.                                
*/                                
if (RAW_MODE)                                
{                                
pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);                           
}                                
else if (MASTER_SLAVE && query_context->is_multi_statement)                                
{                                
…                                
}                                
else if (MASTER_SLAVE)                                
{                                
POOL_DEST dest;                           
POOL_MEMORY_POOL *old_context;                           
old_context = pool_memory_context_switch_to(query_context->memory_context);                           
⑤ dest = send_to_where(node, query);                           
pool_memory_context_switch_to(old_context);                           
pool_debug("send_to_where: %d query: %s", dest, query);                           
/* Should be sent to primary only? */                           
if (dest == POOL_PRIMARY)                           
{                           
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);                        
}                           
/* Should be sent to both primary and standby? */                           
else if (dest == POOL_BOTH)                           
{                           
pool_setall_node_to_be_sent(query_context);                        
}                           
/*                           
* Ok, we might be able to load balance the SELECT query.                           
*/                           
else                           
{                           
…                        
}                           
}                                
else if (REPLICATION || PARALLEL_MODE)                                
{                                
…                           
}                                
else                                
{                                
pool_error("pool_where_to_send: unknown mode");                           
return;                           
}                                
…                                
return;                                
}                                    

/*                                    
* From syntactically analysis decide the statement to be sent to the                                    
* primary, the standby or either or both in master/slave+HR/SR mode.                                    
*/                                    
static POOL_DEST send_to_where(Node *node, char *query)                                    
{                                    
if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]),                                
sizeof(NodeTag), compare) != NULL)                        
{                                
/*                           
* SELECT INTO                           
* SELECT FOR SHARE or UPDATE                           
*/                           
if (IsA(node, SelectStmt))                           
{                           
/* SELECT INTO or SELECT FOR SHARE or UPDATE ? */                        
if (pool_has_insertinto_or_locking_clause(node))                        
return POOL_PRIMARY;                    
return POOL_EITHER;                        
}                           
…                           
/*                           
* Transaction commands                           
*/                           
else if (IsA(node, TransactionStmt))                           
{                           
/*                        
* Check "BEGIN READ WRITE" "START TRANSACTION READ WRITE"                        
*/                        
if (is_start_transaction_query(node))                        
{                        
/* But actually, we send BEGIN to standby if it's                    
BEGIN READ WRITE or START TRANSACTION READ WRITE */                    
if (is_read_write((TransactionStmt *)node))                    
return POOL_BOTH;               
/* Other TRANSACTION start commands are sent to both primary                    
and standby */               
else                    
return POOL_BOTH;               
}                        
/* SAVEPOINT related commands are sent to both primary and standby */                        
else if (is_savepoint_query(node))                        
return POOL_BOTH;                    
/*                        
* 2PC commands                        
*/                        
else if (is_2pc_transaction_query(node))                        
return POOL_PRIMARY;                    
else                        
/* COMMIT etc. */                        
return POOL_BOTH;                    
}                           
…                           
/*                           
* EXECUTE                           
*/                           
else if (IsA(node, ExecuteStmt))                           
{                           
/* This is temporary decision. where_to_send will inherit                        
* same destination AS PREPARE.                        
*/                        
return POOL_PRIMARY;                        
}                           
…                           
/*                           
* Other statements are sent to primary                           
*/                           
return POOL_PRIMARY;                           
}                                
/*                                
* All unknown statements are sent to primary                                
*/                                
return POOL_PRIMARY;                                
}                                    
  分析如下:
   send_to_where函数中,处在Master/Slave模式的时候,数据的增、删、改指令只向PrimaryDB发送。
 begin/commit这样的事务有关的指令,则既向Master送信,也向Slave送信。
  再看源代码概要B:
   通过上述的分析,从pool_process_query→send_to_where 的调用关系,
 commit则既向Master送信,也向Slave送信,但是!
 由于子进程与Slave之间的网络通信被中断,pool_read发生错误,那么此子进程就exit消亡了。
 而此时,已经向PrimaryDB发送了的commit指令,已经成功,是无法取消的。



/*                        
* child main loop                        
*/                        
void do_child(int unix_fd, int inet_fd)                        
{                        
…                    
for (;;)                    
{                    
…               
/* query process loop */               
for (;;)               
{               
POOL_STATUS status;            
status = pool_process_query(frontend, backend, 0);            
…            
switch (status)            
{            
…        
/* error occured. discard backend connection pool        
and disconnect connection to the frontend */        
case POOL_ERROR:        
pool_log("do_child: exits with status 1 due to error");   
child_exit(1);   
break;   
…        
default:        
break;   
}            
if (status != POOL_CONTINUE)            
break;        
}               
…               
}                    
child_exit(0);                    
}                        
/*                        
* Do house keeping works when pgpool child process exits                        
*/                        
void child_exit(int code)                        
{                        
…                    
/* let backend know now we are exiting */                    
send_frontend_exits();                    
exit(code);                    
}                        
/*                        
* send frontend exiting messages to all connections. this is called                        
* in any case when child process exits, for example failover, child                        
* life time expires or child max connections expires.                        
*/                        
static void send_frontend_exits(void)                        
{                        
…                    
for (i=0;i<pool_config->max_pool;i++, p++)                    
{                    
///ここで、マスタDB関連コネクションへ、exit信号は発送されません               
if (!MASTER_CONNECTION(p))               
continue;            
if (!MASTER_CONNECTION(p)->sp)               
continue;            
if (MASTER_CONNECTION(p)->sp->user == NULL)               
continue;            
pool_send_frontend_exits(p);               
}                    
POOL_SETMASK(&oldmask);                    
}                        

/*                        
* send "terminate"(X) message to all backends, indicating that                        
* backend should prepare to close connection to frontend (actually                        
* pgpool). Note that caller must be protecedt from a signal                        
* interruption while calling this function. Otherwise the number of                        
* valid backends might be changed by failover/failback.                        
*/                        
void pool_send_frontend_exits(POOL_CONNECTION_POOL *backend)                        
{                        
…                    
for (i=0;i<NUM_BACKENDS;i++)                    
{                    
…               
if (VALID_BACKEND(i) && CONNECTION_SLOT(backend, i))               
{               
…            
pool_set_nonblock(CONNECTION(backend, i)->fd);            
pool_flush_it(CONNECTION(backend, i));            
pool_unset_nonblock(CONNECTION(backend, i)->fd);            
}               
}                    
}                        

/*                        
* flush write buffer                        
*/                        
int pool_flush_it(POOL_CONNECTION *cp)                        
{                        
…                    
for (;;)                    
{                    
…               
if (sts > 0)               
{               
…            
}               
else if (errno == EAGAIN || errno == EINTR)               
{               
continue;            
}               
else               
{               
/* If this is the backend stream, report error. Otherwise            
* just report debug message.            
*/            
if (cp->isbackend)            
pool_error("pool_flush_it: write failed to backend (%d). reason: %s offset: %d wlen: %d",  
cp->db_node_id, strerror(errno), offset, wlen);
else            
pool_debug("pool_flush_it: write failed to frontend. reason: %s offset: %d wlen: %d",  
strerror(errno), offset, wlen);
cp->wbufpo = 0;            
return -1;            
}               
}                    
…                    
return 0;                    
}                        

/*                        
* Main module for query processing                        
* reset_request: if non 0, call reset_backend to execute reset queries                        
*/                        
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,                        
POOL_CONNECTION_POOL *backend,   
int reset_request)
{                        
…                    
for (;;)                    
{                    
…               
/*               
* If we are prcessing query, process it.               
*/               
if (pool_is_query_in_progress())               
{               
status = ProcessBackendResponse(frontend, backend, &state, &num_fields);            
if (status != POOL_CONTINUE)            
return status;        
}               
…                    
}                    
return POOL_CONTINUE;                    
}                        

POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,                        
POOL_CONNECTION_POOL *backend,                        
int *state, short *num_fields)                        
{                        
…                    
status = read_kind_from_backend(frontend, backend, &kind);                    
if (status != POOL_CONTINUE)                    
return status;               
…                    
}                        

/*                        
* read_kind_from_backend: read kind from backends.                        
* the "frontend" parameter is used to send "kind mismatch" error message to the frontend.  
* the out parameter "decided_kind" is the packet kind decided by this function.                        
* this function uses "decide by majority" method if kinds from all backends do not agree.                        
*/                        
POOL_STATUS read_kind_from_backend(POOL_CONNECTION *frontend,                        
POOL_CONNECTION_POOL *backend, char *decided_kind)                        
{                        
…                    
for (i=0;i<NUM_BACKENDS;i++)                    
{                    
…               
if (VALID_BACKEND(i))               
{               
…            
do            
{            
char *p, *value;        
int len;        
if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)        
{        
pool_error("read_kind_from_backend: failed to read kind from %d th backend", i);   
return POOL_ERROR;   
}        
…        
} while (kind == 'S');            
…            
}               
else               
kind_list = 0;            
}                    
…                    
return POOL_CONTINUE;                    
}                        

/*                        
* read len bytes from cp                        
* returns 0 on success otherwise -1.                        
*/                        
int pool_read(POOL_CONNECTION *cp, void *buf, int len)                        
{                        
…                    
while (len > 0)                    
{                    
…               
if (cp->ssl_active > 0) {               
readlen = pool_ssl_read(cp, readbuf, READBUFSZ);            
} else {               
readlen = read(cp->fd, readbuf, READBUFSZ);            
}               
…               
if (readlen == -1)               
{               
…            
pool_error("pool_read: read failed (%s)", strerror(errno));            
if (cp->isbackend)            
{            
/* if fail_over_on_backend_erro is true, then trigger failover */        
if (pool_config->fail_over_on_backend_error)        
{        
notice_backend_error(cp->db_node_id);   
child_exit(1);   
}        
else        
return -1;   
}            
else            
{            
return -1;        
}            
}               
else if (readlen == 0)               
{               
if (cp->isbackend)            
{            
pool_error("pool_read: EOF encountered with backend");        
return -1;        
}            
else            
{            
/*        
* if backend offers authentication method, frontend could close connection        
*/        
return -1;        
}            
}               
…               
}                    
return 0;                    
}                        
  回到上一级页面: PostgreSQL集群方案相关索引页     回到顶级页面:PostgreSQL索引页
  磨砺技术珠矶,践行数据之道,追求卓越价值

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-303812-1-1.html 上篇帖子: Sqlite在.NET下的使用和Sqlite数据库清理 下篇帖子: Moon.Orm 配置说明
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表