设为首页 收藏本站
查看: 560|回复: 0

[经验分享] Camel之AsyncProcessor

[复制链接]

尚未签到

发表于 2017-7-2 12:53:27 | 显示全部楼层 |阅读模式
Camel支持一种更复杂的异步的处理模型,异步处理器实现一个继承自Processor接口的AsyncProcessor接口,使用异步Processor的长处:

a.异步Processor不会因等待堵塞调用而耗尽线程。这样在处理相同工作量的情况下,通过降低线程的数量能够添加系统的伸缩性

b.使用异步Processor,能够将路由分阶段处理。不同的线程池处理其对应的路由阶段。这就意味着路由能够并行处理。  

缺点:实现异步的Processor要比同步的Processor复杂得多。
  



异步Processor与同步Processor的差别:

a.必须提供一个AsyncCallback对象,该对象在exchange处理完毕后被通知

b.在异步Processor处理exchange的时候不能抛出不论什么异常,而应该将异常存储在exchange的Exception属性中

c.异步Processor必须知道它将以什么方式完毕处理,异步或同步,假设process方法返回true。则是同步完毕。假设process方法返回false,则是异步完毕。

d.当处理器处理完exchange时,它必须调用callback.done(boolean sync)方法。sync參数必须与process方法的返回值一致。



对于一个路由来说,全然使用异步模式能够减少线程的使用量,这要求从Consumer開始就必须使用异步的处理API(即调用异步的

process方法),假设Consumer调用的是同步process()方法,那么消费者线程在处理Exchange时将被强制堵塞。

有一点必须注意的是当你调用了异步的API。这并不意味着处理过程就是异步的,这不过为不捆绑在调用者线程提供了可能。

  至于是否是进行异步处理依赖于Camel路由的配置.
  以上是Camel官方对异步Processor的解释,以下是本人用于測试的一个样例:
  

public static void main(String[] args) throws Exception {
RouteBuilder builder = new RouteBuilder() {
@Override
public void configure() throws Exception {
RouteDefinition definition1 = this.from("file:H:/temp/in");
RouteDefinition definition2 = definition1.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println("process1");
}
}).process(new AsyncProcessor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("process");
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
System.out.println(Thread.currentThread().getName());
System.out.println("async process");
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.done(false);
return false;
}
}).process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println("process2");
}
});
definition2.to("file:H:/temp/out");
}
};
DefaultCamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(builder);
camelContext.start();
Object object = new Object();
synchronized (object) {
object.wait();
}

}

当我看到异步两个字时,直觉就是使用异步Processor时会启用新的线程进行处理,但在上面的样例中,三个线程名称是一样的,

而且在堵塞了10秒后process2才打印出来,这说明上面的三个processor是在同一个线程中运行的,这也是堵塞10秒的原因。  



我个人觉得是对Camel异步Processor的"异步"两字理解出现了偏差。这里的异步仅仅为processor的processor方法。提供一个

回调函数,而不是另启线程。
  并且我们自己写Processor处理器对这个异步的使用也非常有限,由于我们写的处理器是被调用者。AsyncCallback是由上层提供的,我们仅仅是能调用其done方法通知上层本次处理完毕。而我们很多其它的需求应该是自己去注冊回调函数,并且我们可以控制这个回调函数的回调时机。而如今我们无法提供回调函数的注冊。那我们不禁要问,这个AsyncCallback对象那究竟是谁提供的呢?AsyncCallback对象的源头当然是在消费者类提供的,对上面的样例来说是在FileConsumer类中。例如以下是GenericFileConsumer的processExchange方法的一个片段(FileConsumer继承自GenericFileConsumer)

  

getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// noop
if (log.isTraceEnabled()) {
log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
}
}
});

这时创建的AsyncCallback对象就是源始的回调对象。当然在路由运行的兴许过程中,该回调对象能够被包装。当中CamelInternalProcessor的process(Exchange exchange, AsyncCallback callback)方法就是一个样例:

callback = new InternalCallback(states, exchange, callback);

这里我们不禁又会问。既然CamelInternalProcessor可以对源始AsyncCallback对象进行包装增加自己的回调逻辑,为什么我们自己不行呢。其原来还是我们写的Processor是被调用者。是被包装者,详细过程可參看Camel路由启动过程。



假设非要加入自己的回调逻辑也不是不可能。就仅仅能自己写消费者。自己写消费者就能控制源AsyncCallback对象,其兴许仅仅是对

源AsyncCallback对象的一个包装的过程。仅仅要保证最外层的AsyncCallback对象被调用,那么源AsyncCallback对象也一定会被调用。所以在上例中,假设在第二个Processor中假设不运行callback.done(false);的话路由过程将永远不会结束,由于上层一直觉得下层处理还未结束。  当然假设我们不写异常Processor,路由过程还是会正常结束的,Camel内部会自行处理,可是假设我们写了异步Processor就一定要调用callback.done方法。



所以这么一通下来。并没有感受到官方提及的不堵塞调用、降价线程使用、路由分阶段处理等,个人的感觉就是多了一个回调方法,并且这个回调功能还非常有限,当然这也有可能是自己什么地方理解错了,如若如此。尽请指正......

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390339-1-1.html 上篇帖子: AMQP 0-9-1模型简介 下篇帖子: RabbitMQ基础总结
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表