设为首页 收藏本站
查看: 1271|回复: 0

[经验分享] Apache MINA (4) 接收处理请求的过程

[复制链接]

尚未签到

发表于 2017-1-11 10:56:23 | 显示全部楼层 |阅读模式
  上一篇博客 Apache MINA (3) NioSocketAcceptor初始化 了解了NioSocketAcceptor的初始化过程,完成了初始化,Acceptor线程被阻塞,处于等待客户端请求到达的状态,通过进一步研究源代码了解Mina处理请求的过程。
  当有客户端的请求到达时selector.select()被唤醒

if (selected > 0) {
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}
  processHandles(selectedHandles())负责处理所有接收到的请求,首先调用了NioSocketAcceptor的accept方法,接受来自客户端的请求,并初始化一个NioSocketSession,并将SocketChannel绑定到NioSession中,指定了处理该NioSession的NioProcesser,IoServices,IoFilterChain等,最后执行

session.getProcessor().add(session); 
  把session交给SimpleIoProcessorPool处理。
  上一篇博客中介绍了SimpleIoProcessorPool的初始化,他初始化了一个无界线程池this.executor = Executors.newCachedThreadPool();并定义了一个cpu+1个NioProcessor绑定到这个executor中,每个NioProcessor中初始化了一个Selector,提供Nio相关的具体实现,如select(),read(),write()等方法
  SimpleIoProcessorPool的add(S session)方法首先指定一个处理session的NioProcessor;其中getProcessor(session),该方法会先从session的attributeMap中找有没有指定好的Processor,如果没有,根据sessionId和cpu+1取模取到一个NioProcessor放到attributeMap中。

processor = pool[Math.abs((int) session.getId()) % pool.length];
session.setAttributeIfAbsent(PROCESSOR, processor);


拿到对应的NioProcessor之后再执行NioPorcessor的add(S sission)方法,实现类是在其父类AbstractPollingIoProcessor中,session会被放到一个线程安全的队列newSessions中

private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
 然后启动processor,执行startupProcessor();

该方法初始化一个AbstractPollingIoProcessor的内部线程类Processor并放到executor中执行 

executor.execute(new NamePreservingRunnable(processor, threadName));
 processor开始执行,调用NioProcessor的select()方法,然后执行handleNewSessions(),处理newSessions队列中的session,执行addNow()方法,拿到session对应的SelectableChannel,设置为非阻塞的通道,注册通道为OP_READ,准备读取数据,同时为session构建filterChain,然后通知listeners激活创建事件,激活过滤器当前session创建和打开事件。




然后执行process()方法开始读取数据,初始化IoBuffer

IoBuffer buf = IoBuffer.allocate(bufferSize);
 调用NioProcessor的read()方法从channel中读取数据到IoBuffer中,这里指定的默认读取方式是读到buffer满或者数据读完然后结束然后调用buf.flip();重置buffer的指针为初始化状态

然后调用责任链激活数据已经读取到

filterChain.fireMessageReceived(buf);
 如果数据已经读完,会将当前session放到flushingSessions队列中。

filterChain负责将从chain中读取的byte数据转换成业务需要的数据,如对象,字符串等,从之前Hellow World的例子中看到,为这个责任链添加了一个按行读取数据的filter

chain.addLast("myChain", new ProtocolCodecFilter(new TextLineCodecFactory()));


 构造方法中指定了按行处理的数据的encoder和decoder,如果没有读到换行,则将临时数据存到TextLineDecoder的内部类Context中,直到读到换行,将整行放到AbstractProtocolDecoderOutput的消息队列中,如果消息队列不为空,才将转换好的数据传递给下一个filter

Queue<Object> messageQueue = getMessageQueue();
while (!messageQueue.isEmpty()) {
nextFilter.messageReceived(session, messageQueue.poll());
}


 这个时候消息进入到对应业务处理的IoHandler的实现类中去完成业务的相关操作,在之前的例子中创建了一个对应的实现类SimpleMinaServerHandler来处理

@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String str = (String)message;
// 打印客户端
System.out.println("receive client message : [ " + str +" ]");
// 回写消息给客户端
session.write(count.incrementAndGet());
}
 然后再调用session的write方法将处理后的信息回写给客户端

初始化WriteRequest,交给filterChain,执行fireFilterWrite回写数据给客户端




小结


以上过程描述了一次简单的客户端请求处理的过程,其核心过程如下:

服务端接到客户端的请求,在bind监听端口后,调用startupAcceptor()方法,接收线程Acceptor被唤醒,创建IoSession,构造IoFilter链,将构造好的IoSession提交给IoProcessor来处理。

SimpleIoProcessorPool初始化时创建了一个cpu+1个IoProcessor来负责处处理请求,按照取模的方式找到一个对应的IoProcessor来处理IoSession,IoSession被放到一个newSessions的队列中等待被处理,startupProcessor()唤醒IoProcessor,开始读取数据,通过IoFilter链进行转换处理,最终提供给IoHandlerAdapter处理请求。



借用网上一张经典的图片来描述这个过程:


DSC0000.jpg
 接下来会再深入研究mina的线程模型。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-326979-1-1.html 上篇帖子: 如何将Apache Shiro集成到基于Spring的应用 下篇帖子: StringUtil包函数(org.apache.commons.lang.StringUtil)之用法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表