设为首页 收藏本站
查看: 741|回复: 0

[经验分享] C++ IOCP windows服务器

[复制链接]

尚未签到

发表于 2018-6-13 12:27:06 | 显示全部楼层 |阅读模式
  首先,启动主线程,接收来自客户端的请求。并启动4个子线程接收已经建连的客户端发来的消息。此时主线不阻塞,继续接收新的注册请求。4个子线程处理发来的消息,并解析消息,将要做的任务交给线程池处理。自己继续处理发来的消息。
  IOCP服务器实现
#pragma once
#include <string>
#include <winsock2.h>   
#include <Windows.h>  
#include <vector>
#include <iostream>
#include "CThreadPool.h"
#include "WorkA.h"
#include "WorkB.h"
using namespace std;
#pragma comment(lib, "Ws2_32.lib")      // Socket编程需用的动态链接库   
//#pragma comment(lib, "Kernel32.lib")
#define DefaultIP  "127.0.0.1"
#define DefaultPort 9999
#define DefaultClientNum 6000
#define MessMaxLen   1024
#define DataBuffSize   2 * 1024
/**
* 结构体名称:PER_IO_DATA
* 结构体功能:重叠I/O需要用到的结构体,临时记录IO数据
**/  
typedef struct  
{  
    OVERLAPPED overlapped;  
    WSABUF databuff;  
    char buffer[ DataBuffSize ];  
    int BufferLen;  
    int operationType;  
SOCKET socket;
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;
/**
* 结构体名称:PER_HANDLE_DATA
* 结构体存储:记录单个套接字的数据,包括了套接字的变量及套接字的对应的客户端的地址。
* 结构体作用:当服务器连接上客户端时,信息存储到该结构体中,知道客户端的地址以便于回访。
**/  
typedef struct  
{  
    SOCKET socket;  
    SOCKADDR_STORAGE ClientAddr;  
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
//单例类
class CMYIOCPServer
{
public:
~CMYIOCPServer(void);
bool ServerSetUp();
void SetServerIp(const string & sIP=DefaultIP);
void SetPort(const int &iPort=DefaultPort);
void SetMaxClientNum(const int &iMaxNum = DefaultClientNum);
static DWORD WINAPI  ServerWorkThread(LPVOID CompletionPortID);
static void SendMessage(SOCKET &tSOCKET,char MessAge[MessMaxLen]);
static CMYIOCPServer* GetInstance();
private:
//私有方法
CMYIOCPServer(void);
bool  LoadWindowsSocket();
bool InitServerSocket();
bool CreateServerSocker();
static void HandleMessage();
//私有数据
string m_sServerIP;
int m_iLisenPoint;
string m_sError;
int m_iMaxClientNum;
vector< PER_HANDLE_DATA* > m_vclientGroup;//保持客户端的连接信息  
    static HANDLE m_hMutex;//多线程访问互斥变量
static HANDLE m_completionPort;
SOCKET m_srvSocket;
static CMYIOCPServer *m_pInstance;
static char m_byteMsg[MessMaxLen];
static CWorkQueue m_CWorkQueue;//线程池
};
#include "StdAfx.h"
#include "MYIOCPServer.h"
HANDLE  CMYIOCPServer::m_completionPort = NULL;
HANDLE CMYIOCPServer:: m_hMutex = NULL;
CMYIOCPServer* CMYIOCPServer::  m_pInstance = NULL;
char CMYIOCPServer::m_byteMsg[MessMaxLen] = {0} ;
CWorkQueue CMYIOCPServer:: m_CWorkQueue;
/**************************
   获得单例对象
***************************/
CMYIOCPServer* CMYIOCPServer::GetInstance()
{
if(NULL == m_pInstance)
{
  m_pInstance = new CMYIOCPServer();
}
m_CWorkQueue.Create(10);
return m_pInstance;
}
/**************************
      类的构造函数
**************************/
CMYIOCPServer::CMYIOCPServer(void)
{
m_iLisenPoint = DefaultPort;
}
/**************************
      类的析构函数
**************************/
CMYIOCPServer::~CMYIOCPServer(void)
{
m_CWorkQueue.Destroy(5);
}
/**************************
      设置服务器IP
**************************/
void CMYIOCPServer::SetServerIp(const string & sIP)
{
m_sServerIP = sIP;
}
/**************************
      设置服务器端口
**************************/
void  CMYIOCPServer::SetPort(const int &iPort)
{
m_iLisenPoint = iPort;
}
/**************************
  设置最大的客户端连接数目
**************************/
void  CMYIOCPServer::SetMaxClientNum(const int &iMaxNum)
{
m_iMaxClientNum = iMaxNum;
}
/**************************
  服务器接收客户端消息,
  工作线程
**************************/
DWORD WINAPI   CMYIOCPServer::ServerWorkThread(LPVOID CompletionPortID)
{
    HANDLE CompletionPort = (HANDLE)CompletionPortID;  
    DWORD BytesTransferred;  
    LPOVERLAPPED IpOverlapped;  
    LPPER_HANDLE_DATA PerHandleData = NULL;  
    LPPER_IO_DATA PerIoData = NULL;  
    DWORD RecvBytes;  
    DWORD Flags = 0;  
    BOOL bRet = false;  
    while(true){  
        bRet = GetQueuedCompletionStatus(m_completionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);  
        if(bRet == 0){  
            cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl;  
            continue;
   //这里不能返回,返回子线程就结束了
   //return -1;  
        }  
        PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);  
        // 检查在套接字上是否有错误发生   
        if(0 == BytesTransferred){  
            closesocket(PerHandleData->socket);  
            GlobalFree(PerHandleData);  
            GlobalFree(PerIoData);  
            continue;  
        }  
  //得到消息码流
  memset(m_byteMsg,0,MessMaxLen);
        memcpy(m_byteMsg,PerIoData->databuff.buf,MessMaxLen);
  //得到客户端SOCKET信息
  SOCKET sClientSocket = PerHandleData->socket;
  printf("message is %s \n",m_byteMsg);
  HandleMessage();
  //SendMessage(sClientSocket,m_byteMsg);
        // 为下一个重叠调用建立单I/O操作数据   
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存   
        PerIoData->databuff.len = 1024;  
        PerIoData->databuff.buf = PerIoData->buffer;  
        PerIoData->operationType = 0;    // read   
        WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);  
    }
return  0;
}
/*******************
处理消息类
*******************/
void CMYIOCPServer::HandleMessage()
{
    printf("thread is %d\n",GetCurrentThreadId());
WorkItemBase *pCworkItem = NULL;
byte iCommand = m_byteMsg[0];
    switch(iCommand)
{
    case  '0':
   pCworkItem = new CWorkA();
    break;
  case '1':
   pCworkItem = new CWorkB();
    break;
  case '2':
    break;
  default:
   break;
}
//将任务交给线程池处理
if(NULL != pCworkItem)
{
  m_CWorkQueue.InsertWorkItem(pCworkItem);
}
printf("Finish Handle Message\n");
}
/**************************
   发送消息给制定客户端
**************************/
void CMYIOCPServer::SendMessage(SOCKET &tSOCKET,char MessAge[MessMaxLen])
{
   // 开始数据处理,接收来自客户端的数据   
      WaitForSingleObject(m_hMutex,INFINITE);  
       send(tSOCKET, MessAge, MessMaxLen, 0);  // 发送信息   
       ReleaseMutex(m_hMutex);  
}
/**************************
  初始化SOCKET对象,创建端口
  和线程数组
**************************/
bool  CMYIOCPServer::LoadWindowsSocket()
{
// 加载socket动态链接库   
    WORD wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库   
    WSADATA wsaData;    // 接收Windows Socket的结构信息   
    DWORD err = WSAStartup(wVersionRequested, &wsaData);  
    if (0 != err){  // 检查套接字库是否申请成功   
  m_sError = "Request Windows Socket Library Error!\n";
        return false;  
    }  
    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 检查是否申请了所需版本的套接字库   
        WSACleanup();  
  m_sError ="Request Windows Socket Version 2.2 Error!\n";
        system("pause");  
        return false;  
    }
// 创建IOCP的内核对象   
    /**
     * 需要用到的函数的原型:
     * HANDLE WINAPI CreateIoCompletionPort(
     *    __in   HANDLE FileHandle,     // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
     *    __in   HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
     *    __in   ULONG_PTR CompletionKey,   // 完成键,包含了指定I/O完成包的指定文件
     *    __in   DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2
     * );
     **/  
    m_completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0);  
    if (NULL == m_completionPort){    // 创建IO内核对象失败   
        m_sError ="CreateIoCompletionPort failed. Error:\n";
        return false;  
    }  
// 创建IOCP线程--线程里面创建线程池   
    // 确定处理器的核心数量   
    SYSTEM_INFO mySysInfo;  
    GetSystemInfo(&mySysInfo);  
    // 基于处理器的核心数量创建线程   
    for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i){  
        // 创建服务器工作器线程,并将完成端口传递到该线程   
  HANDLE ThreadHandle = CreateThread(NULL, 0,  &CMYIOCPServer::ServerWorkThread, m_completionPort, 0, NULL);  
        if(NULL == ThreadHandle){   
         m_sError ="Create Thread Handle failed. Error::\n";  
        }  
        CloseHandle(ThreadHandle);  
    }
return true;
}
/*************************
   初始化服务器SOCKET信息
*************************/
bool CMYIOCPServer::InitServerSocket()
{
// 建立流式套接字   
    m_srvSocket = socket(AF_INET, SOCK_STREAM, 0);  
// 绑定SOCKET到本机   
    SOCKADDR_IN srvAddr;  
    srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);  
    srvAddr.sin_family = AF_INET;  
    srvAddr.sin_port = htons(m_iLisenPoint);  
    int bindResult = bind(m_srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR));  
    if(SOCKET_ERROR == bindResult){  
  m_sError = "Bind failed. Error:";
        return false;  
    }
return true;
}
/**************************
创建服务器端的监听信息
**************************/
bool CMYIOCPServer::CreateServerSocker()
{  
// 将SOCKET设置为监听模式   
    int listenResult = listen(m_srvSocket, 10);  
    if(SOCKET_ERROR == listenResult){  
  m_sError = "Listen failed. Error: " ;
        return false;  
    }  
// 开始处理IO数据   
    cout << "本服务器已准备就绪,正在等待客户端的接入...\n";
    int icount = 0;
    while(true){  
        PER_HANDLE_DATA * PerHandleData = NULL;  
        SOCKADDR_IN saRemote;  
        int RemoteLen;  
        SOCKET acceptSocket;  
        // 接收连接,并分配完成端,这儿可以用AcceptEx()   
        RemoteLen = sizeof(saRemote);  
        acceptSocket = accept(m_srvSocket, (SOCKADDR*)&saRemote, &RemoteLen);  
        if(SOCKET_ERROR == acceptSocket){   // 接收客户端失败   
            cerr << "Accept Socket Error: " << GetLastError() << endl;  
            m_sError = "Accept Socket Error: " ;
   icount++;
   if(icount > 50)
   {
    return false;
   }
   continue;  
        }
  icount = 0;
        // 创建用来和套接字关联的单句柄数据信息结构   
        PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));  // 在堆中为这个PerHandleData申请指定大小的内存   
        PerHandleData -> socket = acceptSocket;  
        memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen);  
        m_vclientGroup.push_back(PerHandleData);       // 将单个客户端数据指针放到客户端组中   
        // 将接受套接字和完成端口关联   
        CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), m_completionPort, (DWORD)PerHandleData, 0);  
         
        // 开始在接受套接字上处理I/O使用重叠I/O机制   
        // 在新建的套接字上投递一个或多个异步   
        // WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务      
        // 单I/O操作数据(I/O重叠)   
        LPPER_IO_OPERATION_DATA PerIoData = NULL;  
        PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));  
        ZeroMemory(&(PerIoData -> overlapped), sizeof(OVERLAPPED));  
        PerIoData->databuff.len = 1024;  
        PerIoData->databuff.buf = PerIoData->buffer;  
        PerIoData->operationType = 0;    // read   
        DWORD RecvBytes;  
        DWORD Flags = 0;  
        WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);  
    }
//销毁资源
DWORD dwByteTrans;
PostQueuedCompletionStatus(m_completionPort, dwByteTrans, 0, 0);
closesocket(listenResult);
return true;
}
/*********************
启动服务器
*********************/
bool CMYIOCPServer::ServerSetUp()
{
if(false == LoadWindowsSocket())
{
  return false;
}
if(false == InitServerSocket())
{
  return false;
}
if(false == CreateServerSocker())
{
  return false;
}
return true;
}  SOCKET客户端实现
// IOCPClient.cpp : 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include <iostream>   
#include <cstdio>   
#include <string>   
#include <cstring>   
#include <winsock2.h>   
#include <Windows.h>   
using namespace std;  
#pragma comment(lib, "Ws2_32.lib")      // Socket编程需用的动态链接库   
SOCKET sockClient;      // 连接成功后的套接字   
HANDLE bufferMutex;     // 令其能互斥成功正常通信的信号量句柄   
const int DefaultPort = 9999;
int _tmain(int argc, _TCHAR* argv[])
{  
// 加载socket动态链接库(dll)   
    WORD wVersionRequested;  
    WSADATA wsaData;    // 这结构是用于接收Wjndows Socket的结构信息的   
    wVersionRequested = MAKEWORD( 2, 2 );   // 请求2.2版本的WinSock库   
    int err = WSAStartup( wVersionRequested, &wsaData );  
    if ( err != 0 ) {   // 返回值为零的时候是表示成功申请WSAStartup   
        return -1;  
    }  
    if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 ) { // 检查版本号是否正确   
        WSACleanup( );  
        return -1;   
    }  
// 创建socket操作,建立流式套接字,返回套接字号sockClient   
     sockClient = socket(AF_INET, SOCK_STREAM, 0);  
     if(sockClient == INVALID_SOCKET) {   
        printf("Error at socket():%ld\n", WSAGetLastError());   
        WSACleanup();   
        return -1;   
      }   
// 将套接字sockClient与远程主机相连   
    // int connect( SOCKET s,  const struct sockaddr* name,  int namelen);   
    // 第一个参数:需要进行连接操作的套接字   
    // 第二个参数:设定所需要连接的地址信息   
    // 第三个参数:地址的长度   
    SOCKADDR_IN addrSrv;  
    addrSrv.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");      // 本地回路地址是127.0.0.1;   
    addrSrv.sin_family = AF_INET;  
    addrSrv.sin_port = htons(DefaultPort);  
    while(SOCKET_ERROR == connect(sockClient, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR))){  
        // 如果还没连接上服务器则要求重连   
        cout << "服务器连接失败,是否重新连接?(Y/N):";  
        char choice;  
        while(cin >> choice && (!((choice != 'Y' && choice == 'N') || (choice == 'Y' && choice != 'N')))){  
            cout << "输入错误,请重新输入:";  
            cin.sync();  
            cin.clear();  
        }  
        if (choice == 'Y'){  
            continue;  
        }  
        else{  
            cout << "退出系统中...";  
            system("pause");  
            return 0;  
        }  
    }  
    cin.sync();  
    cout << "本客户端已准备就绪,用户可直接输入文字向服务器反馈信息。\n";  
    send(sockClient, "\nAttention: A Client has enter...\n", 200, 0);  
    bufferMutex = CreateSemaphore(NULL, 1, 1, NULL);   
    DWORD WINAPI SendMessageThread(LPVOID IpParameter);  
    DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter);  
    HANDLE sendThread = CreateThread(NULL, 0, SendMessageThread, NULL, 0, NULL);   
    HANDLE receiveThread = CreateThread(NULL, 0, ReceiveMessageThread, NULL, 0, NULL);   
         
    WaitForSingleObject(sendThread, INFINITE);  // 等待线程结束   
    closesocket(sockClient);  
    CloseHandle(sendThread);  
    CloseHandle(receiveThread);  
    CloseHandle(bufferMutex);  
    WSACleanup();   // 终止对套接字库的使用   
    printf("End linking...\n");  
    printf("\n");  
    system("pause");  
    return 0;  
}  
  
DWORD WINAPI SendMessageThread(LPVOID IpParameter)  
{  
    while(1){  
  string talk;
          char talkbuffer[200];  
        //gets(talkbuffer);  
        int iRand = rand()%3;
  Sleep(iRand * 1000);
  if(1 == iRand)
  {
   talkbuffer[0]='0';
  }
  else
  {
   talkbuffer[0]='1';
  }
  talkbuffer[1] = '\0';
  int len;  
        for (len = 0; talkbuffer[len] != '\0'; ++len){  
            // 找出这个字符组的长度   
        }  
        talkbuffer[len] = '\n';  
        talkbuffer[++len] = '\0';
  talk = talkbuffer;
        WaitForSingleObject(bufferMutex, INFINITE);     // P(资源未被占用)     
        if("quit" == talk){  
            talk.push_back('\0');  
            send(sockClient, talk.c_str(), 200, 0);  
            break;  
        }  
        else{  
            talk.append("\n");  
        }  
        printf("\nI Say:(\"quit\"to exit):");  
      //  cout << talk;
  //talk = talk + "\n\0";
   //  string talka = "1234\n";
  string talkb  = "aaaaaaaaaaaaaaaaaaaaaaa";
  string talka  = "123\n";
  talk = talk + "\0";
// talk =  talk + "\n";
  send(sockClient, talkbuffer, 200, 0); // 发送信息  
  send(sockClient, talka.c_str(), 200, 0); // 发送信息  
// send(sockClient, "\nAttention: A Client has enter...\n", 200, 0);   
        ReleaseSemaphore(bufferMutex, 1, NULL);     // V(资源占用完毕)   
    }  
    return 0;  
}  
  
DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter)  
{  
    while(1){     
        char recvBuf[300];  
        recv(sockClient, recvBuf, 200, 0);  
        WaitForSingleObject(bufferMutex, INFINITE);     // P(资源未被占用)     
        printf("%s Says: %s", "Server", recvBuf);       // 接收信息   
        ReleaseSemaphore(bufferMutex, 1, NULL);     // V(资源占用完毕)   
    }  
    return 0;  
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-523328-1-1.html 上篇帖子: Windows Logon Type的含义 下篇帖子: windows下缩短time_wait的时间
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表