
[Experience Sharing] Apache Mina Usage Notes (Part 4)

Posted on 2015-11-14 08:58:05
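In the previous post we looked at how to write our own logging filter in Mina. In this post we implement our own codec.
In practice, many systems do not speak standard web services or XML. The SMS gateways of China Mobile, China Unicom and China Telecom, for example, each implement their own protocol, and all of them are byte streams over TCP/IP. Mina's built-in codec provides TextLineEncoder and TextLineDecoder, which process strings line by line. For something like an SMS gateway you have to implement your own codec filter.
We define a simple protocol based on a TCP/IP byte stream to transfer packets between client and server. A MyProtocalPack packet consists of a header and a body. The header contains length (the total length of the packet in bytes, type int) and flag (the packet flag, type byte). The body, content, is a string, which the implementation handles as a byte stream. The source code is as follows: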
package com.gftech.mytool.mina;

import com.gftech.util.GFCommon;

public class MyProtocalPack {
    private int length;     // total packet length in bytes: 5-byte header + body
    private byte flag;      // packet flag
    private String content; // message body

    public MyProtocalPack() {
    }

    public MyProtocalPack(byte flag, String content) {
        this.flag = flag;
        this.content = content;
        // Note: getBytes() uses the platform default charset
        int len1 = content == null ? 0 : content.getBytes().length;
        this.length = 5 + len1;
    }

    // Parses a packet from raw bytes: 4-byte length, 1-byte flag, then the body
    public MyProtocalPack(byte[] bs) {
        if (bs != null && bs.length >= 5) {
            length = GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
            flag = bs[4];
            content = new String(GFCommon.bytesCopy(bs, 5, length - 5));
        }
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte getFlag() {
        return flag;
    }

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append(" Len:").append(length);
        sb.append(" flag:").append(flag);
        sb.append(" content:").append(content);
        return sb.toString();
    }
}
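To make the wire layout concrete, here is a minimal sketch that serializes one packet using only the JDK. The class name PackWireFormat and the method toBytes are made up for illustration and are not part of the original code. It assumes a big-endian length field, which is what DataOutputStream.writeInt and a default IoBuffer produce.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class PackWireFormat {
    static final int HEADER_LENGTH = 5; // 4-byte length + 1-byte flag

    // Layout: [length: int32, big-endian][flag: 1 byte][content bytes]
    static byte[] toBytes(byte flag, String content, Charset charset) {
        byte[] body = content == null ? new byte[0] : content.getBytes(charset);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(HEADER_LENGTH + body.length); // total length, header included
        buf.put(flag);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] wire = toBytes((byte) 7, "hi", StandardCharsets.UTF_8);
        StringBuilder hex = new StringBuilder();
        for (byte b : wire) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints: 00 00 00 07 07 68 69
    }
}
```

The first four bytes hold the total length (7), the fifth is the flag, and the rest is the body.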
  
Before going further, let's look back at how MinaTimeServer uses a text codec filter. It adds a ProtocolCodecFilter to the filter chain and passes it a factory class, TextLineCodecFactory, which creates the concrete TextLineEncoder and TextLineDecoder. Here is the relevant source:
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
package org.apache.mina.filter.codec.textline;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * A {@link ProtocolCodecFactory} that performs encoding and decoding between
 * a text line data and a Java string object.  This codec is useful especially
 * when you work with a text-based protocols such as SMTP and IMAP.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public class TextLineCodecFactory implements ProtocolCodecFactory {
    private final TextLineEncoder encoder;
    private final TextLineDecoder decoder;

    /**
     * Creates a new instance with the current default {@link Charset}.
     */
    public TextLineCodecFactory() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified {@link Charset}.  The
     * encoder uses a UNIX {@link LineDelimiter} and the decoder uses
     * the AUTO {@link LineDelimiter}.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     */
    public TextLineCodecFactory(Charset charset) {
        encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
        decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
    }

    /**
     * Creates a new instance of TextLineCodecFactory.  This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *  The line delimeter for the encoder
     * @param decodingDelimiter
     *  The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            String encodingDelimiter, String decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    /**
     * Creates a new instance of TextLineCodecFactory.  This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *  The line delimeter for the encoder
     * @param decodingDelimiter
     *  The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }

    /**
     * Returns the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}.  The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.
     */
    public int getEncoderMaxLineLength() {
        return encoder.getMaxLineLength();
    }

    /**
     * Sets the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}.  The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.
     */
    public void setEncoderMaxLineLength(int maxLineLength) {
        encoder.setMaxLineLength(maxLineLength);
    }

    /**
     * Returns the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}.  The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.
     */
    public int getDecoderMaxLineLength() {
        return decoder.getMaxLineLength();
    }

    /**
     * Sets the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}.  The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.
     */
    public void setDecoderMaxLineLength(int maxLineLength) {
        decoder.setMaxLineLength(maxLineLength);
    }
}
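TextLineCodecFactory implements the ProtocolCodecFactory interface. The interface has two main methods: getEncoder(), which returns the encoder, and getDecoder(), which returns the decoder: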
package org.apache.mina.filter.codec;

import org.apache.mina.core.session.IoSession;

/**
 * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
 * binary or protocol specific data into message object and vice versa.
 * <p>
 * Please refer to
 * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
 * example.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolCodecFactory {
    /**
     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
     * encodes message objects into binary or protocol-specific data.
     */
    ProtocolEncoder getEncoder(IoSession session) throws Exception;

    /**
     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
     * decodes binary or protocol-specific data into message objects.
     */
    ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
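Our job is mainly to implement encode() along the lines of TextLineEncoder and decode() along the lines of TextLineDecoder, which implement the ProtocolEncoder and ProtocolDecoder interfaces respectively. We write three classes, MyProtocalCodecFactory, MyProtocalEncoder and MyProtocalDecoder, which correspond to TextLineCodecFactory, TextLineEncoder and TextLineDecoder.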
package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyProtocalCodecFactory implements ProtocolCodecFactory {
    private final MyProtocalEncoder encoder;
    private final MyProtocalDecoder decoder;

    public MyProtocalCodecFactory(Charset charset) {
        encoder = new MyProtocalEncoder(charset);
        decoder = new MyProtocalDecoder(charset);
    }

    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }
}
  
package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class MyProtocalEncoder extends ProtocolEncoderAdapter {
    private final Charset charset;

    public MyProtocalEncoder(Charset charset) {
        this.charset = charset;
    }

    // Encodes a MyProtocalPack and writes it to the output
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        MyProtocalPack value = (MyProtocalPack) message;
        // Encode the body with the configured charset and derive the length field
        // from those bytes, so the header always matches what is written.
        // (The original wrote getContent().getBytes() with the platform default
        // charset, which can disagree with the charset the decoder uses.)
        byte[] content = value.getContent() == null
                ? new byte[0]
                : value.getContent().getBytes(charset);
        int length = 5 + content.length;
        IoBuffer buf = IoBuffer.allocate(length);
        buf.putInt(length);
        buf.put(value.getFlag());
        buf.put(content);
        buf.flip();
        out.write(buf);
    }

    public void dispose() throws Exception {
    }
}
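Because the length field counts bytes rather than characters, the value that goes into the header depends on which charset encodes the content. The sketch below shows this with JDK classes only. The class ContentLengthDemo is hypothetical, and it uses UTF-8 and UTF-16BE because every JVM is guaranteed to ship them; GBK, the charset used in this post, likewise encodes each of these two Chinese characters in 2 bytes.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, for illustration only.
final class ContentLengthDemo {
    // Total packet length: 5-byte header plus the encoded body
    static int packLength(String content, Charset charset) {
        return 5 + content.getBytes(charset).length;
    }

    public static void main(String[] args) {
        String content = "\u6d4b\u8bd5"; // the two characters used in the test clients
        System.out.println(packLength(content, StandardCharsets.UTF_8));    // prints: 11
        System.out.println(packLength(content, StandardCharsets.UTF_16BE)); // prints: 9
    }
}
```

If the sender computes the length with one charset and writes the body with another, the receiver will frame the stream incorrectly, so it seems safest to derive both from the same byte array.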
  
package com.gftech.mytool.mina;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyProtocalDecoder implements ProtocolDecoder {
    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
    private final Charset charset;
    private int maxPackLength = 100;

    public MyProtocalDecoder() {
        this(Charset.defaultCharset());
    }

    public MyProtocalDecoder(Charset charset) {
        this.charset = charset;
    }

    public int getMaxLineLength() {
        return maxPackLength;
    }

    public void setMaxLineLength(int maxLineLength) {
        if (maxLineLength <= 0) {
            throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
        }
        this.maxPackLength = maxLineLength;
    }

    private Context getContext(IoSession session) {
        Context ctx;
        ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }

    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        final int packHeadLength = 5;
        // Fetch the context from the previous call; it may hold unprocessed data
        Context ctx = getContext(session);
        // Append the incoming data to the context's buffer
        ctx.append(in);
        // Switch to read mode: limit = old position, position = 0
        IoBuffer buf = ctx.getBuffer();
        buf.flip();
        // Read packets according to the protocol
        while (buf.remaining() >= packHeadLength) {
            buf.mark();
            // Read the header
            int length = buf.getInt();
            byte flag = buf.get();
            // Invalid header: discard the buffered data and return. A length
            // shorter than the header is rejected too, otherwise the loop would
            // reset to the same bad header on every call. Returning here also
            // keeps the cleared buffer from being copied back as stale data.
            if (length < packHeadLength || length > maxPackLength) {
                buf.clear();
                return;
            }
            if (length - packHeadLength <= buf.remaining()) {
                // A complete packet: decode it and pass it on to the IoHandler
                int oldLimit2 = buf.limit();
                buf.limit(buf.position() + length - packHeadLength);
                String content = buf.getString(ctx.getDecoder());
                buf.limit(oldLimit2);
                MyProtocalPack pack = new MyProtocalPack(flag, content);
                out.write(pack);
            } else {
                // Incomplete packet: move back to the start of the header and wait for more data
                buf.reset();
                break;
            }
        }
        if (buf.hasRemaining()) {
            // Move the leftover data to the front of the buffer
            IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
            temp.put(buf);
            temp.flip();
            buf.clear();
            buf.put(temp);
        } else {
            // Everything was consumed, so clear the buffer
            buf.clear();
        }
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }

    public void dispose(IoSession session) throws Exception {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx != null) {
            session.removeAttribute(CONTEXT);
        }
    }

    // Per-session context. Data arrives in chunks of arbitrary size, so we may
    // well receive only half a packet; the context keeps the pieces so they can
    // be joined and processed as a whole.
    private class Context {
        private final CharsetDecoder decoder;
        private IoBuffer buf;
        private int matchCount = 0;
        private int overflowPosition = 0;

        private Context() {
            decoder = charset.newDecoder();
            buf = IoBuffer.allocate(80).setAutoExpand(true);
        }

        public CharsetDecoder getDecoder() {
            return decoder;
        }

        public IoBuffer getBuffer() {
            return buf;
        }

        public int getOverflowPosition() {
            return overflowPosition;
        }

        public int getMatchCount() {
            return matchCount;
        }

        public void setMatchCount(int matchCount) {
            this.matchCount = matchCount;
        }

        public void reset() {
            overflowPosition = 0;
            matchCount = 0;
            decoder.reset();
        }

        public void append(IoBuffer in) {
            getBuffer().put(in);
        }
    }
}
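The accumulate-then-frame idea behind the decoder can be tried without Mina. The following is a rough sketch using java.nio.ByteBuffer in place of IoBuffer. FrameAccumulator, feed and pack are hypothetical names, and the body is decoded as UTF-8 purely to keep the example self-contained. It is not the decoder above, only the same technique: append each chunk, emit every complete packet, and keep the remainder for the next call.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone illustration of length-prefixed framing.
final class FrameAccumulator {
    private static final int HEADER_LENGTH = 5; // 4-byte length + 1-byte flag
    private final int maxPackLength;
    private ByteBuffer pending = ByteBuffer.allocate(64); // kept in write mode between calls

    FrameAccumulator(int maxPackLength) {
        this.maxPackLength = maxPackLength;
    }

    // Appends a chunk and returns every complete packet as "flag:content"
    List<String> feed(byte[] chunk) {
        if (pending.remaining() < chunk.length) { // grow the buffer when needed
            ByteBuffer bigger = ByteBuffer.allocate(
                    Math.max(pending.capacity() * 2, pending.position() + chunk.length));
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
        pending.put(chunk);
        pending.flip(); // switch to read mode
        List<String> packets = new ArrayList<>();
        while (pending.remaining() >= HEADER_LENGTH) {
            pending.mark();
            int length = pending.getInt();
            byte flag = pending.get();
            if (length < HEADER_LENGTH || length > maxPackLength) {
                pending.clear(); // invalid header: drop everything buffered
                return packets;
            }
            int bodyLength = length - HEADER_LENGTH;
            if (pending.remaining() < bodyLength) {
                pending.reset(); // incomplete packet: rewind to the header
                break;
            }
            byte[] body = new byte[bodyLength];
            pending.get(body);
            packets.add(flag + ":" + new String(body, StandardCharsets.UTF_8));
        }
        pending.compact(); // move unread bytes to the front, back to write mode
        return packets;
    }

    // Builds one packet in the wire format used in this post
    static byte[] pack(byte flag, String content) {
        byte[] body = content.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_LENGTH + body.length)
                .putInt(HEADER_LENGTH + body.length).put(flag).put(body).array();
    }

    public static void main(String[] args) {
        byte[] p1 = pack((byte) 1, "ab");
        byte[] p2 = pack((byte) 2, "c");
        byte[] stream = Arrays.copyOf(p1, p1.length + p2.length);
        System.arraycopy(p2, 0, stream, p1.length, p2.length);

        FrameAccumulator acc = new FrameAccumulator(100);
        // The first chunk ends two bytes into the second packet's header
        System.out.println(acc.feed(Arrays.copyOfRange(stream, 0, 9)));             // prints: [1:ab]
        System.out.println(acc.feed(Arrays.copyOfRange(stream, 9, stream.length))); // prints: [2:c]
    }
}
```

ByteBuffer.compact() does in one call what the decoder above does with its temporary buffer: it moves the unread bytes to the front and leaves the buffer ready for the next append.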
  
In MyProtocalServer, we add our own Log4jFilter and the codec filter:
package com.gftech.mytool.mina;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MyProtocalServer {
    private static final int PORT = 2500;
    static Logger logger = Logger.getLogger(MyProtocalServer.class);

    public static void main(String[] args) throws IOException {
        PropertyConfigurator.configure("conf//log4j.properties");
        IoAcceptor acceptor = new NioSocketAcceptor();
        Log4jFilter lf = new Log4jFilter(logger);
        acceptor.getFilterChain().addLast("logger", lf);
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        acceptor.getSessionConfig().setReadBufferSize(1024);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        acceptor.setHandler(new MyHandler());
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("start server ...");
    }
}

class MyHandler extends IoHandlerAdapter {
    static Logger logger = Logger.getLogger(MyHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MyProtocalPack pack = (MyProtocalPack) message;
        logger.debug("Rec:" + pack);
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.debug("IDLE " + session.getIdleCount(status));
    }
}
  
Write a client program to test it:
package com.gftech.mytool.mina;

import java.io.DataOutputStream;
import java.net.Socket;

public class MyProtocalClient {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 2500);
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            for (int i = 0; i < 1000; i++) {
                MyProtocalPack pack = new MyProtocalPack((byte) i, i + "测试MyProtocalaaaaaaaaaaaaaa");
                // Write the header (length, flag) and then the body.
                // getBytes() uses the platform default charset, which should
                // match the charset the server's codec is configured with (GBK).
                out.writeInt(pack.getLength());
                out.write(pack.getFlag());
                out.write(pack.getContent().getBytes());
                out.flush();
                System.out.println(i + " sent");
            }
            // Give the server a moment to read before closing the connection
            Thread.sleep(1000);
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  
You can also implement the client with IoConnector:
package com.gftech.mytool.mina;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MyProtocalClient2 {
    private static final String HOST = "192.168.10.8";
    private static final int PORT = 2500;
    static long counter = 0;
    final static int FC1 = 100;
    static long start = 0;

    /**
     * Tests using the Mina framework.
     *
     * @param args
     */
    public static void main(String[] args) throws IOException {
        start = System.currentTimeMillis();
        IoConnector connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        connector.setHandler(new TimeClientHandler2());
        connector.getSessionConfig().setReadBufferSize(100);
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));
        connFuture.addListener(new IoFutureListener<ConnectFuture>() {
            public void operationComplete(ConnectFuture future) {
                try {
                    if (future.isConnected()) {
                        IoSession session = future.getSession();
                        sendData(session);
                    } else {
                        System.out.println("Connection failed");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("start client ...");
    }

    public static void sendData(IoSession session) throws IOException {
        for (int i = 0; i < FC1; i++) {
            String content = "afdjkdafk张新波测试" + i;
            MyProtocalPack pack = new MyProtocalPack((byte) i, content);
            session.write(pack);
            System.out.println("send data:" + pack);
        }
    }
}

class TimeClientHandler2 extends IoHandlerAdapter {
    @Override
    public void sessionOpened(IoSession session) {
        // Set reader idle time to 60 seconds.
        // sessionIdle(...) method will be invoked when no data is read
        // for 60 seconds.
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);
    }

    @Override
    public void sessionClosed(IoSession session) {
        // Print out total number of bytes read from the remote peer.
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        // Close the connection if reader is idle.
        if (status == IdleStatus.READER_IDLE) {
            session.close(true);
        }
    }

    @Override
    public void messageReceived(IoSession session, Object message) {
        MyProtocalPack pack = (MyProtocalPack) message;
        System.out.println("rec:" + pack);
    }
}
Copyright notice: this is the author's original article and may not be reproduced without the author's permission.
