设为首页 收藏本站
查看: 412|回复: 0

[经验分享] Linux高性能服务器编程——I/O复用

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-7-2 10:10:50 | 显示全部楼层 |阅读模式
IO复用
I/O复用使得程序能同时监听多个文件描述符,通常网络程序在下列情况下需要使用I/O复用技术:
  • 客户端程序要同时处理多个socket
  • 客户端程序要同时处理用户输入和网络连接
  • TCP服务器要同时处理监听socket和连接socket,这是I/O复用使用最多的场合
  • 服务器要同时处理TCP请求和UDP请求。比如本章将要讨论的会社服务器
  • 服务器要同时监听多个端口,或者处理多种服务。

I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采用额外措施,程序就只能按顺序依次处理其中的每一个文件描述符,这使得服务器程序看起来像是串行工作。如果要实现并发,只能使用多进程或多线程等变成手段。
select系统复用
select系统调用的用途是:在一段指定时间内,几件套用户感兴趣的文件描述符上的可读可写和异常等事件。
#include <sys/select.h>
int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • nfds参数指定被监听的文件描述符的总数。通常被设置为select监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的
  • readfds, writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。
    fd_set结构体仅包含一个整形数组,高数组的每个元素的每一位标记一个文件描述符。
    可用如下宏来访问fd_set结构体中的位:
    voidFD_CLR(int fd, fd_set *set);
    int  FD_ISSET(int fd, fd_set *set);
    voidFD_SET(int fd, fd_set *set);
        void FD_ZERO(fd_set*set);
  • timeout参数用来设置select函数的超时时间。它是一个timeval指针,timeval结构体定义如下:

struct timeval {
              long   tv_sec;        /* seconds */
              long   tv_usec;       /* microseconds */
          };
如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。
select成功时返回就绪文件描述符的总数,如果在超时时间内没有任何文件描述符就绪返回0,失败返回-1,并设置errno;如果select在等待期间收到信号,则select立即返回-1,并设置errno为EINTR。
poll系统调用
poll系统调用和select类似,也是在指定时间内伦旭一定数量的文件描述符,以测试其中是否有就绪。poll原型如下:
#include<poll.h>
int poll(structpollfd *fds, nfds_t nfds, int timeout);
1)fds参数是一个pollfd结构类型的数组,它指定所以我们感兴趣的文件描述符上发生的刻度、可写和异常等时间。其结构定义如下:
struct pollfd {
              int  fd;        /* file descriptor */
              short events;    /* requested events */
              short revents;   /* returned events */
          };
其中fd成员指定文件描述符;events成员告诉poll监听f上的那些时间,它是一系列时间的按位或;revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。
2)nfds参数指定被监听事件集合的大小。其类型nfds_t定义如下:
typedef unsignedlong int nfds_t;
  • timeout参数指定poll的超时时间,单位是毫秒。当timeout为-1时,poll调用将永远阻塞,直到某个事件发生;当为0时,poll调用立即返回。
    poll返回值含义与select相同。

epoll系列系统调用内核事件表
epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有很大差异。首先,epoll使用一组函数来完成任务,而不是单个函数。其次,epoll把用户关心的文件描述符上的时间放在内核里的一个时间表中,从而无需向select和poll那样每次调用都要重复传入文件描述符集或事件集。但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个时间表。这个文件描述符使用如下epoll_create函数创建:
#include <sys/epoll.h>
int epoll_create(int size);
size参数给内核一个提示,告诉它时间表需要多大。该函数返回的文件描述符将作用其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
下面的函数用来操作epoll的内核事件表:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);
fd参数是要操作的文件描述符,op参数则制定操作类型,操作类型有如下3种:
EPOLL_CTL_ADD:往事件表中注册fd上的事件
EPOLL_CTL_MOD:修改fd上的注册事件
EPOLL_CTL_DEL:删除fd上的注册事件
event参数指定时间,它是epoll_event结构指针类型。epoll_event的定义如下:
struct epoll_event {
              uint32_t    events;     /* Epoll events */
              epoll_data_t data;       /* User data variable */
          };
其中events成员描述事件类型。data成员用于存储用户数据,其类型epoll_data的定义如下:
typedef union epoll_data {
              void       *ptr;
              int         fd;
              uint32_t    u32;
              uint64_t    u64;
          } epoll_data_t;
epoll_data_t是一个联合体,其中4个成员中使用最多的是fd,它指定事件所丛书的目标文件描述符。
epoll_ctl成功时返回0,失败时返回-1并设置errno。
epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
该函数成功时返回就绪的文件描述符的个数,失败是返回-1,并设置errno。
maxevents参数指定最多监听多少时间,必须大于0.
epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪时间,而不像select和poll数组那样即用于传入用户注册的时间,有用于输出内核检测到的就绪时间。这就极大的提高了应用程序索引就绪文件描述符的效率。下面的代码体现了这个差别:
/*如何索引poll返回的就绪文件描述符*/
int ret = poll(fds, MAX_EVENT_NUMBER, -1);
/*必须遍历所有注册文件描述符并找到其中的就绪着*/
for(int i=0;i<MAX_EVENT_NUMBER; ++i)
{
        if(fds.revents & POLLIN)
{
        int sockfd = fds.fd;
        /*处理sockfd*/
}
}
/*如何索引epoll返回的就绪文件描述符*/
int ret =epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1);
/*遍历就绪的ret个文件描述符*/
for( int i=0;i<ret; i++)
{
        int socketfd = events.data.fd;
        /*socket肯定就绪,直接处理*/
}
LTET模式
epoll对文件描述符的操作有两种模式:LT模式(Levek Trigger,电平触发)和ET模式(E多个Trigger,边沿触发)。LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式是epoll的搞笑工作模式。
对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有时间发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有时间发生并将此时间通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向用用程序通知这一事件。可见,ET在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率比LT模式高。
文章最后的程序清单1比较了两种模式:
当在客户端telnet传输“abcdefghijklmnopqrstuvwxyz”字符串时,输出如下
ET模式输出:
event trigger once
get 9 bytes of content: abcdefghi
get 9 bytes of content: jklmnopqr
get 9 bytes of content: stuvwxyz
get 1 bytes of content:
LT模式输出:
event trigger once
get 9 bytes of content: abcdefghi
event trigger once
get 9 bytes of content: jklmnopqr
event trigger once
get 9 bytes of content: stuvwxyz
event trigger once
get 1 bytes of content:
可以看到正如我们预期,ET模式下时间只被触发一次,要比LT模式下少很多。
EPOLLONESHOT事件
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中会引起一个问题。比如一个县城在读取完某个socket上的数据后开始处理这些数据,二在数据的处理工程中该socket上又有新数据可读,此时另外一个县城北唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面,这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理,这一点可以使用spoll的EPOLLONESHOT事件实现。
        对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,而且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件ain.zheyang,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会处理这个socket。
程序清单2展示了EPOLLONESHOT事件的使用。
三组I/O复用函数的比较
系统调用
select
poll
epoll
事件集合
用户通过3个参数分别传入感兴趣的可读、可写及异常等事件,内核通过对这些参数在线修改来反馈其中的就绪事件。这使得用户每次调用select都要重置这3个参数
统一处理所有事件类型,因此只需要一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents反馈其中就绪的事件
内核通过一个时间表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的时间。epoll_wait系统调用的参数events仅用来反馈就绪的事件。
应用程序索引就绪文件描述符的时间复杂度
O(N)
O(N)
O(1)
最大支持文件描述符数
一般有最大值限制
65535
65535
工作模式
LT
LT
支持ET高效模式
内核实现和工作效率
采用轮询方法来检测就绪事件,算法复杂度为O(N)
采用轮询方式检测就绪事件,算法复杂度为O(N)
采用回调方式来检测就绪事件,算法复杂度为O(1)
聊天程序见程序(poll实现)见清单3
同时处理TCP和UDP服务的回射服务器程序(epoll程序)见清单4



    程序清单1:  
    #include <sys/types.h>  
    #include <sys/socket.h>  
    #include <netinet/in.h>  
    #include <arpa/inet.h>  
    #include <assert.h>  
    #include <stdio.h>  
    #include <unistd.h>  
    #include <errno.h>  
    #include <string.h>  
    #include <fcntl.h>  
    #include <stdlib.h>  
    #include <sys/epoll.h>  
    #include <pthread.h>  
      
    #define MAX_EVENT_NUMBER 1024  
    #define BUFFER_SIZE 10  
      
    int setnonblocking( int fd )  
    {  
        int old_option = fcntl( fd, F_GETFL );  
        int new_option = old_option | O_NONBLOCK;  
        fcntl( fd, F_SETFL, new_option );  
        return old_option;  
    }  
      
    void addfd( int epollfd, int fd, bool enable_et )  
    {  
        epoll_event event;  
        event.data.fd = fd;  
        event.events = EPOLLIN;  
        if( enable_et )  
        {  
            event.events |= EPOLLET;  
        }  
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );  
        setnonblocking( fd );  
    }  
      
    void lt( epoll_event* events, int number, int epollfd, int listenfd )  
    {  
        char buf[ BUFFER_SIZE ];  
        for ( int i = 0; i < number; i++ )  
        {  
            int sockfd = events.data.fd;  
            if ( sockfd == listenfd )  
            {  
                struct sockaddr_in client_address;  
                socklen_t client_addrlength = sizeof( client_address );  
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );  
                addfd( epollfd, connfd, false );  
            }  
            else if ( events.events & EPOLLIN )  
            {  
                printf( "event trigger once\n" );  
                memset( buf, '\0', BUFFER_SIZE );  
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );  
                if( ret <= 0 )  
                {  
                    close( sockfd );  
                    continue;  
                }  
                printf( "get %d bytes of content: %s\n", ret, buf );  
            }  
            else  
            {  
                printf( "something else happened \n" );  
            }  
        }  
    }  
      
    void et( epoll_event* events, int number, int epollfd, int listenfd )  
    {  
        char buf[ BUFFER_SIZE ];  
        for ( int i = 0; i < number; i++ )  
        {  
            int sockfd = events.data.fd;  
            if ( sockfd == listenfd )  
            {  
                struct sockaddr_in client_address;  
                socklen_t client_addrlength = sizeof( client_address );  
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );  
                addfd( epollfd, connfd, true );  
            }  
            else if ( events.events & EPOLLIN )  
            {  
                printf( "event trigger once\n" );  
                while( 1 )  
                {  
                    memset( buf, '\0', BUFFER_SIZE );  
                    int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );  
                    if( ret < 0 )  
                    {  
                        if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )  
                        {  
                            printf( "read later\n" );  
                            break;  
                        }  
                        close( sockfd );  
                        break;  
                    }  
                    else if( ret == 0 )  
                    {  
                        close( sockfd );  
                    }  
                    else  
                    {  
                        printf( "get %d bytes of content: %s\n", ret, buf );  
                    }  
                }  
            }  
            else  
            {  
                printf( "something else happened \n" );  
            }  
        }  
    }  
      
    int main( int argc, char* argv[] )  
    {  
        if( argc <= 2 )  
        {  
            printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );  
            return 1;  
        }  
        const char* ip = argv[1];  
        int port = atoi( argv[2] );  
      
        int ret = 0;  
        struct sockaddr_in address;  
        bzero( &address, sizeof( address ) );  
        address.sin_family = AF_INET;  
        inet_pton( AF_INET, ip, &address.sin_addr );  
        address.sin_port = htons( port );  
      
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );  
        assert( listenfd >= 0 );  
      
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );  
        assert( ret != -1 );  
      
        ret = listen( listenfd, 5 );  
        assert( ret != -1 );  
      
        epoll_event events[ MAX_EVENT_NUMBER ];  
        int epollfd = epoll_create( 5 );  
        assert( epollfd != -1 );  
        addfd( epollfd, listenfd, true );  
      
        while( 1 )  
        {  
            int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );  
            if ( ret < 0 )  
            {  
                printf( "epoll failure\n" );  
                break;  
            }  
         
            lt( events, ret, epollfd, listenfd );  
            //et( events, ret, epollfd, listenfd );  
        }  
      
        close( listenfd );  
        return 0;  
    }  



    程序清单2  
    #include <sys/types.h>  
    #include <sys/socket.h>  
    #include <netinet/in.h>  
    #include <arpa/inet.h>  
    #include <assert.h>  
    #include <stdio.h>  
    #include <unistd.h>  
    #include <errno.h>  
    #include <string.h>  
    #include <fcntl.h>  
    #include <stdlib.h>  
    #include <sys/epoll.h>  
    #include <pthread.h>  
      
    #define MAX_EVENT_NUMBER 1024  
    #define BUFFER_SIZE 1024  
    struct fds  
    {  
       int epollfd;  
       int sockfd;  
    };  
      
    int setnonblocking( int fd )  
    {  
        int old_option = fcntl( fd, F_GETFL );  
        int new_option = old_option | O_NONBLOCK;  
        fcntl( fd, F_SETFL, new_option );  
        return old_option;  
    }  
      
    void addfd( int epollfd, int fd, bool oneshot )  
    {  
        epoll_event event;  
        event.data.fd = fd;  
        event.events = EPOLLIN | EPOLLET;  
        if( oneshot )  
        {  
            event.events |= EPOLLONESHOT;  
        }  
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );  
        setnonblocking( fd );  
    }  
      
    void reset_oneshot( int epollfd, int fd )  
    {  
        epoll_event event;  
        event.data.fd = fd;  
        event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;  
        epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );  
    }  
      
    void* worker( void* arg )  
    {  
        int sockfd = ( (fds*)arg )->sockfd;  
        int epollfd = ( (fds*)arg )->epollfd;  
        printf( "start new thread to receive data on fd: %d\n", sockfd );  
        char buf[ BUFFER_SIZE ];  
        memset( buf, '\0', BUFFER_SIZE );  
        while( 1 )  
        {  
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );  
            if( ret == 0 )  
            {  
                close( sockfd );  
                printf( "foreiner closed the connection\n" );  
                break;  
            }  
            else if( ret < 0 )  
            {  
                if( errno == EAGAIN )  
                {  
                    reset_oneshot( epollfd, sockfd );  
                    printf( "read later\n" );  
                    break;  
                }  
            }  
            else  
            {  
                printf( "get content: %s\n", buf );  
                sleep( 5 );  
            }  
        }  
        printf( "end thread receiving data on fd: %d\n", sockfd );  
    }  
      
    int main( int argc, char* argv[] )  
    {  
        if( argc <= 2 )  
        {  
            printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );  
            return 1;  
        }  
        const char* ip = argv[1];  
        int port = atoi( argv[2] );  
      
        int ret = 0;  
        struct sockaddr_in address;  
        bzero( &address, sizeof( address ) );  
        address.sin_family = AF_INET;  
        inet_pton( AF_INET, ip, &address.sin_addr );  
        address.sin_port = htons( port );  
      
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );  
        assert( listenfd >= 0 );  
      
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );  
        assert( ret != -1 );  
      
        ret = listen( listenfd, 5 );  
        assert( ret != -1 );  
      
        epoll_event events[ MAX_EVENT_NUMBER ];  
        int epollfd = epoll_create( 5 );  
        assert( epollfd != -1 );  
        addfd( epollfd, listenfd, false );  
      
        while( 1 )  
        {  
            int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );  
            if ( ret < 0 )  
            {  
                printf( "epoll failure\n" );  
                break;  
            }  
         
            for ( int i = 0; i < ret; i++ )  
            {  
                int sockfd = events.data.fd;  
                if ( sockfd == listenfd )  
                {  
                    struct sockaddr_in client_address;  
                    socklen_t client_addrlength = sizeof( client_address );  
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );  
                    addfd( epollfd, connfd, true );  
                }  
                else if ( events.events & EPOLLIN )  
                {  
                    pthread_t thread;  
                    fds fds_for_new_worker;  
                    fds_for_new_worker.epollfd = epollfd;  
                    fds_for_new_worker.sockfd = sockfd;  
                    pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );  
                }  
                else  
                {  
                    printf( "something else happened \n" );  
                }  
            }  
        }  
      
        close( listenfd );  
        return 0;  
    }  




    程序清单3  
    客户端程序  
    #define _GNU_SOURCE 1  
    #include <sys/types.h>  
    #include <sys/socket.h>  
    #include <netinet/in.h>  
    #include <arpa/inet.h>  
    #include <assert.h>  
    #include <stdio.h>  
    #include <unistd.h>  
    #include <string.h>  
    #include <stdlib.h>  
    #include <poll.h>  
    #include <fcntl.h>  
      
    #define BUFFER_SIZE 64  
      
    int main( int argc, char* argv[] )  
    {  
        if( argc <= 2 )  
        {  
            printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );  
            return 1;  
        }  
        const char* ip = argv[1];  
        int port = atoi( argv[2] );  
      
        struct sockaddr_in server_address;  
        bzero( &server_address, sizeof( server_address ) );  
        server_address.sin_family = AF_INET;  
        inet_pton( AF_INET, ip, &server_address.sin_addr );  
        server_address.sin_port = htons( port );  
      
        int sockfd = socket( PF_INET, SOCK_STREAM, 0 );  
        assert( sockfd >= 0 );  
        if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )  
        {  
            printf( "connection failed\n" );  
            close( sockfd );  
            return 1;  
        }  
      
        pollfd fds[2];  
        fds[0].fd = 0;  
        fds[0].events = POLLIN;  
        fds[0].revents = 0;  
        fds[1].fd = sockfd;  
        fds[1].events = POLLIN | POLLRDHUP;  
        fds[1].revents = 0;  
        char read_buf[BUFFER_SIZE];  
        int pipefd[2];  
        int ret = pipe( pipefd );  
        assert( ret != -1 );  
      
        while( 1 )  
        {  
            ret = poll( fds, 2, -1 );  
            if( ret < 0 )  
            {  
                printf( "poll failure\n" );  
                break;  
            }  
      
            if( fds[1].revents & POLLRDHUP )  
            {  
                printf( "server close the connection\n" );  
                break;  
            }  
            else if( fds[1].revents & POLLIN )  
            {  
                memset( read_buf, '\0', BUFFER_SIZE );  
                recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );  
                printf( "%s\n", read_buf );  
            }  
      
            if( fds[0].revents & POLLIN )  
            {  
                ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );  
                ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );  
            }  
        }  
         
        close( sockfd );  
        return 0;  
    }  
      
    服务器程序  
    #define _GNU_SOURCE 1  
    #include <sys/types.h>  
    #include <sys/socket.h>  
    #include <netinet/in.h>  
    #include <arpa/inet.h>  
    #include <assert.h>  
    #include <stdio.h>  
    #include <unistd.h>  
    #include <errno.h>  
    #include <string.h>  
    #include <fcntl.h>  
    #include <stdlib.h>  
    #include <poll.h>  
      
    #define USER_LIMIT 5  
    #define BUFFER_SIZE 64  
    #define FD_LIMIT 65535  
      
    struct client_data  
    {  
        sockaddr_in address;  
        char* write_buf;  
        char buf[ BUFFER_SIZE ];  
    };  
      
    int setnonblocking( int fd )  
    {  
        int old_option = fcntl( fd, F_GETFL );  
        int new_option = old_option | O_NONBLOCK;  
        fcntl( fd, F_SETFL, new_option );  
        return old_option;  
    }  
      
    int main( int argc, char* argv[] )  
    {  
        if( argc <= 2 )  
        {  
            printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );  
            return 1;  
        }  
        const char* ip = argv[1];  
        int port = atoi( argv[2] );  
      
        int ret = 0;  
        struct sockaddr_in address;  
        bzero( &address, sizeof( address ) );  
        address.sin_family = AF_INET;  
        inet_pton( AF_INET, ip, &address.sin_addr );  
        address.sin_port = htons( port );  
      
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );  
        assert( listenfd >= 0 );  
      
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );  
        assert( ret != -1 );  
      
        ret = listen( listenfd, 5 );  
        assert( ret != -1 );  
      
        client_data* users = new client_data[FD_LIMIT];  
        pollfd fds[USER_LIMIT+1];  
        int user_counter = 0;  
        for( int i = 1; i <= USER_LIMIT; ++i )  
        {  
            fds.fd = -1;  
            fds.events = 0;  
        }  
        fds[0].fd = listenfd;  
        fds[0].events = POLLIN | POLLERR;  
        fds[0].revents = 0;  
      
        while( 1 )  
        {  
            ret = poll( fds, user_counter+1, -1 );  
            if ( ret < 0 )  
            {  
                printf( "poll failure\n" );  
                break;  
            }  
         
            for( int i = 0; i < user_counter+1; ++i )  
            {  
                if( ( fds.fd == listenfd ) && ( fds.revents & POLLIN ) )  
                {  
                    struct sockaddr_in client_address;  
                    socklen_t client_addrlength = sizeof( client_address );  
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );  
                    if ( connfd < 0 )  
                    {  
                        printf( "errno is: %d\n", errno );  
                        continue;  
                    }  
                    if( user_counter >= USER_LIMIT )  
                    {  
                        const char* info = "too many users\n";  
                        printf( "%s", info );  
                        send( connfd, info, strlen( info ), 0 );  
                        close( connfd );  
                        continue;  
                    }  
                    user_counter++;  
                    users[connfd].address = client_address;  
                    setnonblocking( connfd );  
                    fds[user_counter].fd = connfd;  
                    fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;  
                    fds[user_counter].revents = 0;  
                    printf( "comes a new user, now have %d users\n", user_counter );  
                }  
                else if( fds.revents & POLLERR )  
                {  
                    printf( "get an error from %d\n", fds.fd );  
                    char errors[ 100 ];  
                    memset( errors, '\0', 100 );  
                    socklen_t length = sizeof( errors );  
                    if( getsockopt( fds.fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )  
                    {  
                        printf( "get socket option failed\n" );  
                    }  
                    continue;  
                }  
                else if( fds.revents & POLLRDHUP )  
                {  
                    users[fds.fd] = users[fds[user_counter].fd];  
                    close( fds.fd );  
                    fds = fds[user_counter];  
                    i--;  
                    user_counter--;  
                    printf( "a client left\n" );  
                }  
                else if( fds.revents & POLLIN )  
                {  
                    int connfd = fds.fd;  
                    memset( users[connfd].buf, '\0', BUFFER_SIZE );  
                    ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );  
                    printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd );  
                    if( ret < 0 )  
                    {  
                        if( errno != EAGAIN )  
                        {  
                            close( connfd );  
                            users[fds.fd] = users[fds[user_counter].fd];  
                            fds = fds[user_counter];  
                            i--;  
                            user_counter--;  
                        }  
                    }  
                    else if( ret == 0 )  
                    {  
                        printf( "code should not come to here\n" );  
                    }  
                    else  
                    {  
                        for( int j = 1; j <= user_counter; ++j )  
                        {  
                            if( fds[j].fd == connfd )  
                            {  
                                continue;  
                            }  
                              
                            fds[j].events |= ~POLLIN;  
                            fds[j].events |= POLLOUT;  
                            users[fds[j].fd].write_buf = users[connfd].buf;  
                        }  
                    }  
                }  
                else if( fds.revents & POLLOUT )  
                {  
                    int connfd = fds.fd;  
                    if( ! users[connfd].write_buf )  
                    {  
                        continue;  
                    }  
                    ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );  
                    users[connfd].write_buf = NULL;  
                    fds.events |= ~POLLOUT;  
                    fds.events |= POLLIN;  
                }  
            }  
        }  
      
        delete [] users;  
        close( listenfd );  
        return 0;  
    }  



    程序清单4 回射服务器程序  
    #include <sys/types.h>  
    #include <sys/socket.h>  
    #include <netinet/in.h>  
    #include <arpa/inet.h>  
    #include <assert.h>  
    #include <stdio.h>  
    #include <unistd.h>  
    #include <errno.h>  
    #include <string.h>  
    #include <fcntl.h>  
    #include <stdlib.h>  
    #include <sys/epoll.h>  
    #include <pthread.h>  
      
    #define MAX_EVENT_NUMBER 1024  
    #define TCP_BUFFER_SIZE 512  
    #define UDP_BUFFER_SIZE 1024  
      
    int setnonblocking( int fd )  
    {  
        int old_option = fcntl( fd, F_GETFL );  
        int new_option = old_option | O_NONBLOCK;  
        fcntl( fd, F_SETFL, new_option );  
        return old_option;  
    }  
      
    void addfd( int epollfd, int fd )  
    {  
        epoll_event event;  
        event.data.fd = fd;  
        //event.events = EPOLLIN | EPOLLET;  
        event.events = EPOLLIN;  
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );  
        setnonblocking( fd );  
    }  
      
    int main( int argc, char* argv[] )  
    {  
        if( argc <= 2 )  
        {  
            printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );  
            return 1;  
        }  
        const char* ip = argv[1];  
        int port = atoi( argv[2] );  
      
        int ret = 0;  
        struct sockaddr_in address;  
        bzero( &address, sizeof( address ) );  
        address.sin_family = AF_INET;  
        inet_pton( AF_INET, ip, &address.sin_addr );  
        address.sin_port = htons( port );  
      
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );  
        assert( listenfd >= 0 );  
      
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );  
        assert( ret != -1 );  
      
        ret = listen( listenfd, 5 );  
        assert( ret != -1 );  
      
        bzero( &address, sizeof( address ) );  
        address.sin_family = AF_INET;  
        inet_pton( AF_INET, ip, &address.sin_addr );  
        address.sin_port = htons( port );  
        int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );  
        assert( udpfd >= 0 );  
      
        ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );  
        assert( ret != -1 );  
      
        epoll_event events[ MAX_EVENT_NUMBER ];  
        int epollfd = epoll_create( 5 );  
        assert( epollfd != -1 );  
        addfd( epollfd, listenfd );  
        addfd( epollfd, udpfd );  
      
        while( 1 )  
        {  
            int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );  
            if ( number < 0 )  
            {  
                printf( "epoll failure\n" );  
                break;  
            }  
         
            for ( int i = 0; i < number; i++ )  
            {  
                int sockfd = events.data.fd;  
                if ( sockfd == listenfd )  
                {  
                    struct sockaddr_in client_address;  
                    socklen_t client_addrlength = sizeof( client_address );  
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );  
                    addfd( epollfd, connfd );  
                }  
                else if ( sockfd == udpfd )  
                {  
                    char buf[ UDP_BUFFER_SIZE ];  
                    memset( buf, '\0', UDP_BUFFER_SIZE );  
                    struct sockaddr_in client_address;  
                    socklen_t client_addrlength = sizeof( client_address );  
      
                    ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength );  
                    if( ret > 0 )  
                    {  
                        sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength );  
                    }  
                }  
                else if ( events.events & EPOLLIN )  
                {  
                    char buf[ TCP_BUFFER_SIZE ];  
                    while( 1 )  
                    {  
                        memset( buf, '\0', TCP_BUFFER_SIZE );  
                        ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 );  
                        if( ret < 0 )  
                        {  
                            if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )  
                            {  
                                break;  
                            }  
                            close( sockfd );  
                            break;  
                        }  
                        else if( ret == 0 )  
                        {  
                            close( sockfd );  
                        }  
                        else  
                        {  
                            send( sockfd, buf, ret, 0 );  
                        }  
                    }  
                }  
                else  
                {  
                    printf( "something else happened \n" );  
                }  
            }  
        }  
      
        close( listenfd );  
        return 0;  
    }  






运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-21436-1-1.html 上篇帖子: 解决VTune错误.../lib64/libstdc++.so.6: version `GLIBCXX_3.4.14' not found... 下篇帖子: centos 安装 网络安装使用本地镜像文件安装 服务器 Linux
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表