设为首页 收藏本站
查看: 544|回复: 0

[经验分享] apache mina 学习(十)-----Codec Filter

[复制链接]

尚未签到

发表于 2017-1-8 08:09:25 | 显示全部楼层 |阅读模式
  首先明白为什么用ProtocolCodecFilter:
  1、TCP保证了按顺序传输所有的数据包,但是不能保证发送端进行了一个写操作会导致接收端相应的进行一个读操作。
  2、在mina中如果没有ProtocolCodecFilter,发送端的一个IoSession.write(Object message)操作会触发接收端的多个messageReceived(IoSessionsession, Object message)事件,多个IoSession.write(Object message)操作也可能会导致只触发了一个messageReceived事件,这不就乱套了嘛。
  3、很多时候我们需要知道当前message的终止位置和下一个message的起始位置。
  4、分离基础协议逻辑和业务逻辑。
  一般来说我们如果想从一长串字节流中得到我们要的数据并组织成业务上的pojo,我们一般用以下几种方法:
  1、采用固定长度的message
  2、用固定的头标示body的长度
  3、用基于文本的标示,如换行、回车等
  我们基于前两种方式的会比较多一些。
  下面是官方的一个例子:
  首先定义一下网络协议采用TCP/IP协议,客户端发送的消息的格式如下:
  4 bytes
  4 bytes
  4 bytes
  width
  height
  numchars
  开头4个字节是一个图片的宽度,中间4个字节是图片的高度,最后4个字节是图片中字符的数量,我们可以暂且认为这是个图片验证码的小demo。
  服务端发回给客户端的就是图片,当然图片是通过字节流的方式发过来的,消息体如下:
  4 bytes
  variable length body
  4 bytes
  variable length body
  length1
  image1
  length2
  image2
  开头的四个字节代表第一张图片的长度,然后是图片的具体内容,然后是第二张图片的长度和第二张图片的具体内容。
  然后定义两个实体,用来封装服务端的响应和客户端的请求:
  public class ImageRequest {
  private int width;
  private int height;
  private int numberOfCharacters;
  public ImageRequest(int width, int height, int numberOfCharacters) {
  this.width = width;
  this.height = height;
  this.numberOfCharacters = numberOfCharacters;
  }
  public int getWidth() {
  return width;
  }
  public int getHeight() {
  return height;
  }
  public int getNumberOfCharacters() {
  return numberOfCharacters;
  }
  }
  public class ImageResponse {
  private BufferedImageimage1;
  private BufferedImageimage2;
  publicImageResponse(BufferedImage image1, BufferedImage image2) {
  this.image1= image1;
  this.image2= image2;
  }
  public BufferedImagegetImage1() {
  returnimage1;
  }
  public BufferedImagegetImage2() {
  returnimage2;
  }
  }
  然后需要把这两个业务的bean转换为事先定义的message格式,这就是传说中的encode和decode:
  首先对ImageRequest进行encode,mina中需要实现ProtocolEncode接口,并重写encode方法:
  public class ImageRequestEncoder implements ProtocolEncoder {
  public voidencode(IoSession session, Object message,ProtocolEncoderOutput out) throws Exception {
  ImageRequest request =(ImageRequest) message;
  IoBuffer buffer =IoBuffer.allocate(12, false);
  buffer.putInt(request.getWidth());
  buffer.putInt(request.getHeight());
  buffer.putInt(request.getNumberOfCharacters());
  buffer.flip();
  out.write(buffer);
  }
  public voiddispose(IoSession session) throws Exception {
  // nothing to dispose
  }
  }
  同样,我们需要实现一个解码器来把底层传输的byte转换为我们的业务对象ImageRequest,注意要实现CumulativeProtocolDecoder的doDecode方法:
  public class ImageRequestDecoder extendsCumulativeProtocolDecoder {
  protected booleandoDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
  if (in.remaining()>= 12) {
  int width =in.getInt();
  int height =in.getInt();
  intnumberOfCharachters = in.getInt();
  ImageRequest request = newImageRequest(width, height, numberOfCharachters);
  out.write(request);
  return true;
  } else {
  return false;
  }
  }
  }
  同理,对ImageResponse进行编码和解码:
  public class ImageResponseEncoder extendsProtocolEncoderAdapter {
  public voidencode(IoSession session, Object message,ProtocolEncoderOutput out) throws Exception {
  ImageResponse imageResponse= (ImageResponse) message;
  byte[] bytes1 =getBytes(imageResponse.getImage1());
  byte[] bytes2 =getBytes(imageResponse.getImage2());
  int capacity =bytes1.length + bytes2.length + 8;
  IoBuffer buffer =IoBuffer.allocate(capacity, false);
  buffer.setAutoExpand(true);//设置自动扩充
  buffer.putInt(bytes1.length);
  buffer.put(bytes1);
  buffer.putInt(bytes2.length);
  buffer.put(bytes2);
  buffer.flip();
  out.write(buffer);
  }
  private byte[]getBytes(BufferedImage image) throws IOException {
  ByteArrayOutputStream baos =newByteArrayOutputStream();
  ImageIO.write(image, "PNG", baos);
  returnbaos.toByteArray();
  }
  }
  public class ImageResponseDecoder extendsCumulativeProtocolDecoder {
  private static final String DECODER_STATE_KEY= ImageResponseDecoder.class.getName() + ".STATE";//存储decoding的进度
  public static final int MAX_IMAGE_SIZE= 5 * 1024 * 1024;
  private static classDecoderState {
  BufferedImage image1;
  }
  protected booleandoDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
  DecoderState decoderState =(DecoderState) session.getAttribute(DECODER_STATE_KEY);
  if (decoderState== null) {
  decoderState = new DecoderState();
  session.setAttribute(DECODER_STATE_KEY, decoderState);
  }
  if(decoderState.image1 == null) {
  // try to read firstimage
  if(in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {//这个方法对于有长度前缀的message解析很好用
  decoderState.image1= readImage(in);
  } else {
  // not enough dataavailable to read first image
  return false;
  }
  }
  if(decoderState.image1 != null) {
  // try to read second image
  if(in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
  BufferedImage image2= readImage(in);
  ImageResponseimageResponse = new ImageResponse(decoderState.image1,image2);
  out.write(imageResponse);
  decoderState.image1= null;
  return true;
  } else {
  // not enough dataavailable to read second image
  return false;
  }
  }
  return false;
  }
  private BufferedImagereadImage(IoBuffer in) throws IOException {
  int length =in.getInt();
  byte[] bytes = new byte[length];
  in.get(bytes);
  ByteArrayInputStream bais = newByteArrayInputStream(bytes);
  returnImageIO.read(bais);
  }
  }
  然后把这四个编码解码器注册为自己的CodecFactory:
  public class ImageCodecFactory implementsProtocolCodecFactory {
  private ProtocolEncoderencoder;
  private ProtocolDecoderdecoder;
  publicImageCodecFactory(boolean client) {
  if (client) {
  encoder = newImageRequestEncoder();
  decoder = newImageResponseDecoder();
  } else {
  encoder = newImageResponseEncoder();
  decoder = newImageRequestDecoder();
  }
  }
  public ProtocolEncodergetEncoder(IoSession ioSession) throws Exception {
  return encoder;
  }
  public ProtocolDecodergetDecoder(IoSession ioSession) throws Exception {
  return decoder;
  }
  }
  然后是客户端和服务端的测试代码:
  public class ImageServer {
  public static final int PORT = 33789;
  public static void main(String[] args) throws IOException {
  ImageServerIoHandler handler= new ImageServerIoHandler();
  NioSocketAcceptor acceptor =newNioSocketAcceptor();
  acceptor.getFilterChain().addLast("protocol", newProtocolCodecFilter(new ImageCodecFactory(false)));
  acceptor.setLocalAddress(newInetSocketAddress(PORT));
  acceptor.setHandler(handler);
  acceptor.bind();
  System.out.println("server islistenig at port " + PORT);
  }
  }
  public class ImageClient extendsIoHandlerAdapter {
  public static final int CONNECT_TIMEOUT =3000;
  private String host;
  private int port;
  private SocketConnectorconnector;
  private IoSession session;
  private ImageListenerimageListener;
  public ImageClient(String host, int port, ImageListener imageListener) {
  this.host= host;
  this.port= port;
  this.imageListener= imageListener;
  connector = newNioSocketConnector();
  connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(newImageCodecFactory(true)));
  connector.setHandler(this);
  }
  public voidmessageReceived(IoSession session, Object message) throws Exception {
  ImageResponse response =(ImageResponse) message;
  imageListener.onImages(response.getImage1(), response.getImage2());
  }
  ...
  public class ImageServerIoHandler extends IoHandlerAdapter {
  private final static String characters = "mina rocksabcdefghijklmnopqrstuvwxyz0123456789";
  public static final String INDEX_KEY =ImageServerIoHandler.class.getName() + ".INDEX";
  private Logger logger =LoggerFactory.getLogger(this.getClass());
  public voidsessionOpened(IoSession session) throws Exception {
  session.setAttribute(INDEX_KEY, 0);
  }
  public voidexceptionCaught(IoSession session, Throwable cause) throwsException {
  IoSessionLogger sessionLogger =IoSessionLogger.getLogger(session, logger);
  sessionLogger.warn(cause.getMessage(),cause);
  }
  public voidmessageReceived(IoSession session, Object message) throws Exception {
  ImageRequest request = (ImageRequest)message;
  String text1= generateString(session, request.getNumberOfCharacters());
  String text2= generateString(session, request.getNumberOfCharacters());
  BufferedImage image1 =createImage(request, text1);
  BufferedImage image2 =createImage(request, text2);
  ImageResponse response = new ImageResponse(image1, image2);
  session.write(response);
  }
  private BufferedImagecreateImage(ImageRequest request, String text) {
  BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(),BufferedImage.TYPE_BYTE_INDEXED);
  Graphics graphics =image.createGraphics();
  graphics.setColor(Color.YELLOW);
  graphics.fillRect(0, 0,image.getWidth(), image.getHeight());
  Font serif = new Font("serif", Font.PLAIN, 30);
  graphics.setFont(serif);
  graphics.setColor(Color.BLUE);
  graphics.drawString(text, 10, 50);
  returnimage;
  }
  private String generateString(IoSession session, int length) {
  Integer index= (Integer) session.getAttribute(INDEX_KEY);
  StringBuffer buffer = new StringBuffer(length);
  while(buffer.length() < length) {
  buffer.append(characters.charAt(index));
  index++;
  if(index >= characters.length()) {
  index = 0;
  }
  }
  session.setAttribute(INDEX_KEY, index);
  returnbuffer.toString();
  }
  }
  运行结果:

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-325301-1-1.html 上篇帖子: apache mina 学习(十一)-----状态机(stateMachine) 下篇帖子: apache mina 学习(三)-----Mina架构
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表