设为首页 收藏本站
查看: 729|回复: 0

[经验分享] Apache Mina 文档翻译

[复制链接]

尚未签到

发表于 2017-1-11 10:43:35 | 显示全部楼层 |阅读模式
第九章 - Codec过滤器
  本章中会说明为什么要使用ProtocolCodecFilter和如何使用ProtocolCodecFilter。



为什么使用ProtocolCodecFilter?
  TCP可以保证数据包按照正确的顺序投递。但是不能保证在发送端的写操作对应在接收端一定有一次读操作。具体参照:http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassembly 和 http://en.wikipedia.org/wiki/Nagle%27s_algorithm。 在MINA里如果没有ProtocolCodecFilter, 发送端的一次IoSession.write(Object message) 调用可能在接收端产生多次messageReceived(IoSession session, Object message)事件。同样多次IoSession.write(Object message)可能产生一次messageReceived事件。当你的应用程序运行在同一个主机(或本地网络)上时没有遇到上面的情况,但你的应用程序必须要对应这个问题。
    很多网络应用程序需要一个方法来找到当前的消息是在哪里结束,下一个消息是在哪里开始。
    你可以把消息编码解码的逻辑写到IoHandler里,但是通过ProtocolCodecFilter的方式会使你的代码更简介和易于维护。   



如何使用ProtocolCodecFilter?
  你的应用程序实际上从接收到的是一段字节,你必须把这些字节转换成消息对象(高级对象)

有3种常用的方法来把字节流分解成消息对象:
    使用定长消息
    一个定长的消息头里存放后面的消息体的长度
    使用定界符,例如在很多基于文本的协议中在每条消息的结尾添加换行符(或CR LF对)。

在下面的例子中我们先使用第一和第二中方法,因为这两种方法最简单。然后我们再看看如何使用定界符。

例子

我们会开发一个(实际上没有什么用的)图形化的文字生成服务器来演示如何实现自己的协议编码(ProtocolEncoder, ProtocolDecoder和ProtocolCodecFactory)。协议本身很简单,请求消息的布局如下:

4字节4字节 4字节
宽度高度字符数
  
    宽度: 要求的画像宽度(网络字节序的整型数值)
    高度: 要求的画像高度 (网络字节序的整型数值)
    字符数: 生成的字符数 (网络字节序的整型数值)

服务器根据请求的长宽生成图片并且指定个数的字符画在上面。response消息布局:

4字节变长消息体4字节变长消息体
image2length2image1length1
  
下面是我们在编码解码请求和响应所需要的类:

    ImageRequest: 一个简单的POJO表示发送到ImageServer的请求.
    ImageRequestEncoder: 把ImageRequest对象转换传输用的数据(客户端使用)
    ImageRequestDecoder: 把传输用的数据转换为ImageRequest对象(服务器使用)
    ImageResponse: 一个简单的POJO表示服务器发送的响应.
    ImageResponseEncoder: 服务器把ImageResponse转换为传输用的数据。
    ImageResponseDecoder: 客户端把传输用的数据转换为ImageResponse对象。
    ImageCodecFactory: 这个类用来创建编码器和解码器的实例

ImageRequest类:



public class ImageRequest {
private int width;
private int height;
private int numberOfCharacters;
public ImageRequest(int width, int height, int numberOfCharacters) {
this.width = width;
this.height = height;
this.numberOfCharacters = numberOfCharacters;
}
public int getWidth() {
return width;
}
public int getHeight() {
return height;
}
public int getNumberOfCharacters() {
return numberOfCharacters;
}
}
 

编码一般比解码简单,我们从ImageRequestEncoder开始:

public class ImageRequestEncoder implements ProtocolEncoder {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ImageRequest request = (ImageRequest) message;
IoBuffer buffer = IoBuffer.allocate(12, false);
buffer.putInt(request.getWidth());
buffer.putInt(request.getHeight());
buffer.putInt(request.getNumberOfCharacters());
buffer.flip();
out.write(buffer);
}
public void dispose(IoSession session) throws Exception {
// nothing to dispose
}
}
 

说明:
    MINA会为IoSession的写操作队列里面的每一个消息调用encode方法。因为客户端只会写入ImageRequest对象,所以我们可以放心的把消息转会为ImageRequest对象。
    我们在Heap上分配了一个IoBuffer,最好避免使用直接Buffer,因为一般在Heap的Buffer的性能更好。参见http://issues.apache.org/jira/browse/DIRMINA-289
    你不需要释放Buffer,MINA会为你管理Buffer。参见http://mina.apache.org/mina-project/apidocs/org/apache/mina/core/buffer/IoBuffer.html
   在dispose方法里你应该释放所有为了编码而获取的资源。如果你没有什么要释放的,就让encoder实例直接继承至ProtocolEncoderAdapter。

现在我们看一下解码器。在实现自己的解码器时CumulativeProtocolDecoder很有用:它会为你缓冲进来的数据直到满足你的要求。在这个例子里消息是固定长度的,所以等到所有的数据都可用了再处理是非常容易的:

public class ImageRequestDecoder extends CumulativeProtocolDecoder {
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (in.remaining() >= 12) {
int width = in.getInt();
int height = in.getInt();
int numberOfCharachters = in.getInt();
ImageRequest request = new ImageRequest(width, height, numberOfCharachters);
out.write(request);
return true;
} else {
return false;
}
}
}
 

说明:
    每次一个完整的消息解码完毕,你应该把它写到ProtocolDecoderOutput,这些消息会通过过滤器链最后到达IoHandler.messageReceived方法。
    你不用释放IoBuffer
    当数据还不够转换为消息,只要返回false既可。

response对象就是一个简单的POJO

public class ImageResponse {
private BufferedImage image1;
private BufferedImage image2;
public ImageResponse(BufferedImage image1, BufferedImage image2) {
this.image1 = image1;
this.image2 = image2;
}
public BufferedImage getImage1() {
return image1;
}
public BufferedImage getImage2() {
return image2;
}
}
 

编码response对象也是非常简单的:

public class ImageResponseEncoder extends ProtocolEncoderAdapter {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ImageResponse imageResponse = (ImageResponse) message;
byte[] bytes1 = getBytes(imageResponse.getImage1());
byte[] bytes2 = getBytes(imageResponse.getImage2());
int capacity = bytes1.length + bytes2.length + 8;
IoBuffer buffer = IoBuffer.allocate(capacity, false);
buffer.setAutoExpand(true);
buffer.putInt(bytes1.length);
buffer.put(bytes1);
buffer.putInt(bytes2.length);
buffer.put(bytes2);
buffer.flip();
out.write(buffer);
}
private byte[] getBytes(BufferedImage image) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write(image, "PNG", baos);
return baos.toByteArray();
}
}

 
说明:
    当不能事先判断IoBuffer的长度时,你可以通过调用buffer.setAutoExpand(true)来
使用自动扩展的Buffer。

接下来看看如何解码response:

public class ImageResponseDecoder extends CumulativeProtocolDecoder {
private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";
public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;
private static class DecoderState {
BufferedImage image1;
}
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
if (decoderState == null) {
decoderState = new DecoderState();
session.setAttribute(DECODER_STATE_KEY, decoderState);
}
if (decoderState.image1 == null) {
// 尝试读取第一个图片
if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
decoderState.image1 = readImage(in);
} else {
// 没有足够的数据来读取第一个图片
return false;
}
}
if (decoderState.image1 != null) {
// 尝试读取第二张图片
if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
BufferedImage image2 = readImage(in);
ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
out.write(imageResponse);
decoderState.image1 = null;
return true;
} else {
// 没有足够的数据来读取第二张图片
return false;
}
}
return false;
}
private BufferedImage readImage(IoBuffer in) throws IOException {
int length = in.getInt();
byte[] bytes = new byte[length];
in.get(bytes);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
return ImageIO.read(bais);
}
}
 

Remarks:

    我们把解码处理的状态保存到会话的属性里。你也可以把这个状态保存到Decoder对象里,但是这样做的话有一些问题:
        每一个IoSession都需要一个Decoder的实例。
        MINA可以确保对于同一个会话只有一个线程在执行decode方法,但是不能保证每次调用decode方法的都是同一个线程。例如当第一个线程执行decode方法时数据还没有完全可用,当数据都到齐了decode方法被第二个线程调用。为了避免可见性问题,我们必须同步的访问decoder的状态(IoSession的属性是保存在ConcurrentHashMap里所以他对于其他线程都是自动可见的)
        在邮件组中的一个讨论得到了这样一个结论:选择把状态放到IoSession里还是Decoder实例里更多得是个人喜好问题。要保证同时没有两个线程运行decode方法,MINA需要加入一些同步处理, 这些同步处理本身可以确保不会出现线程可见性问题。参见:http://www.nabble.com/Tutorial-on-ProtocolCodecFilter,-state-and-threads-t3965413.html

    当使用length-prefix协议时,IoBuffer.prefixedDataAvailable()方法非常有用。它支持1,2和4个字节的消息头。
    当解码完毕后,不要忘记重置解码器的状态(从会话属性里把它删除)

如果response里只有一个图片时,我们就不需要保存解码的状态了:

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (in.prefixedDataAvailable(4)) {
int length = in.getInt();
byte[] bytes = new byte[length];
in.get(bytes);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
BufferedImage image = ImageIO.read(bais);
out.write(image);
return true;
} else {
return false;
}
}

 
现在我们把他们放在一起:

public class ImageCodecFactory implements ProtocolCodecFactory {
private ProtocolEncoder encoder;
private ProtocolDecoder decoder;
public ImageCodecFactory(boolean client) {
if (client) {
encoder = new ImageRequestEncoder();
decoder = new ImageResponseDecoder();
} else {
encoder = new ImageResponseEncoder();
decoder = new ImageRequestDecoder();
}
}
public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
return decoder;
}
}
 

说明:
    对于每一个新的会话,MINA会向ImageCodecFactory要求一个encoder和一个decoder。
    因为encode可以decoder里没有保存状态,对于所有会话返回共享的实例是安全的。

下面是在服务器里怎么使用ProtocolCodecFactory:

public class ImageServer {
public static final int PORT = 33789;
public static void main(String[] args) throws IOException {
ImageServerIoHandler handler = new ImageServerIoHandler();
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
acceptor.setLocalAddress(new InetSocketAddress(PORT));
acceptor.setHandler(handler);
acceptor.bind();
System.out.println("server is listenig at port " + PORT);
}
}

 
在客户端用法是相同的:

public class ImageClient extends IoHandlerAdapter {
public static final int CONNECT_TIMEOUT = 3000;
private String host;
private int port;
private SocketConnector connector;
private IoSession session;
private ImageListener imageListener;
public ImageClient(String host, int port, ImageListener imageListener) {
this.host = host;
this.port = port;
this.imageListener = imageListener;
connector = new NioSocketConnector();
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
connector.setHandler(this);
}
public void messageReceived(IoSession session, Object message) throws Exception {
ImageResponse response = (ImageResponse) message;
imageListener.onImages(response.getImage1(), response.getImage2());
}
...
 

最后作为一个完整的例子,我们把IoHandler里的代码也加上。

public class ImageServerIoHandler extends IoHandlerAdapter {
private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";
public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void sessionOpened(IoSession session) throws Exception {
session.setAttribute(INDEX_KEY, 0);
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
sessionLogger.warn(cause.getMessage(), cause);
}
public void messageReceived(IoSession session, Object message) throws Exception {
ImageRequest request = (ImageRequest) message;
String text1 = generateString(session, request.getNumberOfCharacters());
String text2 = generateString(session, request.getNumberOfCharacters());
BufferedImage image1 = createImage(request, text1);
BufferedImage image2 = createImage(request, text2);
ImageResponse response = new ImageResponse(image1, image2);
session.write(response);
}
private BufferedImage createImage(ImageRequest request, String text) {
BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
Graphics graphics = image.createGraphics();
graphics.setColor(Color.YELLOW);
graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
Font serif = new Font("serif", Font.PLAIN, 30);
graphics.setFont(serif);
graphics.setColor(Color.BLUE);
graphics.drawString(text, 10, 50);
return image;
}
private String generateString(IoSession session, int length) {
Integer index = (Integer) session.getAttribute(INDEX_KEY);
StringBuffer buffer = new StringBuffer(length);
while (buffer.length() < length) {
buffer.append(characters.charAt(index));
index++;
if (index >= characters.length()) {
index = 0;
}
}
session.setAttribute(INDEX_KEY, index);
return buffer.toString();
}
}
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-326964-1-1.html 上篇帖子: Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明 下篇帖子: Apache Pig的一些基础概念及用法总结(2)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表