设为首页 收藏本站
查看: 760|回复: 0

[经验分享] Apache Commons Pipeline 使用学习(一)

[复制链接]

尚未签到

发表于 2017-1-7 08:05:04 | 显示全部楼层 |阅读模式
  本系列源于对commons-pipeline 使用的学习:
  首先是:翻译官方文档
  本文是针对使用 Apache Commons Pipeline工作流程框架的基础介绍,本文档目标读者为需要组装现有stages或编写自己的stages的开发人员。该项目提供了一个Java类库旨在提升使用和重用模块化的stage的易用性。
一、Pipeline结构:

  • Stages:stages在Pipeline中代表来处理数据所需的逻辑单元。每一个stage代表一个高层次的处理概念:如查找文件,读取文件格式,从数据计算产品,或者将数据写入到数据库。使用工作流架构并且构建处理单元到Stages的主要优点是提升Stages在其他Pipeline的可重用性。
DSC0000.png
 


  • Pipeline:一个Pipeline由stages构建而成,这些stages可以将数据传递给后续的stages。上图中箭头所标记的“EMIT”表明一个stage的数据输出被传递到下一个stage。从代码层面看,有一个 EMIT() 方法将数据发送到下一个stage。数据流开始于左侧,在那里有一个标为“FEED”的箭头。FEED通常通过一个配置文件开始一个Pipeline,这点在后面讨论。Stage自己不关心输入的数据是来自FEED或前一阶段的EMIT()。
  Pipeline也可发送相同或不同的数据到不同的分支(branch),使数据沿不同的处理路线流转。
  
DSC0001.png
 

  • 通过Digester 或者 Spring 配置:有两种方法来配置Pipeline,都是是基于XML的控制文件。其中更简单的方法是使用Digester,Pipeline的最终用户可以修改该配置来满足自己的需求。 Spring框架也被用于配置Pipeline,但是由于他的结构与java编程对象更加接近,使得配置更复杂,同时也更灵活。通过XML配置文件的顺序来控制stage的顺序,以及stage的特殊参数都在该文件中进行设置。这些控制文件还允许以环境遍历的形式设置对所有stage可见全部参数,这种配置的方式使得完全不需要重新编译Java代码即可实现很多改变Pipeline布局和行为的需求。
  本教程将介绍使用Digester配置的pipelines,因为这是比较简单的方法。
 二、Stages使用
  一个标准的stage都有一个队列来缓存输入的数据对象。当某些stage比其他stage具有不同吞吐量或不规则的处理速度时,排队是一种有效的处理手段,特别是那些数据依赖于网络连接或近线媒体的stage。这个队列并不是stage本身的一部分,而是由stage driver来管理,stage driver负责当数据准备好的时候,将其送入stage。stage将一个数据对象传给下一级stage的时候,它可能在一个队列中排队等待(按顺序接收),直到下一个stage准备好来处理它。通常情况下,每个stage运行一个单独的处理线程。当然,对于一些应用也可以配置pipelines,对于同一个对象一次处理的所以stage运行一个单一的线程,也就是说,除非前一个对象以及完成了所有stage。下一个对象不会开始处理。
  Stages都继承自抽象类(org.apache.commons.pipeline.stage.BaseStage),有许多现有的stage可以直接使用,以满足各种加工要求。你还可以通过扩展BaseStage或其他现有的stage之一创建自定义的stage。
  下图展示了各种类性和数量。
  
DSC0002.png
 
  上图中stage说明:

  • 通常所有进入一个stage的这些对象是同一类型的。避免在stage的代码中重复的写switch语句去区分对象,可以通过使用分支来隔离不同的对象类型。
  • 一个数据对象输入stage并不总是产生一个输出对象。

  •  终端stage不传递(EMIT)任何数据对象。应当避免创建这种类型的阶段,因为他们限制了你建造pipelines时的可能性。 (这很容易做到,将数据传递到下一个阶段只需要一行代码。)
  • 传递(EMIT)对象到一个以上的后续stage的stage被称为分支stage。
  • 如果stages传递的对象与他们解释的对象为同一类型的对象,仅仅是做选择条件过滤,被称为过滤stage。
  • 通常都有读取数据的stage和写数据的stage,他们将数据读入pipeline或者pipeline作为的输出。
  • 创造一个与传递给他们的不同的对象的stage被称为转换stage。


  • 传递的(emitted)对象类型没有必要保持一致。
  • 当碰到分支的时候,进入到不同stage中的对象不必是相同类型的,或有相同的量。注意上图中的“FileReader”stage对于每个到来的文件产生100单元的数据对象,但是通过只有一个边界形状被传递给给了分支。
  其他注意事项(不一定是从上面的图中很明显):

  • 虽然数据被传递道stage是Java对象的方式,但是stage接受到他们往往期待是一个更具体的数据类型,如文件或数据记录。通常对接受的对象进行检查,看它们是否是期望的数据类型的实例,然后在具体工作之前转换为该类性的对象。
  • 您可以为你的pipeline的每个stage设置 stage driver。他们可以限制队列的大小来控制内存和资源使用。对于有界队列来说,上游stage将阻塞并等待,直到下游stage的队列有足够的空间。
StageDriver的作用
  StageDriver是一个Java接口,他控制将数据提交(feeding)给stage和不同stages之间的通信。因此,stage的生命周期和不同stage之间的相互作用是非常依赖于通过这些stage drivers来控制。这些StageDriver的工厂实现StageDriverFactory接口。在pipeline初始化的时候,StageDrivers是由产生特定类型的StageDriver的工厂类提供。每个stage都会有其自己的 StageDriver的实例,并在pipeline内的不同stage可以使用不同类型的StageDrivers,虽然常见的情况是pipeline中所有stage使用相同类型StageDriver(全部共享相同的StageDriverFactory实现)。
  下面是一些通用的StageDriver
DedicatedThreadStageDriver为每一个stage生成一个单独的线程,DedicatedThreadStageDriverFactory()提供
SynchronousStageDriver这个是非线程的StageDriverSynchronousStageDriverFactory()提供
ThreadPoolStageDriver使用一个线程池处理输入的对象,ThreadPoolStageDriverFactory()提供
  本教程将介绍DedicatedThreadStageDriver,因为这是一个很好的通用驱动程序。某些时候你肯希望编写自己的StageDriver实现,这里没有涉及到一个高级的主题。
 
Stage内部解析
  如果你需要编写自己的stage,本节给出了一些为了实现Stage接口你需要了解的方法的概述。
  stage本身定义org.apache.commons.pipeline.Stage接口,它具有下列方法:
Stage Interface Methods
 
init(StageContext) 关联stage和environment。在生命周期中运行一次。
preprocess() 做任何必要的设置。在生命周期中运行一次。
process(Object) 处理数据对象和传递结果到下一stage。跑N次,为每一个传递到其中的对象运行一次。
postprocess() 处理汇总数据等,在生命周期中运行一次。
release() 清理该stage持有的资源。在生命周期中运行一次。
  一个可用的抽象类为org.apache.commons.pipeline.BaseStage,许多其它的stage都是从来衍生出来的。你可以扩展这个类或者在BaseStage之上建造的其他stage。它提供了stage接口的所有方法的无操作实现。您可以按照需求重写这些方法。对于简单的处理,你可能并不需要重写init(StageContext),postprocess(), nor release()方法 。你几乎只需要提供自己的process(Object)方法。从软件设计的角度来看,认为反转控制的,因为你不是在写一个自定义的主程序来调用标准的子程序,而是,写自定义的子程序通过一个标准的主程序调用。
      BaseStage提供了一个emit(Object obj)方法和提交到分支的emit(String branch, Object obj) 方法来传递对象到下一个stage。因此,通常会在process()方法的结束的附近调用EMIT()。终端stage不调用EMIT(),所以没有对象被传递。这也很容易改变,通过添加EMIT()的代码,终端stage变成一个普通的stage。注意,一个stage提交一个数据对象没有后续stage使用它,这样提交的对象只是没有使用而已并没有什么坏处。有时EMIT()方法在postprocess中调用。当处理缓存,或输入和输出对象的汇总,则process()方法通常仅仅存储进入对象的信息,postprocess()完成了工作之后并提交一个新的对象。
  接下一篇:Apache Commons Pipeline 使用学习(二)

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-324863-1-1.html 上篇帖子: Apache commons-pool对象池原理分析 下篇帖子: Apache common pool 1 原理分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表