设为首页 收藏本站
查看: 905|回复: 0

[经验分享] Linux:进程池实现

[复制链接]

尚未签到

发表于 2018-5-19 11:24:23 | 显示全部楼层 |阅读模式
  进程池在服务器应用中有很多很多=。=
  下面是半同步半异步进程池的实现代码:
#ifndef _PROCESSPOOL_H
#define _PROCESSPOOL_H
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno,h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<signal.h>
#include<sys/wait.h>
#include<sys/stat.h>
class process
{
public:
process():m_pid(-1){}
public:
pid_t m_pid;
int m_pipefd[2];
};
template<typename T>
classe processpool
{
private:
//构造函数私有,类似于单例
proceepool(int listenfd,int process_number = 8);
public:
static processpool<T>* create(int listenfd,int process_number = 8)
{
if(!m_instance)
{
m_instance = new processpool<T>(listenfd,process_number);
}
return m_instance;
}
~processpool()
{
delete [] m_sub_process;
}
private:
void setup_sig_pipe();
void run_parent();
void run_child();
private:
//允许最大的子进村数量
static const int MAX_PROCESS_NUMBER = 16;
//子紧凑最多能处理的客户数量
static const int USER_PER_PROCESS = 65536;
//epoll最多处理的事件数
static const int MAX_EVENT_NUMEBR = 1000;
//进程池进程总数
int m_process_number;
//子进程在池中的序号,0开始
int m_idx;
//每个紧凑都有一个epool内核事件表,用m_epollfd标识
int m_epollfd;
//监听socket
int m_listenfd;
//子进程通过m_stop来决定是否停止运行
int m_stop;
//保存所有子进程的描述信息
process* m_sub_process;
//进程池实例
static process<T>* m_instance;
};
template <class T>
processpool<T>* processpool<T>::m_instance = NULL;
static int sig_pipefd[2];
static int setnonblocking(int fd)
{
int old_option = fcntl(fd,F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}
statit void addfd(int epollfd,int fd)
{
epoll_fd event;
event.data.fd = fd;
event.events = EPOLLIN|EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
stacit void removefd(int epollfd,int fd)
{
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0);
close(fd);
}
stacit void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1],(char *)&msg,1,0);
errno = save_errno;
}
static void addsig(int sig,void(handler)(int),bool restart = true)
{
struct sigaction sa;
memset(&sa,'\0',sizeof(sa));
sa.sa_handler = handler;
if(restart)
{
sa.sa_flags = SA_RESTAT;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig,&sa,NULL)!= -1);
}
template<class T>
processpool<T>::processpool(int listenfd,int process_number)
:m_listenfd(listenfd),m_process_numebr(process_number),m_ide(-1),m_stop(false)
{
assert((process_number>0)&&(process_number<=MAX_PROCESS_NUMBER));
m_sub_process = new process[process_number];
assert(m_sub_process);
for(int i =0;i<process_number;++i)
{
int ret = socketpair(PF_UINX,SOCK_STREAM,0,m_sub_process.m_pipefd);
assert(ret == 0);
m_sub_process.m_pid = fork();
assert(m_sub_procee.m_pid >= 0);
if(m_sub_procee.m_pid > 0)
{
close(m_sub_process.m_pipefd[1]);
continue;
}
else
{
close(m_sub_process.m_pipefd[0]);
m_idx = i;
break;
}
}
}
template<class T>
void processpool<T>::setup_sig_pipe()
{
m_epollfd = epoll_create(5);
assert(m_epollfd != -1);
int ret = socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);
assert(ret!= -1);
setnonblocking(sig_pipefd[1]);
addfd(m_epollfd,sig_pipefd[0]);
addsig(SIGCHLD,sig_handler);
addsig(SIGTERM,sig_handler);
addsig(SIGINT,sig_handler);
addsig(SIGPIPE,SIG_IGN);
}
template<class T>
void processpool<T>::run_child()
{
setup_sig_pipe();
int pipefd = m_sub_process[m_idx].pipefd[1];
addfd(m_epollfd,pipefd);
epoll_event events[MAX_EVENT_NUMBER];
T* users = new T[USER_PER_PROCESS];
assert(users);
int number = 0;
int ret = -1;
while(! m_stop)
{
number = epoll_wait(m_epollfd,events,MAX_EVENT_NUMBER,-1);
if((number < 0) && (errno !=EINTR))
{
printf("epoll failure\n");
break;
}
for(int i = 0;i<number;++i)
{
int sockfd = events.data.fd;
if((sockfd == pipefd)&&(events.events& EPOLLIN))
{
int client = 0;
ret = recv(sockfd,(char*)&clinet,sizeof(client),0);
if(((ret <0)&&(errno!=EAGAIN))||ret == 0)
{
continue;
}
else
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_addrss);
int connfd = accept(m_listenfd,(struct sockaddr*)&client_address,&client_addrlength);
if(connfd < 0)
{
printf("errno is:%d\b",errno);
continue;
}
addfd(m_epollfd,connfd);
users[connfd].init(m_epollfd,connfd,client_address);
}
}
else if((sockfd == sig_pipefd[0])&&(events.events&EPOLLIN))
{
int sig;
char signals[1024];
ret = recv(sig_pipefd[0],signals,sizeof(signals),0);
if(ret<= 0)
{
continue;
}
else
{
for(int i = 0;i< ret;++i)
{
switch(signals)
{
case SIGCHLD:
{
pid_t pid;
int stat;
while(pid = waitpid(-1,&stat,WNOHANG)>0)
{
continue;
}
break;
}
case SIGTERM:
case SIGINT:
{
m_stop = true;
break;
}
default:
{
break;
}
}
}
}
}
else if(events.events&EPOLLIN)
{
users[sockfd].process();
}
else
{
continue;
}
}
}
delete [] users;
users = NULL;
close(pipefd);
close(m_epollfd);
}
template<class T>
void processpool<T>::run_parent()
{
setup_sig_pipe();
addfd(m_epollfd,m_listenfd);
epoll_event events[MAX_EVENT_NUMBER];
int sub_process_counter = 0;
int new_conn = 1;
int number = 0;
int ret = -1;
while(!m_stop)
{
number = epoll_wait(m_epollfd,events,MAX_EVENT_NUMBER,-1);
if((number< 0) &&(errno!=EINTER))
{
printf("epoll failure\n");
break;
}
for(int i = 0;i<number;++i)
{
int sockfd = events.data.fd;
if(sockfd == m_listenfd)
{
int i = sub_process_counter;
do
{
if(m_sub_process.m_pid != -1)
{
break;
}
i = (i+1)%m_process_number;
}
while(i!= sub_process_counter);
if(m_sub_number.m_pid == -1)
{
m_stop = true;
break;
}
sub_process_counter = (i+1)%m_process_number;
send(m_sub_process.m_pipefd[0],(char*)&new_conn,sizeof(new_conn),0);
printf("send request to child %d\n",i);
}
else if((sockfd == sig_pipefd[0]) && (events.events &EPOLLIN))
{
int sig;
char signals[1024];
ret = recv(sig_pipefd[0],signals,sizeof(signals),0);
if(ret<= 0)
{
continue;
}
else
{
for(int i = 0;i<ret;++i)
{
switch(signal)
{
case SIGCHLD:
{
pid_t pid;
int stat;
while((pid = waitpid(-1,&stat,WNOHANG))>0)
{
for(int i = 0;i<m_process_number;++i)
{
if(m_sub_process.m_pid== pid)
{
printf("child %d join\n",i);
close(m_sub_process.m_pipfd[0]);
m_sub_process.m_pid = -1;
}
}
}
m_stop = true;
for(int i = 0;i<m_process_number;++i)
{
if(m_sub_process.m_pid!= -1)
{
m_stop = false;
}
}
break;
}
case SIGTERM:
case SIGINT:
{
printf("kill all the child now\n");
for(int i = 0;i<m_process_number;++i)
{
int pid = m_sub_process.m_pid;
if(pid != -1)
{
kill(pid,SIGTERM);
}
}
break;
}
default:
{
break;
}
}
}
}
}
else
{
continue;
}
}
}
close(m_epollfd);
}
#endif  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-461977-1-1.html 上篇帖子: linux博文 下篇帖子: linux sed基本使用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表