设为首页 收藏本站
查看: 568|回复: 0

[经验分享] ServerSocketChannel实现多Selector高并发server

[复制链接]

尚未签到

发表于 2017-12-7 23:31:25 | 显示全部楼层 |阅读模式
  参考 HBase 的 RpcServer,编写了一个简洁版的多 Selector server。通过这个例子,可以对 NIO 的用法以及 Selector 如何选择事件有更深入的认识。
  Client 端发送消息:格式为“内容长度(4 字节 int)+ 内容”,200 个线程同时发送。
  Server 端接收消息:先解析内容长度,再读取内容,然后返回 2MB 测试数据给客户端。
  Server 端的线程模型:一个 accept selector,多个 read selector,一个 write selector。



package com.ai.nio;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by wangkai8 on 17/1/5.
*/
public class Server {
/** Logger shared by the server and all of its worker threads. */
public static final Log LOG = LogFactory.getLog(Server.class);
/** Completely read requests waiting to be picked up by a Handler thread. */
private final BlockingQueue<Call> queue = new LinkedBlockingQueue<Call>();
/** Calls whose response was only partially written and must be finished by the Responder. */
private final Queue<Call> responseCalls = new ConcurrentLinkedQueue<Call>();
/** Loop flag polled by the Listener, Reader, Responder and Handler threads. */
volatile boolean running = true;
/**
 * The single write-side thread. Volatile because it is assigned in start() while
 * reader threads created by the Listener constructor are already running.
 */
private volatile Responder responder = null;
/** Largest number of bytes handed to a single channel read()/write() call. */
private static final int NIO_BUFFER_LIMIT = 64 * 1024;
/** Number of Handler threads created by startHandler(). */
private int handler = 10;

class Listener extends Thread {
Selector selector;
Reader[] readers;
int robin;
int readNum;
Listener(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port), 150);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
readNum = 10;
readers = new Reader[readNum];
for(int i = 0; i < readNum; i++) {
readers = new Reader(i);
readers.start();
}
}

public void run() {
while(running) {
try {
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if(key.isValid()) {
if(key.isAcceptable()) {
doAccept(key);
}
}
}
} catch (IOException e) {
LOG.error("", e);
}
}
}
public void doAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel;
while((socketChannel = serverSocketChannel.accept()) != null) {
try {
socketChannel.configureBlocking(false);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.socket().setKeepAlive(true);
} catch (IOException e) {
socketChannel.close();
throw e;
}
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(socketChannel);
Connection c = new Connection(socketChannel);
readKey.attach(c);
} finally {
reader.finishAdd();
}
}
}
public Reader getReader() {
if(robin == Integer.MAX_VALUE) {
robin = 0;
}
return readers[(robin ++) % readNum];
}
}

/**
 * Read thread. Owns one read selector and reads length-prefixed requests from
 * every connection the Listener registers with it.
 */
class Reader extends Thread {
    Selector readSelector;
    /**
     * True while the Listener is registering a channel with readSelector.
     * Volatile: written by the Listener thread, read by this thread.
     */
    volatile boolean adding;

    /**
     * @param i index used in the thread name
     * @throws IOException if the selector cannot be opened
     */
    Reader(int i) throws IOException {
        setName("Reader-" + i);
        this.readSelector = Selector.open();
        LOG.info("Starting Reader-" + i + "...");
    }

    /** Select loop: waits for readable keys and reads from each of them. */
    @Override
    public void run() {
        while (running) {
            try {
                readSelector.select();
                // Stay out of select() while a registration is in progress. The flag
                // is tested under the same monitor finishAdd() notifies on, so the
                // notification cannot slip in between the test and the wait.
                synchronized (this) {
                    while (adding) {
                        this.wait(1000);
                    }
                }
                Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        doRead(key);
                    }
                }
            } catch (IOException e) {
                LOG.error("", e);
            } catch (InterruptedException e) {
                // Treat interruption as a stop request: keep the flag set for the
                // caller and leave, instead of spinning on an interrupted select().
                LOG.error("", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Reads from the connection attached to the key and closes the connection
     * on end-of-stream or on any failure.
     *
     * @param selectionKey a readable key whose attachment is a Connection
     */
    public void doRead(SelectionKey selectionKey) {
        Connection c = (Connection) selectionKey.attachment();
        if (c == null) {
            return;
        }
        int n;
        try {
            n = c.readAndProcess();
        } catch (Exception e) {
            // Covers IOException as well as unexpected runtime failures.
            LOG.error("", e);
            n = -1;
        }
        if (n == -1) {
            c.close();
        }
    }

    /**
     * Registers the channel for OP_READ on this reader's selector.
     *
     * @param channel a non-blocking socket channel
     * @return the new selection key
     * @throws IOException if the channel is closed
     */
    public SelectionKey registerChannel(SocketChannel channel) throws IOException {
        return channel.register(readSelector, SelectionKey.OP_READ);
    }

    /** Announces a registration and kicks this reader out of select(). */
    public void startAdd() {
        adding = true;
        readSelector.wakeup();
    }

    /** Ends a registration and lets the reader thread resume selecting. */
    public synchronized void finishAdd() {
        adding = false;
        this.notify();
    }
}

class Connection {
private SocketChannel channel;
private ByteBuffer dataBufferLength;
private ByteBuffer dataBuffer;
private boolean skipHeader;
public Connection(SocketChannel channel) {
this.channel = channel;
this.dataBufferLength = ByteBuffer.allocate(4);
}
public int readAndProcess() throws IOException {
int count;
if(!skipHeader) {
count = channelRead(channel, dataBufferLength);
if (count < 0 || dataBufferLength.remaining() > 0) {
return count;
}
}
skipHeader = true;
if(dataBuffer == null) {
dataBufferLength.flip();
int dataLength = dataBufferLength.getInt();
dataBuffer = ByteBuffer.allocate(dataLength);
}
count = channelRead(channel, dataBuffer);
if(count >= 0 && dataBuffer.remaining() == 0) {
process();
}
return count;
}

/**
* process the dataBuffer
*/
public void process() {
dataBuffer.flip();
byte[] data = dataBuffer.array();
Call call = new Call(this, data, responder);
try {
queue.put(call);
} catch (InterruptedException e) {
LOG.error("", e);
}
}

public void close() {
if(channel != null) {
try {
channel.close();
} catch (IOException e) {
}
}
}
}

/**
 * Write thread. Handlers first try to write a response themselves; whatever
 * could not be written in that attempt is handed to this thread, which finishes
 * it whenever the channel becomes writable.
 *
 * NOTE(review): a selection key carries a single Call, so this assumes at most
 * one unfinished response per connection.
 */
class Responder extends Thread {
    Selector writeSelector;

    /**
     * @throws IOException if the write selector cannot be opened
     */
    public Responder() throws IOException {
        writeSelector = Selector.open();
    }

    /** Select loop: registers queued calls, then writes to every writable channel. */
    @Override
    public void run() {
        while (running) {
            try {
                registWriters();
                int n = writeSelector.select(1000);
                if (n == 0) {
                    continue;
                }
                Iterator<SelectionKey> it = writeSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // Failures are handled per key so that one broken client neither
                    // skips the other ready keys nor terminates this thread.
                    try {
                        if (key.isValid() && key.isWritable()) {
                            doAsyncWrite(key);
                        }
                    } catch (IOException e) {
                        LOG.error("", e);
                        // Otherwise the key stays interested in OP_WRITE and the
                        // same failure repeats on every select().
                        key.cancel();
                    } catch (CancelledKeyException e) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("the client went away", e);
                        }
                    }
                }
            } catch (IOException e) {
                LOG.error("", e);
            }
        }
    }

    /**
     * Drains responseCalls and makes sure each call's channel is registered for
     * OP_WRITE on the write selector with that call attached.
     *
     * @throws IOException declared for callers; registration failures caused by a
     *                     closed channel or cancelled key are logged at trace level
     */
    public void registWriters() throws IOException {
        Call call;
        while ((call = responseCalls.poll()) != null) {
            SelectionKey key = call.conn.channel.keyFor(writeSelector);
            try {
                if (key == null) {
                    try {
                        call.conn.channel.register(writeSelector, SelectionKey.OP_WRITE, call);
                    } catch (ClosedChannelException e) {
                        //the client went away
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("the client went away", e);
                        }
                    }
                } else {
                    // The key still carries the call of an earlier response on this
                    // connection; attach the new one or its data is never written.
                    key.attach(call);
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            } catch (CancelledKeyException e) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("the client went away", e);
                }
            }
        }
    }

    /**
     * Queues a call for asynchronous completion and wakes the select loop.
     *
     * @param call a call whose response still has bytes remaining
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public void registerForWrite(Call call) throws IOException {
        responseCalls.add(call);
        writeSelector.wakeup();
    }

    /**
     * Writes more of the attached call's response; when nothing is left, marks the
     * call done and stops watching the channel for writability.
     *
     * @param key a writable key whose attachment is the Call being answered
     * @throws IOException if the key and call disagree about the channel, or the
     *                     write fails (the connection is closed in that case)
     */
    private void doAsyncWrite(SelectionKey key) throws IOException {
        Call call = (Call) key.attachment();
        if (call.conn.channel != key.channel()) {
            throw new IOException("bad channel");
        }
        boolean failed = true;
        int numBytes;
        try {
            numBytes = channelWrite(call.conn.channel, call.response);
            failed = false;
        } finally {
            if (failed) {
                call.conn.close();
            }
        }
        boolean complete = call.response.remaining() == 0;
        if (complete) {
            call.done = true;
        }
        if (numBytes < 0 || complete) {
            try {
                key.interestOps(0);
            } catch (CancelledKeyException e) {
                LOG.warn("Exception while changing ops : " + e);
            }
        }
    }

    /**
     * Tries to write the response on the calling thread and falls back to the
     * asynchronous path when the channel did not take everything.
     *
     * @param call a call whose response buffer is ready for reading
     * @throws IOException if the write fails
     */
    private void doResponse(Call call) throws IOException {
        //if data not fully send, then register the channel for async writer
        if (!processResponse(call)) {
            registerForWrite(call);
        }
    }

    /**
     * Performs one write attempt for the call's response.
     *
     * @param call the call to answer
     * @return true if the whole response has been written, false if bytes remain
     * @throws IOException if the write fails; the connection is closed first
     */
    private boolean processResponse(Call call) throws IOException {
        boolean error = true;
        try {
            int numBytes = channelWrite(call.conn.channel, call.response);
            if (numBytes < 0) {
                throw new IOException("error socket write");
            }
            error = false;
        } finally {
            if (error) {
                call.conn.close();
            }
        }
        if (!call.response.hasRemaining()) {
            call.done = true;
            return true;
        }
        return false;
    }
}
/**
 * Worker thread. Takes completed requests off the queue, builds the response and
 * passes it to the Responder.
 */
class Handler extends Thread {
    /**
     * @param i index used in the thread name
     */
    public Handler(int i) {
        setName("handler-" + i);
        LOG.info("Starting Handler-" + i + "...");
    }

    /** Blocks on the request queue and processes one call at a time. */
    @Override
    public void run() {
        while (running) {
            try {
                Call call = queue.take();
                process(call);
            } catch (InterruptedException e) {
                // Treat interruption as a stop request rather than carrying on as
                // if nothing had happened.
                LOG.error("", e);
                Thread.currentThread().interrupt();
                return;
            } catch (IOException e) {
                LOG.error("", e);
            }
        }
    }

    /**
     * Logs the request text and answers with a 4-byte length followed by 2 MB of
     * test data.
     *
     * @param call the request to answer; its response field is set here
     * @throws IOException if writing the response fails
     */
    public void process(Call call) throws IOException {
        byte[] request = call.request;
        // Decode with a fixed charset instead of the platform default.
        String message = new String(request, java.nio.charset.StandardCharsets.UTF_8);
        LOG.info("received mseesage: " + message);
        //each channel write 2MB data for test
        int dataLength = 2 * 1024 * 1024;
        ByteBuffer buffer = ByteBuffer.allocate(4 + dataLength);
        buffer.putInt(dataLength);
        writeDataForTest(buffer);
        buffer.flip();
        call.response = buffer;
        responder.doResponse(call);
    }
}
/**
 * Fills the rest of the buffer, from its current position up to its limit, with
 * zero bytes.
 *
 * The amount is taken from the buffer's remaining space rather than from
 * "limit minus a 4-byte header", so the result no longer depends on the caller
 * having written exactly four bytes beforehand.
 *
 * @param buffer the buffer to pad; its position ends up at its limit
 */
public void writeDataForTest(ByteBuffer buffer) {
    if (!buffer.hasRemaining()) {
        return;
    }
    // Bulk puts of a small zeroed chunk avoid one put() call per byte.
    byte[] zeros = new byte[Math.min(buffer.remaining(), 8 * 1024)];
    while (buffer.hasRemaining()) {
        buffer.put(zeros, 0, Math.min(zeros.length, buffer.remaining()));
    }
}

/**
 * One request together with the state needed to answer it: the connection it
 * arrived on, the raw request bytes, and the response buffer once it is built.
 */
class Call {
    /** Connection the request was read from and the response is written to. */
    Connection conn;
    /** Raw request payload, without the length prefix. */
    byte[] request;
    /** Responder that was current when the call was created. */
    Responder responder;
    /** Response bytes; left unset by the constructor. */
    ByteBuffer response;
    /** Completion flag; starts out false. */
    boolean done;

    /**
     * @param conn      connection the request belongs to
     * @param request   request payload
     * @param responder responder associated with this call
     */
    public Call(Connection conn, byte[] request, Responder responder) {
        this.responder = responder;
        this.request = request;
        this.conn = conn;
    }
}

/**
 * Reads from the channel into the buffer. Buffers with more free space than
 * NIO_BUFFER_LIMIT are delegated to channleIO; smaller ones are read directly.
 *
 * @param channel source channel
 * @param buffer  destination buffer
 * @return the value returned by the underlying read
 * @throws IOException if the read fails
 */
public int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
    if (buffer.remaining() > NIO_BUFFER_LIMIT) {
        return channleIO(channel, null, buffer);
    }
    return channel.read(buffer);
}
/**
 * Writes the buffer to the channel. Buffers holding more than NIO_BUFFER_LIMIT
 * bytes are delegated to channleIO; smaller ones are written directly.
 *
 * @param channel destination channel
 * @param buffer  source buffer
 * @return the value returned by the underlying write
 * @throws IOException if the write fails
 */
public int channelWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
    if (buffer.remaining() > NIO_BUFFER_LIMIT) {
        return channleIO(null, channel, buffer);
    }
    return channel.write(buffer);
}

/**
 * Transfers a large buffer in slices of at most NIO_BUFFER_LIMIT bytes so that a
 * single channel call is never asked to move more than that.
 *
 * Exactly one of readCh and writeCh is expected to be non-null: a null readCh
 * selects writing to writeCh, otherwise the method reads from readCh.
 *
 * @param readCh  channel to read from, or null to write
 * @param writeCh channel to write to; used only when readCh is null
 * @param buffer  buffer to fill or drain; its limit is unchanged on return
 * @return the total number of bytes transferred, or the result of the last
 *         channel call (for example -1 at end-of-stream) when nothing was moved
 * @throws IOException if the underlying channel call fails
 */
public int channleIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buffer) throws IOException {
    int initRemaining = buffer.remaining();
    int originalLimit = buffer.limit();
    int ret = 0;
    while (buffer.remaining() > 0) {
        try {
            int ioSize = Math.min(buffer.remaining(), NIO_BUFFER_LIMIT);
            buffer.limit(buffer.position() + ioSize);
            ret = readCh == null ? writeCh.write(buffer) : readCh.read(buffer);
            if (ret < ioSize) {
                // Short transfer: the channel cannot take or give more right now.
                break;
            }
        } finally {
            // Restore the real limit after every slice. If it were restored only
            // after the loop, remaining() would be 0 after the first full slice
            // and the loop would never move more than one slice per call.
            buffer.limit(originalLimit);
        }
    }
    int byteRead = initRemaining - buffer.remaining();
    return byteRead > 0 ? byteRead : ret;
}

/**
 * Creates and starts the handler threads, one per index from 0 up to the
 * configured handler count.
 */
public void startHandler() {
    int index = 0;
    while (index < handler) {
        Handler worker = new Handler(index);
        worker.start();
        index++;
    }
}

/**
 * Starts the server on port 10000.
 *
 * The listener is constructed first, so a bind failure is reported before any
 * other thread has been started, but its accept loop is started last: no request
 * can be read before the responder and the handlers exist.
 *
 * @throws IOException if the server socket or a selector cannot be opened
 */
public void start() throws IOException {
    Listener listener = new Listener(10000);
    responder = new Responder();
    responder.start();
    startHandler();
    listener.start();
    LOG.info("server startup! ");
}
/**
 * Entry point: creates a server and starts it.
 *
 * @param args ignored
 * @throws IOException if the server cannot be started
 */
public static void main(String[] args) throws IOException {
    new Server().start();
}
}
  Client端:



package com.ai.nio;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.net.SocketFactory;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* Created by wangkai8 on 17/1/6.
*/
/**
 * Blocking test client. Sends one length-prefixed message to the server on
 * localhost:10000 and reads back one length-prefixed response.
 */
public class Client {
    public static final Log LOG = LogFactory.getLog(Client.class);
    Socket socket;
    OutputStream out;
    InputStream in;

    /**
     * Connects to localhost:10000 with a 10 second connect timeout.
     *
     * @throws IOException if the connection cannot be established; the socket is
     *                     closed before the exception propagates
     */
    public Client() throws IOException {
        socket = SocketFactory.getDefault().createSocket();
        boolean connected = false;
        try {
            socket.setTcpNoDelay(true);
            socket.setKeepAlive(true);
            InetSocketAddress server = new InetSocketAddress("localhost", 10000);
            socket.connect(server, 10000);
            out = socket.getOutputStream();
            in = socket.getInputStream();
            connected = true;
        } finally {
            if (!connected) {
                // Do not leak the descriptor of a socket that never connected.
                socket.close();
            }
        }
    }

    /**
     * Sends the message as a 4-byte length followed by its UTF-8 bytes.
     *
     * @param message text to send
     * @throws IOException if writing to the socket fails
     */
    public void send(String message) throws IOException {
        // Use a fixed charset rather than the platform default.
        byte[] data = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        DataOutputStream dos = new DataOutputStream(out);
        dos.writeInt(data.length);
        dos.write(data);
        dos.flush();
    }

    /**
     * Opens 200 connections one after another on the calling thread and starts
     * one thread per connection to send a message and read the response.
     *
     * @param args ignored
     * @throws IOException if one of the connections cannot be established
     */
    public static void main(String[] args) throws IOException {
        int n = 200;
        for (int i = 0; i < n; i++) {
            final Client client = new Client();
            new Thread() {
                @Override
                public void run() {
                    try {
                        client.send(getName() + "_xiaomiemie");
                        DataInputStream inputStream = new DataInputStream(client.in);
                        int dataLength = inputStream.readInt();
                        if (dataLength < 0) {
                            throw new IOException("invalid response length: " + dataLength);
                        }
                        byte[] data = new byte[dataLength];
                        inputStream.readFully(data);
                        LOG.info("receive from server: dataLength=" + data.length);
                    } catch (Exception e) {
                        LOG.error("", e);
                    } finally {
                        // Close on every path, not only after a successful exchange.
                        try {
                            client.socket.close();
                        } catch (IOException closeError) {
                            LOG.error("", closeError);
                        }
                    }
                }
            }.start();
        }
    }
}
  转载请标注原文地址:http://www.cnblogs.com/yueweimian/p/6262211.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-421954-1-1.html 上篇帖子: Windows 2012 IIS ASP.NET 安装 下篇帖子: SQL Server插入数据和删除数据
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表