设为首页 收藏本站
查看: 842|回复: 0

[经验分享] JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中.

[复制链接]

尚未签到

发表于 2016-10-23 07:29:09 | 显示全部楼层 |阅读模式
  一、JMS的理解

JMS(Java Message Service)是jcp组织02-03年定义了jsr914规范(http://jcp.org/en/jsr/detail?id=914),它定义了消息的格式和消息传递模式;

消息包括:消息头,消息扩展属性和消息体,其结构看起来与SOAP非常的相似,但一般情况下,SOAP主要关注远程服务调用,而消息则专注于信息的交换;

消息分为:消息生产者,消息服务器和消息消费者。生产者与消费者之间是透明的,生产者在产生消息之后,把消息发送到消息服务器,再由消息服务器发给消费者,因此它们构成了JMS的3点结构;

消息服务器再给消费者时,有2种模式:点到点(ptp: point to point)模式和发布/订阅(publish/subscribe)模式;

ptp:即生产者把消息投递到消息服务器后,这条消息只能由某一个消费者使用;

发布/订阅:顾名思义,就是共享消息了,只要愿意,消费者都可以监听消息;


二、消息服务器(ActiveMQ)

消息服务器在JMS的3点结构中起着重要作用,没有它,生产者的消息不知道如何投递出去,消费者不知道从哪里取得消息,它同样是隔离生产者和消费者的关键部分…………

JMS消息服务器有很多:ActiveMQ、Jboss MQ、Open MQ、RabbitMQ、ZeroMQ等等。

本文介绍的是开源的Java实现的Apache ActiveMQ(http://activemq.apache.org),它的特性在首页就能看到,我就不再介绍了;


1、下载AMQ:http://activemq.apache.org/download.html,最新版本是5.5.0;

2、解压apache-activemq-5.5.0-bin.zip文件到文件系统(比如D:\ActiveMQ-5.5.0);

3、执行bin/activemq.bat脚本即可启动AMQ:

INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
......
INFO | Listening for connections at: tcp://SHI-AP33382A:61616
  当看到上面的日志输出时,表示AMQ已经启动了;

4、默认情况下,AMQ使用conf/activemq.xml作为配置文件,我们可修改它,然后以 bin/activemq.bat xbean:./conf/my.xml启动AMQ;


三、持久化消息(MySQL)

因为接下来我们修改AMQ的默认配置文件,所以先备份conf/activemq.xml文件;

1、建立MySQL数据库:要使用MySQL存储消息,必须告诉AMQ数据源:

/**
* 创建数据库
*/
CREATE DATABASE misc DEFAULT CHARSET=UTF8;
/**
* 创建用户和授权
*/
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'%' IDENTIFIED BY 'misc_root_pwd';
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'localhost' IDENTIFIED BY 'misc_root_pwd';
  通过上面的SQL脚本,我们建立了名为misc的数据库,并且把所有权限都赋予了misc_root的用户;

由于AMQ需要在本数据库中建立数据表,因此用户的权限必须具有建表权限;

2、添加MySQL数据源:默认情况下,AMQ使用KahaDB存储(我对KahaDB不了解),注释到KahaDB的配置方式,改为MySQL的:

<!--
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
   

该配置表示,我们将要使用一个叫做“MySQL-DS”的JDBC数据源;

3、配置MySQL数据源:在</broker>节点后面,增加MySQL数据源配置:

<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8"/>
<property name="username" value="misc_root"/>
<property name="password" value="misc_root_pwd"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
   

其实这就是一个Spring的Bean的配置,注意id与上面的保持一致;


整个AMQ的配置文件内容为:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>classpath:/META-INF/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
<!--
For better performances use VM cursor and small memory limit. For more information, see: http://activemq.apache.org/message-cursors.html Also, if your producer is "hanging", it's probably due to producer flow control. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!--
Use VM cursor for better latency For more information, see: http://activemq.apache.org/message-cursors.html <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS" />
</persistenceAdapter>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" />
</transportConnectors>
</broker>
<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8" />
<property name="username" value="misc_root" />
<property name="password" value="misc_root_pwd" />
<property name="poolPreparedStatements" value="true" />
</bean>
<!--
Enable web consoles, REST and Ajax APIs and demos
It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
   


四、查看MySQL数据表

重新启动AMQ,启动完成之后,我们发现,misc数据库多了3张数据表:

mysql> SHOW tables;
+----------------+
| Tables_in_misc |
+----------------+
| activemq_acks  |
| activemq_lock  |
| activemq_msgs  |
+----------------+
   

数据表activemq_msgs即为持久化消息表;


五、持久化消息

系统启动完毕之后,消息表中内容为空:

mysql> SELECT * FROM activemq_msgs;
Empty set
   


1、发送消息:打开http://127.0.0.1:8161/demo/页面,找到“Send a message”链接,打开页面(http://127.0.0.1:8161/demo/send.html),填写完表格后,点击“Send”按键,即AMQ投递了一个消息;

2、查看消息:发送之后,我们可以看到数据表中多了一条消息:

mysql> SELECT * FROM activemq_msgs;
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
| ID | CONTAINER       | MSGID_PROD                                 | MSGID_SEQ | EXPIRATION | MSG | PRIORITY |
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
|  1 | queue://FOO.BAR | ID:SHI-AP33382A-1486-1309840138441-2:2:1:1 |         1 |          0 | |        5 |
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
   

3、取得消息:找到“Receive a message”链接,打开页面(http://127.0.0.1:8161/demo/message/FOO/BAR?readTimeout=10000&type=queue),发现该页面不是一个标准HTML页面,查看其源代码,其内容是不是就是刚才的消息内容?

4、查看消息:消息消费之后,我们可以看到数据表没有消息了:

mysql> SELECT * FROM activemq_msgs;
Empty set
   

5、我们可以生产多条消息,然后一条一条的消费,发现消息表中的消息一条一条的减少;

6、在发送消息页面,“Destination Type”如果选择“Topic”的话,则消息表中并没有数据,原因在于“Queue”为ptp模式消息,“Topic”为发布/订阅模式消息,当没有订阅者时,消息直接丢掉了。


JMS的内容先介绍到这里,下面我将结合Spring来启动AMQ(即AMQ与应用一同启动,上面介绍的都是单独的启动),通过测试代码来发送和消费消息,敬请期待!


------------------------

欢迎大家批评指正:

http://obullxl.iyunv.com

http://www.cnblogs.com/obullxl

http://hi.baidu.com/obullxl

-----------------------

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-289982-1-1.html 上篇帖子: [MySQL生产环境复制故障修复] Last_IO_Errno: 1045 Last_IO_Error: error connecting to master 下篇帖子: MySQL 实现树形的遍历(关于多级菜单栏以及多级上下部门的查询问题)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表