设为首页 收藏本站
查看: 732|回复: 0

[经验分享] Apache Camel框架之事务控制

[复制链接]

尚未签到

发表于 2017-1-6 11:22:19 | 显示全部楼层 |阅读模式
本文简单介绍一下Apache Camel如何对route进行事务控制,首先介绍整个route只涉及到一个事务参与者的情况,然后再介绍route中涉及到多个事务参与者的情况.Camel是通过和Spring的框架集成进行事务控制的.
1,整个route只有一个事务参与者,"局部事务",这里用JMS的例子,后台的MQ为ActiveMQ,示例图如下:(图片来源于Camel in Action)

route的代码如下:
源码打印?
    public class JMSTransaction extends RouteBuilder {  
        public void configure() throws Exception {  
            TProcessor0 p0 = new TProcessor0();  
            TProcessor1 p1 = new TProcessor1();  
            from("jms:queue:TOOL.DEFAULT").process(p0).process(p1).to("file:d:/temp/outbox");         
        }  
    }  
Spring配置如下:
源码打印?
    <beans xmlns="http://www.springframework.org/schema/beans"  
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
           xmlns:broker="http://activemq.apache.org/schema/core"  
           xsi:schemaLocation="  
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd  
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">  
        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">  
            <package>  
                com.test.camel.transaction.jms  
            </package>  
        </camelContext>      
        <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">  
            <property name="transacted" value="true"/>  
            <property name="transactionManager" ref="txManager"/>              
        </bean>  
        <bean id="txManager" class="org.springframework.jms.connection.JmsTransactionManager">  
            <property name="connectionFactory" ref="jmsConnectionFactory"/>  
        </bean>  
        <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://localhost:61616"/>         
        </bean>  
    </beans>  
route定义的逻辑为从queue里取消息,然后进行一系列的处理(process(p0).process(p1)),<property name="transacted" value="true"/>的意思是通过这个jms进行的消息存取是有事务控制的.上面的route在process(p1)里抛出异常,txManager会进行rollback处理.(在activeMQ里,消息默认会redelivery到客户端6次,如果继续异常,消息会放到deadletter queue里(ActiveMQ.DLQ)),需要在AciveMQ的配置文件activemq.xml里配置如下:(non-persistent的queue的消息出错也转到dead letter queue)
<policyEntry queue=">">
<deadLetterStrategy>
   <sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>
如果<property name="transacted" value="false"/>的话,消息在重发了6次后会丢失.
如果上面例子中的事务参与者是数据库的话,道理与之类似,只是配置的transaction manager不同,如:
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"/>
Camel里使用ActiveMQ JMS的例子可以参照 http://blog.csdn.net/kkdelta/article/details/7237096
2,Camel里的全局事务,一个route里有多个事务参与者,示例图如下:(图片来源于Camel in Action)

route的定义如下:
源码打印?
    public class XaTransaction extends RouteBuilder {  
        public void configure() throws Exception {  
            TProcessor1 p1 = new TProcessor1();  
            from("jms:queue:TOOL.DEFAULT")  
            .transacted()  
            .log("+++ before database +++")  
            .bean(SQLBean.class, "toSql")  
            .to("jdbc:myDataSource")  
            .process(p1)  
            .log("+++ after database +++");  
        }  
    }  
    public class SQLBean {  
        public String toSql(String str) {  
            //create table CamelTEST(msg varchar2(2000));  
            StringBuilder sb = new StringBuilder();  
            sb.append("INSERT INTO CamelTEST VALUES ('camel test')");  
            return sb.toString();  
        }  
    }  
route的逻辑是从queue里取消息,然后操作数据库,然后做后续其他操作(process(p1)),这里的process(p1)如果抛出异常的话,取消息和数据库操作都回滚,
如果整个route都成功完成的话,取消息和数据库操作提交.
这里用到JTA transaction manager是atomikos,相应的jar包可以从这里下载:http://download.csdn.net/detail/kkdelta/4056226
atomikos的主页 http://www.atomikos.com/Main/ProductsOverview
Spring的配置如下:
源码打印?
    <beans xmlns="http://www.springframework.org/schema/beans"  
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
           xmlns:broker="http://activemq.apache.org/schema/core"  
           xsi:schemaLocation="  
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd  
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">     
        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">  
            <package>  
                com.test.camel.transaction.xa  
            </package>  
        </camelContext>  
        <bean id="atomikosTransactionManager"  
              class="com.atomikos.icatch.jta.UserTransactionManager"  
              init-method="init" destroy-method="close" >  
            <!-- when close is called, should we force transactions to terminate or not? -->  
            <property name="forceShutdown" value="false"/>  
        </bean>  
        <!-- this is some atomikos setup you must do -->  
        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" >  
            <property name="transactionTimeout" value="300"/>  
        </bean>  
        <!-- this is some atomikos setup you must do -->  
        <bean id="connectionFactory"  
              class="com.atomikos.jms.AtomikosConnectionFactoryBean" >  
            <property name="uniqueResourceName" value="amq1"/>  
            <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>  
        </bean>  
        <!-- this is the Spring JtaTransactionManager which under the hood uses Atomikos -->  
        <bean id="jtaTransactionManager"  
              class="org.springframework.transaction.jta.JtaTransactionManager" >  
            <property name="transactionManager" ref="atomikosTransactionManager"/>  
            <property name="userTransaction" ref="atomikosUserTransaction"/>  
        </bean>  
        <!-- Is the ConnectionFactory to connect to the JMS broker -->  
        <!-- notice how we must use the XA connection factory -->  
        <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" >  
            <property name="brokerURL" value="tcp://localhost:61616"/>  
        </bean>  
        <!-- define the activemq Camel component so we can integrate with the AMQ broker below -->  
        <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent" >  
            <property name="transacted" value="true"/>  
            <property name="transactionManager" ref="jtaTransactionManager"/>  
        </bean>  
       <bean id="myDataSource"   
          class="com.atomikos.jdbc.AtomikosDataSourceBean"   
          init-method="init" destroy-method="close">   
           <!-- set an arbitrary but unique name for the datasource -->   
          <property name="uniqueResourceName"><value>XADBMS</value></property>   
          <property name="xaDataSourceClassName">   
             <value>oracle.jdbc.xa.client.OracleXADataSource</value>   
          </property>   
          <property name="xaProperties">   
                    <props>   
                            <prop key="user">xxx</prop>   
                            <prop key="password">xxx</prop>   
                            <prop key="URL">jdbc:oracle:thin:@147.151.240.xxx:1521:orcl</prop>  
                    </props>   
          </property>      
          <property name="poolSize" value="1"/>   
       </bean>      
    </beans>
    <project>  
<modelVersion>4.0.0</modelVersion>  
<groupId>my-osgi-bundles</groupId>  
<artifactId>examplebundle</artifactId>  
<packaging>bundle</packaging>    <!-- (1) -->  
<version>1.0</version>  
<name>Example Bundle</name>  
<dependencies>  
<dependency>  
<groupId>org.apache.felix</groupId>  
<artifactId>org.osgi.core</artifactId>  
<version>1.0.0</version>  
</dependency>  
</dependencies>  
<build>  
<plugins>  
<plugin>    <!-- (2) START -->  
<groupId>org.apache.felix</groupId>  
<artifactId>maven-bundle-plugin</artifactId>  
<extensions>true</extensions>  
<configuration>  
<instructions>  
<Export-Package>com.my.company.api</Export-Package>  
<Private-Package>com.my.company.*</Private-Package>  
<Bundle-Activator>com.my.company.Activator</Bundle-Activator>  
</instructions>  
</configuration>  
</plugin>    <!-- (2) END -->  
</plugins>  
</build>  
</project>

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-324666-1-1.html 上篇帖子: Apache虚拟机的配置文件解说 下篇帖子: 转:让Apache Shiro保护你的应用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表