Apache MIna 之旅
一、概述现在已经是World Wide Web的时代,无数的web应用框架被创造出来从而大大的提高了web开发的速度。抛开WWW的这个优势,我们知道还有很多协议是HTTP协议所无法替代的。有时,我们仍然需要构造c/s应用来实现适当的协议。
二、MINA是什么?
你有没有曾经使用java或者其他语言实现过某个协议栈?就像你所经历过的那样,编写网络应用即使对于有经验的开发者也不是容易的事情。这归咎于以下几个方面:
* 没有为开发者设计的合适的网络应用框架.
* 使你无法在有限的时间内创建你的应用.
* 网络I/O编码,消息的编/解码,业务逻辑常常纠缠在一起.
* 使程序失去可维护性和可复用性
* 网络应用难于进行单元测试
* 你失去了敏捷性
MINA是一个网络应用框架,在不牺牲性能和可扩展性的前提下用于解决上面的所有问题。
三、编写一个 Echo Server
MINA包含两层:IO层和协议层。我们首先仅使用IO层来实现一个echo服务,因为协议层通常是建立在IO层之上的。
上面的图展示了MINA的IO层同客户端的交互。IoAcceptor执行所有底层IO,将他们翻译成抽象的IO事件,并把翻译过的事件和关联的 IoSession 发送给IoHandler。
1、IoSession
通过IoSession,你可以写出message到远程实体,访问session的配置,并且更改session的属性。
2、IoHandler
通过IoHandler,你可以写出Iosession各个状态时执行的内容。
* sessionCreated:当一个IO连接建立时被调用,这个方法在任何IO操作之前被调用,以便socket参数或session属性能够最先被设置。
* sessionOpened: 在sessionCreated调用之后被调用。
* sessionClosed: 当IO连接被关闭时被调用。
* sessionIdle: 当在远程实体和用户程序之间没有数据传输的时候被调用。
* exceptionCaught: 当IoAcceptor 或者你的IoHandler.中出现异常时被调用。
* messageReceived: 当从远程实体读取数据时被调用。
* messageSent: 当向远程实体发送数据时被调用。
下面是一个简单的IoHandler实例:
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.util.SessionLog;
public class EchoProtocolHandler extends IoHandlerAdapter {
public void sessionOpened(IoSession session) throws Exception {
super.sessionOpened(session);
SessionLog.info(session, "session is operned");
}
public void sessionClosed(IoSession session) throws Exception {
super.sessionClosed(session);
SessionLog.info(session, "session is closed");
}
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
super.sessionIdle(session, status);
SessionLog.info(session, "session is Idle");
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
super.exceptionCaught(session, cause);
}
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
if (message instanceof ByteBuffer) {
// 接收数据
ByteBuffer buffer = (ByteBuffer) message;
byte[] fromClinet = new byte;
buffer.get(fromClinet);
String clientContext = new String(fromClinet);
System.out.println(clientContext);
// 返回收据
byte[] echoContext = ("已收到" + clientContext).getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(echoContext.length);
byteBuffer.put(echoContext);
byteBuffer.flip();
session.write(byteBuffer);
} else if (message instanceof String) {
String str = message.toString();
if (str.trim().equalsIgnoreCase("quit")) {
session.close();
return;
} else if (str.trim().equalsIgnoreCase("now")) {
Calendar cal = Calendar.getInstance();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String now = dateFormat.format(cal.getTime());
session.write("当前时间:" + now);
} else {
session.write("错误的指令");
}
}
}
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
SessionLog.info(session, message.toString());
}
}
3、IoFilters
IoFilter提供了更加有力的方式来扩展MINA。它拦截所有的IO事件进行事件的预处理和后处理。你可以把它想象成Servlet的 filters。IoFilter能够实现以下几种目的:
* 事件日志
* 性能检测
* 数据转换(e.g. SSL support)
* 防火墙…等等
4、协议层(解码器)
MINA提供了一个协议层来解决复杂的协议解析。协议层将ByteBuffer事件转换成高层的POJO事件。
要实现一个完整的解码器,需要实现以下接口
* ProtocolCodecFactory
* ProtocolEncoder encodes message objects into binary or protocol-specific data
* ProtocolDecoder decodes binary or protocol-specific data into message objects
MINA提供了一个文本格式的解码器。
最后我们将我们写好的解码器加入到IoFilters(ProtocolCodecFilter)中就可以了。
5、ByteBuffers
MINA没有直接使用使用java NIO的ByteBuffer类。它使用一个自制的ByteBuffer来扩展java NIO ByteBuffer的功能。
以下是它们的一些区别:
* MINA ByteBuffer是一个抽象类,用户可以自由的扩展它。
* MINA 管理 MINA ByteBuffers 并对其提供对象池。
* MINA ByteBuffer提供很多便利的方法,如:无符号数值的getter和基于String的getter和putter。
如果你使用 MINA,你将不需要直接使用NIO buffers,因为仅使用MINA buffers就可以完成大多数buffer操作。
MINA有一个全局的ByteBuffer池,它被在同一个虚拟机下的所有MINA应用共享。任何分配的buffers将在IO操作或者事件处理方法被执行之后被释放。所以你可以调用ByteBuffer.allocate()来从池中得到一个ByteBuffer而不需要将它返回到池中。请查阅 ByteBuffer JavaDocs获得更多信息。
6、线程模式
MINA有三种线程模式都是以NIO的方式实现的。
* Acceptor thread 服务端接收线程,然后交给I/O processor thread来执行数据的读写,默认是每个SocketAcceptor服务创建一个线程。
* Connector thread 客户端连接远程服务线程,连接成功后交给I/O processor thread来执行数据的读写,默认是每个SocketConnector创建一个线程。
* I/O processor thread 直到连接关闭前,实际执行数据读写操作的线程。
例如:
SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());
SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());
IoAcceptor acceptor = new SocketAcceptor();
ExecutorService executor = Executors.newCachedThreadPool();
DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain();
filterChainBuilder.addLast("threadPool", new ExecutorFilter(executor));
7、服务端实现类
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
public class EchoServer {
private static final int PORT = 9999;
public static void main(String[] args) throws IOException {
//The following two lines change the default buffer type to 'heap',
//which yields better performance.c
ByteBuffer.setUseDirectBuffers(false);
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
IoAcceptor acceptor = new SocketAcceptor();
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.getFilterChain().addLast("logger", new LoggingFilter());
cfg.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
acceptor.bind(new InetSocketAddress(PORT), new EchoProtocolHandler(), cfg);
System.out.println("MINA Echo server started.");
}
}
8、客户端实现类
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
public class EchoClient {
public static final String ADDRESS = "127.0.0.1:9999";
private IoSession session;
public static void main(String[] args) throws IOException {
EchoClient client = new EchoClient();
client.connect(ADDRESS);
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String in = reader.readLine();
if (in.trim().equalsIgnoreCase("quit")) {
break;
}
client.send(in);
}
client.session.close();
System.exit(0);
}
public boolean connect(String address) {
try {
if (session != null && session.isConnected()) {
throw new IllegalStateException("Already connected. Disconnect first.");
}
SocketConnector connector = new SocketConnector();
//connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
connector.setWorkerTimeout(1);
SocketConnectorConfig config = new SocketConnectorConfig();
ConnectFuture future = connector.connect(parseSocketAddress(address), new EchoClientHandler(), config);
future.join();
if (!future.isConnected()) {
return false;
}
session = future.getSession();
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void send(String data) {
if (session != null && !session.isClosing()) {
session.write(data);
} else {
throw new RuntimeException("session is closed or empty data");
}
}
private SocketAddress parseSocketAddress(String s) {
s = s.trim();
int colonIndex = s.indexOf(":");
if (colonIndex > 0) {
String host = s.substring(0, colonIndex);
int port = parsePort(s.substring(colonIndex + 1));
return new InetSocketAddress(host, port);
} else {
int port = parsePort(s.substring(colonIndex + 1));
return new InetSocketAddress(port);
}
}
private int parsePort(String s) {
try {
return Integer.parseInt(s);
} catch (NumberFormatException nfe) {
throw new IllegalArgumentException("Illegal port number: " + s);
}
}
}
客户端也需要一个Handler,所以写一个只打印平台端发过来的数据。
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
public class EchoClientHandler extends IoHandlerAdapter {
public void messageReceived(IoSession session, Object message) throws Exception {
System.out.println(message.toString());
}
}
四、与Spring的结合
Spring框架是目前使用非常广泛的一个开源框架,所以其他框架势在要与Spring进行一些整合,下面我们就介绍Apache Mina是如何与Spring结合的。
其实就是增加一个配置文件,注意这里不能使用延迟加载和“byName”的方式找bean
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"
default-lazy-init="false">
<!--
This makes it possible to specify java.net.SocketAddress values (e.g.
:80 below) as Strings. They will be converted into
java.net.InetSocketAddress objects by Spring.
-->
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="customEditors">
<map>
<entry key="java.net.SocketAddress">
<bean class="org.apache.mina.integration.spring.InetSocketAddressEditor" />
</entry>
</map>
</property>
</bean>
<bean id="moAcceptor" class="org.apache.mina.integration.spring.IoAcceptorFactoryBean">
<property name="target">
<bean class="org.apache.mina.transport.socket.nio.SocketAcceptor" />
</property>
<property name="bindings">
<list>
<ref bean="echoBinding" />
</list>
</property>
</bean>
<bean id="echoBinding" class="org.apache.mina.integration.spring.Binding">
<property name="address" value=":9999" />
<property name="handler" ref="moMinaServiceHandler" />
<property name="serviceConfig" ref="serviceConfig" />
</bean>
<!-- The IoHandler implementation -->
<bean id="echoHandler" class="com.justinmobile.echo.mina.EchoProtocolHandler" />
<bean id="serviceConfig" class="org.apache.mina.transport.socket.nio.SocketAcceptorConfig">
<property name="filterChainBuilder" ref="filterChainBuilder" />
<property name="reuseAddress" value="true" />
<property name="threadModel" ref="threadModel" />
</bean>
<bean id="filterChainBuilder"
class="org.apache.mina.integration.spring.DefaultIoFilterChainBuilderFactoryBean">
<property name="filters">
<list>
<bean class="org.apache.mina.filter.LoggingFilter" />
<bean class="org.apache.mina.filter.codec.ProtocolCodecFilter">
<constructor-arg>
<bean class="org.apache.mina.filter.codec.textline.TextLineCodecFactory" />
</constructor-arg>
</bean>
</list>
</property>
</bean>
<!--
By default MINA uses an ExecutorThreadModel. This demonstrates how to
use your own with some non default settings. The threadModel will be
set on the SocketAcceptorConfig defined below. To configure a
ExecutorFilter directly you will have to use the ThreadModel.MANUAL
ThreadModel instead.
-->
<bean id="threadModel"
class="org.apache.mina.integration.spring.ExecutorThreadModelFactoryBean">
<property name="serviceName" value="TlvService" />
<property name="executor">
<bean
class="org.apache.mina.integration.spring.ThreadPoolExecutorFactoryBean">
<property name="corePoolSize" value="1" />
<property name="maxPoolSize" value="20" />
<property name="keepAliveSeconds" value="30" />
</bean>
</property>
</bean>
</beans>
页:
[1]