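Apache Mina Usage Notes (4)
In the previous article we showed how to write a custom logging filter in Mina. In this one we implement our own codec (an encoder and a decoder). In practice, many systems do not speak standard protocols such as web services or XML. The SMS gateways of China Mobile, China Unicom, and China Telecom, for example, each define their own protocol, and all of them are byte streams over TCP/IP. Mina ships with TextLineEncoder and TextLineDecoder, which handle line-oriented strings; for something like an SMS gateway you have to write your own codec filter.
We define a simple byte-stream protocol over TCP/IP for passing packets between a client and a server. A packet, MyProtocalPack, consists of a header and a body. The header contains length (the total length of the packet, an int) and flag (a marker for the packet, a byte). The body, content, is a string, which the implementation handles as a byte stream. The source code is as follows: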
package com.gftech.mytool.mina;

import com.gftech.util.GFCommon;

public class MyProtocalPack {
    private int length;
    private byte flag;
    private String content;

    public MyProtocalPack() {
    }

    public MyProtocalPack(byte flag, String content) {
        this.flag = flag;
        this.content = content;
        int len1 = content == null ? 0 : content.getBytes().length;
        // 5 = 4-byte length field + 1-byte flag
        this.length = 5 + len1;
    }

    public MyProtocalPack(byte[] bs) {
        if (bs != null && bs.length >= 5) {
            length = GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
            // The flag is the fifth byte, right after the length field
            flag = bs[4];
            content = new String(GFCommon.bytesCopy(bs, 5, length - 5));
        }
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte getFlag() {
        return flag;
    }

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append(" Len:").append(length);
        sb.append(" flag:").append(flag);
        sb.append(" content:").append(content);
        return sb.toString();
    }
}
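To make the wire layout of such a packet concrete, here is a minimal sketch that builds one frame by hand with java.nio.ByteBuffer. The class FrameLayoutSketch and its encode method are hypothetical names used only for illustration; they are not part of the article's code or of Mina.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only: lays out one frame as
// [4-byte length][1-byte flag][content bytes].
final class FrameLayoutSketch {
    static final int HEADER_LENGTH = 5; // 4-byte length + 1-byte flag

    static byte[] encode(byte flag, String content, Charset charset) {
        byte[] body = content == null ? new byte[0] : content.getBytes(charset);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        // ByteBuffer is big-endian by default, so the length is written
        // most significant byte first.
        buf.putInt(HEADER_LENGTH + body.length);
        buf.put(flag);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 1, "abc", StandardCharsets.US_ASCII);
        StringBuilder hex = new StringBuilder();
        for (byte b : frame) {
            hex.append(String.format("%02x ", b));
        }
        // prints: 00 00 00 08 01 61 62 63
        System.out.println(hex.toString().trim());
    }
}
```

The length field counts the whole frame, header included, so a three-byte body gives a length of 8.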
Before going further, let's look at how MinaTimeServer uses a text codec filter. It adds a ProtocolCodecFilter to the filter chain. The filter is given a factory class, TextLineCodecFactory, which creates the concrete TextLineEncoder and TextLineDecoder. The relevant source code is:
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
package org.apache.mina.filter.codec.textline;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * A {@link ProtocolCodecFactory} that performs encoding and decoding between
 * a text line data and a Java string object. This codec is useful especially
 * when you work with a text-based protocols such as SMTP and IMAP.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public class TextLineCodecFactory implements ProtocolCodecFactory {
    private final TextLineEncoder encoder;
    private final TextLineDecoder decoder;

    /**
     * Creates a new instance with the current default {@link Charset}.
     */
    public TextLineCodecFactory() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified {@link Charset}. The
     * encoder uses a UNIX {@link LineDelimiter} and the decoder uses
     * the AUTO {@link LineDelimiter}.
     *
     * @param charset
     *            The charset to use in the encoding and decoding
     */
    public TextLineCodecFactory(Charset charset) {
        encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
        decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
    }

    /**
     * Creates a new instance of TextLineCodecFactory. This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *            The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *            The line delimeter for the encoder
     * @param decodingDelimiter
     *            The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            String encodingDelimiter, String decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    /**
     * Creates a new instance of TextLineCodecFactory. This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *            The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *            The line delimeter for the encoder
     * @param decodingDelimiter
     *            The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }

    /**
     * Returns the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}. The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.
     */
    public int getEncoderMaxLineLength() {
        return encoder.getMaxLineLength();
    }

    /**
     * Sets the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}. The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.
     */
    public void setEncoderMaxLineLength(int maxLineLength) {
        encoder.setMaxLineLength(maxLineLength);
    }

    /**
     * Returns the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}. The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.
     */
    public int getDecoderMaxLineLength() {
        return decoder.getMaxLineLength();
    }

    /**
     * Sets the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}. The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.
     */
    public void setDecoderMaxLineLength(int maxLineLength) {
        decoder.setMaxLineLength(maxLineLength);
    }
}
TextLineCodecFactory implements the ProtocolCodecFactory interface, which has two main methods: getEncoder(), which returns the encoder, and getDecoder(), which returns the decoder:
package org.apache.mina.filter.codec;

import org.apache.mina.core.session.IoSession;

/**
 * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
 * binary or protocol specific data into message object and vice versa.
 * <p>
 * Please refer to
 * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
 * example.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolCodecFactory {
    /**
     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
     * encodes message objects into binary or protocol-specific data.
     */
    ProtocolEncoder getEncoder(IoSession session) throws Exception;

    /**
     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
     * decodes binary or protocol-specific data into message objects.
     */
    ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
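All we need to do is implement encode() along the lines of TextLineEncoder and decode() along the lines of TextLineDecoder; those two classes implement the ProtocolEncoder and ProtocolDecoder interfaces respectively. We write three classes, MyProtocalCodecFactory, MyProtocalEncoder, and MyProtocalDecoder, corresponding to TextLineCodecFactory, TextLineEncoder, and TextLineDecoder.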
package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyProtocalCodecFactory implements ProtocolCodecFactory {
    private final MyProtocalEncoder encoder;
    private final MyProtocalDecoder decoder;

    public MyProtocalCodecFactory(Charset charset) {
        encoder = new MyProtocalEncoder(charset);
        decoder = new MyProtocalDecoder(charset);
    }

    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }
}
package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class MyProtocalEncoder extends ProtocolEncoderAdapter {
    private static final int PACK_HEAD_LENGTH = 5;
    private final Charset charset;

    public MyProtocalEncoder(Charset charset) {
        this.charset = charset;
    }

    // Encode a MyProtocalPack and write it to the output
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        MyProtocalPack value = (MyProtocalPack) message;
        // Encode the content with the configured charset (the same one the
        // decoder uses) and derive the length field from those bytes, so the
        // header always matches what is actually written.
        byte[] body = value.getContent() == null
                ? new byte[0]
                : value.getContent().getBytes(charset);
        int length = PACK_HEAD_LENGTH + body.length;
        IoBuffer buf = IoBuffer.allocate(length);
        buf.setAutoExpand(true);
        buf.putInt(length);
        buf.put(value.getFlag());
        buf.put(body);
        buf.flip();
        out.write(buf);
    }

    public void dispose() throws Exception {
    }
}
package com.gftech.mytool.mina;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyProtocalDecoder implements ProtocolDecoder {
    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
    private final Charset charset;
    private int maxPackLength = 100;

    public MyProtocalDecoder() {
        this(Charset.defaultCharset());
    }

    public MyProtocalDecoder(Charset charset) {
        this.charset = charset;
    }

    public int getMaxLineLength() {
        return maxPackLength;
    }

    public void setMaxLineLength(int maxLineLength) {
        if (maxLineLength <= 0) {
            throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
        }
        this.maxPackLength = maxLineLength;
    }

    private Context getContext(IoSession session) {
        Context ctx;
        ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }

    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        final int packHeadLength = 5;
        // Fetch the context left by the previous call; it may hold unprocessed data
        Context ctx = getContext(session);
        // Append the data in the current buffer to the context's buffer
        ctx.append(in);
        // Set limit to the current position and position to 0
        IoBuffer buf = ctx.getBuffer();
        buf.flip();
        // Then read packets according to the protocol
        while (buf.remaining() >= packHeadLength) {
            buf.mark();
            // Read the header
            int length = buf.getInt();
            byte flag = buf.get();
            if (length < packHeadLength || length > maxPackLength) {
                // The header is invalid (shorter than a header or longer than
                // the maximum): discard everything buffered so far and wait
                // for new data
                buf.clear();
                return;
            } else if (length - packHeadLength <= buf.remaining()) {
                // A complete packet: read it and write it to the output
                // so that the IoHandler can process it
                int oldLimit2 = buf.limit();
                buf.limit(buf.position() + length - packHeadLength);
                String content = buf.getString(ctx.getDecoder());
                buf.limit(oldLimit2);
                MyProtocalPack pack = new MyProtocalPack(flag, content);
                out.write(pack);
            } else {
                // The packet is incomplete:
                // move the position back to the start of the header
                buf.reset();
                break;
            }
        }
        if (buf.hasRemaining()) {
            // Move the remaining data to the front of the buffer
            IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
            temp.put(buf);
            temp.flip();
            buf.clear();
            buf.put(temp);
        } else {
            // All data has been processed; clear the buffer
            buf.clear();
        }
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }

    public void dispose(IoSession session) throws Exception {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx != null) {
            session.removeAttribute(CONTEXT);
        }
    }

    // Holds per-session state. Data arrives in chunks of arbitrary size, and
    // a read may well deliver only half a packet, so the pieces have to be
    // stitched together before a packet can be processed.
    private class Context {
        private final CharsetDecoder decoder;
        private IoBuffer buf;
        private int matchCount = 0;
        private int overflowPosition = 0;

        private Context() {
            decoder = charset.newDecoder();
            buf = IoBuffer.allocate(80).setAutoExpand(true);
        }

        public CharsetDecoder getDecoder() {
            return decoder;
        }

        public IoBuffer getBuffer() {
            return buf;
        }

        public int getOverflowPosition() {
            return overflowPosition;
        }

        public int getMatchCount() {
            return matchCount;
        }

        public void setMatchCount(int matchCount) {
            this.matchCount = matchCount;
        }

        public void reset() {
            overflowPosition = 0;
            matchCount = 0;
            decoder.reset();
        }

        public void append(IoBuffer in) {
            getBuffer().put(in);
        }
    }
}
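The core idea in the decoder, buffering bytes until a whole packet has arrived, can be tried out without Mina. The following is only a rough sketch using java.nio.ByteBuffer; the class FrameAccumulatorSketch and its feed method are hypothetical names, the body is assumed to be UTF-8, and the flag is simply skipped. It is not the decoder above, just the same technique in isolation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: accumulates chunks and emits the content of
// every complete [4-byte length][1-byte flag][body] frame.
final class FrameAccumulatorSketch {
    private static final int HEADER_LENGTH = 5;
    private ByteBuffer pending = ByteBuffer.allocate(64); // kept in write mode

    List<String> feed(byte[] chunk) {
        // Grow the buffer by hand if the chunk does not fit
        if (pending.remaining() < chunk.length) {
            ByteBuffer bigger = ByteBuffer.allocate((pending.position() + chunk.length) * 2);
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
        pending.put(chunk);
        pending.flip(); // switch to read mode

        List<String> contents = new ArrayList<>();
        while (pending.remaining() >= HEADER_LENGTH) {
            pending.mark(); // remember where this header starts
            int length = pending.getInt();
            pending.get(); // flag, ignored in this sketch
            if (length < HEADER_LENGTH) {
                throw new IllegalStateException("bad length: " + length);
            }
            int bodyLength = length - HEADER_LENGTH;
            if (pending.remaining() < bodyLength) {
                pending.reset(); // incomplete frame: rewind to the header
                break;
            }
            byte[] body = new byte[bodyLength];
            pending.get(body);
            contents.add(new String(body, StandardCharsets.UTF_8));
        }
        pending.compact(); // keep leftovers at the front, back to write mode
        return contents;
    }

    public static void main(String[] args) {
        byte[] frame = {0, 0, 0, 8, 1, 'a', 'b', 'c'};
        FrameAccumulatorSketch acc = new FrameAccumulatorSketch();
        // First three bytes only: not even a full header yet
        System.out.println(acc.feed(new byte[] {0, 0, 0}));
        // The rest of the frame arrives: one complete packet
        System.out.println(acc.feed(new byte[] {8, 1, 'a', 'b', 'c'}));
        // A whole frame in one chunk
        System.out.println(acc.feed(frame));
    }
}
```

Here compact() does in one call what the decoder does with a temporary buffer: it moves the unread bytes to the front and leaves the buffer ready for the next append.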
In MyProtocalServer, add the Log4jFilter we implemented ourselves and the codec filter:
package com.gftech.mytool.mina;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MyProtocalServer {
    private static final int PORT = 2500;
    static Logger logger = Logger.getLogger(MyProtocalServer.class);

    public static void main(String[] args) throws IOException {
        PropertyConfigurator.configure("conf//log4j.properties");
        IoAcceptor acceptor = new NioSocketAcceptor();
        Log4jFilter lf = new Log4jFilter(logger);
        acceptor.getFilterChain().addLast("logger", lf);
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        acceptor.getSessionConfig().setReadBufferSize(1024);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        acceptor.setHandler(new MyHandler());
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("start server ...");
    }
}

class MyHandler extends IoHandlerAdapter {
    static Logger logger = Logger.getLogger(MyHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MyProtocalPack pack = (MyProtocalPack) message;
        logger.debug("Rec:" + pack);
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.debug("IDLE " + session.getIdleCount(status));
    }
}
Write a client program to test it:
package com.gftech.mytool.mina;

import java.io.DataOutputStream;
import java.net.Socket;

public class MyProtocalClient {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 2500);
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            for (int i = 0; i < 1000; i++) {
                MyProtocalPack pack = new MyProtocalPack((byte) i, i + "测试MyProtocalaaaaaaaaaaaaaa");
                // Write the header (length, flag) followed by the body
                out.writeInt(pack.getLength());
                out.write(pack.getFlag());
                out.write(pack.getContent().getBytes());
                out.flush();
                System.out.println(i + " sent");
            }
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
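One thing worth checking in a test client like this is the charset: getBytes() with no argument uses the platform default, while the server's codec is constructed with GBK, and the length field counts bytes rather than characters. The sketch below shows how the frame length of the same string differs between charsets. CharsetLengthSketch and frameLength are hypothetical names, and the example assumes the JDK's GBK charset is available, as the article's own code already does.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: the frame length depends on the charset
// used to turn the content into bytes.
final class CharsetLengthSketch {
    // 5 header bytes (4-byte length + 1-byte flag) plus the encoded content
    static int frameLength(String content, Charset charset) {
        return 5 + content.getBytes(charset).length;
    }

    public static void main(String[] args) {
        // The two characters of "测试", written as escapes so that the
        // source file encoding does not matter
        String content = "\u6d4b\u8bd5";
        // GBK uses 2 bytes per character here: prints 9
        System.out.println(frameLength(content, Charset.forName("GBK")));
        // UTF-8 uses 3 bytes per character here: prints 11
        System.out.println(frameLength(content, StandardCharsets.UTF_8));
    }
}
```

If the two sides disagree on the charset, the framing still works as long as the length field matches the bytes actually sent, but the decoded text may come out garbled.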
You can also implement the client with IoConnector:
package com.gftech.mytool.mina;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MyProtocalClient2 {
    private static final String HOST = "192.168.10.8";
    private static final int PORT = 2500;
    static long counter = 0;
    final static int FC1 = 100;
    static long start = 0;

    /**
     * Test using the Mina framework.
     *
     * @param args
     */
    public static void main(String[] args) throws IOException {
        start = System.currentTimeMillis();
        IoConnector connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        connector.setHandler(new TimeClientHandler2());
        connector.getSessionConfig().setReadBufferSize(100);
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));
        connFuture.addListener(new IoFutureListener<ConnectFuture>() {
            public void operationComplete(ConnectFuture future) {
                try {
                    if (future.isConnected()) {
                        IoSession session = future.getSession();
                        sendData(session);
                    } else {
                        System.out.println("Not connected");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("start client ...");
    }

    public static void sendData(IoSession session) throws IOException {
        for (int i = 0; i < FC1; i++) {
            String content = "afdjkdafk张新波测试" + i;
            MyProtocalPack pack = new MyProtocalPack((byte) i, content);
            session.write(pack);
            System.out.println("send data:" + pack);
        }
    }
}

class TimeClientHandler2 extends IoHandlerAdapter {
    @Override
    public void sessionOpened(IoSession session) {
        // Set reader idle time to 60 seconds.
        // sessionIdle(...) method will be invoked when no data is read
        // for 60 seconds.
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);
    }

    @Override
    public void sessionClosed(IoSession session) {
        // Print out total number of bytes read from the remote peer.
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        // Close the connection if reader is idle.
        if (status == IdleStatus.READER_IDLE) {
            session.close(true);
        }
    }

    @Override
    public void messageReceived(IoSession session, Object message) {
        MyProtocalPack pack = (MyProtocalPack) message;
        System.out.println("rec:" + pack);
    }
}
Copyright notice: This is an original article by the blog author and may not be reproduced without the author's permission.