|
要用到多线程以及线程的读写锁,之前写的Socket类、ServerSocket要做相应的改变
因为服务器端要维持着一个存储客户端Socket信息的数据结构,当多个线程同时访问这个结构时,要做同步处理,所以要在适当的时候加上读锁或写锁。
新的ServerSocket类
#ifndef SERVERSOCKET_H
#define SERVERSOCKET_H
#include "Socket.h"
#include <list>
#include <string>
#include "ThreadReadWriteLock.h"
using std::list;
class ServerSocket:public Socket
{
public:
ServerSocket(const int port);
ServerSocket();
virtual ~ServerSocket();
void Accept(Socket& socket);
//run server to connect multi-clients
void Run();
private:
//accept multi-clients
bool Accept();
void AddClient(Socket* clientSocket);
static void DeleteClient(Socket* clientSocket);
static void* ProcessMessage(void* arg);
static void SendMsgToAllUsers(const std::string& message);
static list clientSockets;
static bool serviceFlag;
//use thread-read-write-lock to synchronize threads
static ThreadReadWriteLock readWriteLock;
};
#endif
其中有static成员函数,因为创建一个新的线程时,要传递一个函数指针,不过类的普通成员函数的函数指针与一般的函数指针是不兼容的,所以要传递static成员函数的函数指针。
以下是ServerSocket的新实现:
ServerSocket.cpp
#include "ServerSocket.h"
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <iostream>
#include "SocketException.h"
list ServerSocket::clientSockets;
ThreadReadWriteLock ServerSocket::readWriteLock;
bool ServerSocket::serviceFlag=true;
// Sets up a listening server socket on `port`.
// The steps run strictly in order (create, bind, listen); the first one that
// fails aborts construction with a SocketException naming that step.
ServerSocket::ServerSocket(const int port)
{
    const bool createFailed = !Socket::Create();
    if (createFailed)
        throw SocketException("Could not create server socket.");

    const bool bindFailed = !Socket::Bind(port);
    if (bindFailed)
        throw SocketException("Could not bind to port.");

    const bool listenFailed = !Socket::Listen();
    if (listenFailed)
        throw SocketException("Could not listen to socket.");
}
// Releases every client socket still registered in the shared client list.
ServerSocket::~ServerSocket()
{
    for (std::list<Socket*>::iterator iter = clientSockets.begin();
         iter != clientSockets.end(); ++iter)
    {
        delete *iter;
    }
    // clientSockets is static and outlives this object: drop the now-dangling
    // pointers so a later destructor run cannot delete them a second time.
    clientSockets.clear();
    // NOTE(review): the list is modified here without taking readWriteLock,
    // and detached worker threads may still hold these sockets - confirm that
    // all workers have stopped before the server object is destroyed.
}
// Accepts one pending connection into `socket` by delegating to
// Socket::Accept; a failed accept is reported as a SocketException.
void ServerSocket::Accept(Socket& socket)
{
    const bool acceptFailed = !Socket::Accept(socket);
    if (acceptFailed)
        throw SocketException("Could not accept socket.");
}
bool ServerSocket::Accept()
{
Socket* clientSocket=new Socket;
Accept(*clientSocket);
AddClient(clientSocket);
//create new thread for a new client
pthread_t newThread;
int result=pthread_create(&newThread,NULL,ProcessMessage,static_cast(clientSocket));
if(result!=0)
return false;
//detach the newThread
//so when newThread exits it can release it's resource
result=pthread_detach(newThread);
if(result!=0)
perror("Failed to detach thread");
return true;
}
// Main server loop: accepts clients, one per iteration, until the service
// flag is cleared.
//
// The flag is cleared when the number of registered clients reaches
// MAXCONNECTION or when Accept() reports that it could not start a worker
// thread. A SocketException thrown by Accept() propagates to the caller.
void ServerSocket::Run()
{
    while (serviceFlag)
    {
        // NOTE(review): clientSockets is read here without taking
        // readWriteLock while worker threads may modify it - confirm whether
        // a read lock is required.
        if (clientSockets.size() >= static_cast<unsigned int>(MAXCONNECTION))
            serviceFlag = false;
        else
            serviceFlag = Accept();
        sleep(1);
    }
}
void* ServerSocket::ProcessMessage(void* arg)
{
std::string message;
Socket* clientSocket=static_cast(arg);
Send(*clientSocket,"Welcome!");
while(serviceFlag)
{
Receive(*clientSocket,message);
if(message=="exit")
{
Send(*clientSocket,"user_exit");
DeleteClient(clientSocket);
break;
}
else
SendMsgToAllUsers(message);
sleep(1);
}
pthread_exit(NULL);
return NULL;
}
void ServerSocket::AddClient(Socket* socket)
{
if(readWriteLock.SetWriteLock())
{
clientSockets.push_back(socket);
std::cout |
|
|