设为首页 收藏本站
查看: 822|回复: 0

[经验分享] ActiveMQ笔记(1):编译、安装、示例代码

[复制链接]

尚未签到

发表于 2017-2-28 11:58:05 | 显示全部楼层 |阅读模式
  一、编译
  虽然ActiveMQ提供了发布版本,但是建议同学们自己下载源代码编译,以后万一有坑,还可以尝试自己改改源码。
1.1 https://github.com/apache/activemq/releases 到这里下载最新的release版源码(当前最新版本为5.13.2),并解压到某个目录(以下用$ACTIVEMQ_HOME代替解压根目录)
1.2 编译



cd $ACTIVEMQ_HOME
mvn clean install -Dmaven.test.skip=true

  编译成功后,在$ACTIVEMQ_HOME/assembly/target下会生成可xxx.bin.tar.gz的可执行文件压缩包
  二、启动
将编译后得到的xxx.bin.tar.gz解压,然后执行



tar -zxvf apache-activemq-5.13.2-bin.tar.gz
cd apache-activemq-5.13.2/bin
./activemq start

  后面的可选参数还有 status、restart、stop、list等,不清楚的地方,直接 --help 查看。
  注:生产环境中,可能会对activemq的jvm内存设置上限,可以直接修改bin/activemq启动脚本,vi bin/activemq 找到下面的位置:



# Note: This function uses globally defined variables
# - $ACTIVEMQ_PIDFILE : the name of the pid file
# - $ACTIVEMQ_OPTS : Additional options
ACTIVEMQ_OPTS="-server -Xms512M -Xmx512M -XX:PermSize=64M -XX:MaxPermSize=128M "
# - $ACTIVEMQ_SUNJMX_START : options for JMX settings
# - $ACTIVEMQ_SSL_OPTS : options for SSL encryption

  设置ACTIVEMQ_OPTS即可,然后重启activemq,建议启动成功后,用jinfo {activemq的pid} 来验证查看一下  
  三、管理界面
启动成功后,可以浏览 http://localhost:8161/admin/
默认用户名、密码:admin/admin
管理界面是用jetty做容器的,如果想修改管理界面的端口,可以编辑../conf/jetty.xml,找到下面这一段:



<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
  用户名/密码是在 ../conf/jetty-realm.properties 里,比如要增加一个管理员jimmy/123456,可参考下面修改:



admin: admin, admin
jimmy: 123456, admin
user: user, user  

  注:管理界面有一个小坑,ActiveMQ 5.13.2与jdk1.8兼容性有点问题,如果使用jdk1.8,管理界面进入Queues标签页时,偶尔会报错,但是并不影响消息正常收发,只是无法从界面上查看队列情况,如果出现该问题,可将jdk版本降至1.7,同时最好清空data目录下的所有数据,再重启activemq即可。
  2016-06-18 注:最新版的5.13.3已经修复了这个bug,建议大家使用最新版本。
四、示例代码
通常消息队列都支持二种模式:基于主题(topic)的发布(Publish)/订阅(Subscribe)模式、点对点(p2p)模式,下面的示例代码为p2p场景。
先给出gradle项目的依赖项



dependencies {
compile "org.springframework:spring-core:4.2.5.RELEASE"
compile "org.springframework:spring-beans:4.2.5.RELEASE"
compile "org.springframework:spring-context:4.2.5.RELEASE"
compile "org.springframework:spring-jms:4.2.3.RELEASE"
compile 'org.apache.activemq:activemq-all:5.13.2'
compile 'org.apache.commons:commons-pool2:2.4.2'
testCompile group: 'junit', name: 'junit', version: '4.12'
}

  4.1 spring配置文件


DSC0000.gif DSC0001.gif


1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
5
6     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
7         <property name="connectionFactory">
8             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
9                 <!--broker服务的地址-->
10                 <property name="brokerURL" value="tcp://localhost:61616"/>
11                 <!--默认值为1000,如果不需要这么大,可以调小-->
12                 <property name="maxThreadPoolSize" value="100"/>
13             </bean>
14         </property>
15     </bean>
16
17     <bean id="dest" class="org.apache.activemq.command.ActiveMQQueue">
18         <!--队列名称-->
19         <property name="physicalName" value="myQueue"/>
20     </bean>
21
22     <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
23         <property name="connectionFactory" ref="jmsFactory"/>
24         <!--默认的队列-->
25         <property name="defaultDestination" ref="dest"/>
26         <!--接收超时时间10秒-->
27         <property name="receiveTimeout" value="10000"/>
28     </bean>
29
30 </beans>
View Code  注:brokerURL的地址是在conf/activemq.xml里定义里,见下面的片段





1 <transportConnectors>
2  <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
3  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
4  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
5  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
6  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
7  <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
8 </transportConnectors>
View Code  另外,连接ActiveMQ默认情况下,没有任何安全机制,也就是说任何人只要知道brokerURL都能连接,这显然不安全,可以在activemq.xml里,找到<broker>节点,紧贴它的地方添加下面这段:



    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<plugins>   
<simpleAuthenticationPlugin>   
<users>   
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>   
</users>   
</simpleAuthenticationPlugin>   
</plugins>
...
</broker>
  那么问题来了,这个${activemq.username}及${activemq.password}的值是在哪里定义的呢?仍然在activemq.xml里找答案,在最开始的地方有一段:



1 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
2     <property name="locations">
3         <value>file:${activemq.conf}/credentials.properties</value>
4     </property>
5 </bean>
  换句话说,conf/credentials.properties这里保存的就是连接activemq的用户名和密码,启用连接的安全机制后,spring的配置文件要做如下调整:





1 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
2     <property name="connectionFactory">
3         <bean class="org.apache.activemq.ActiveMQConnectionFactory">
4             <!--broker服务的地址-->
5             <property name="brokerURL" value="tcp://localhost:61616"/>
6             <!--默认值为1000,如果不需要这么大,可以调小-->
7             <property name="maxThreadPoolSize" value="100"/>
8             <property name="userName" value="system"/>
9             <property name="password" value="manager"/>
10         </bean>
11     </property>
12 </bean>
View Code  4.2 生产者代码
  发送消息的代码有二种写法:
  a)利用spring-jms的JmsTemplate



package com.cnblogs.yjmyzz.activemq;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
* ActiveMQ消息发送示例(利用JMSTemplate)
* Author:菩提树下的杨过 http://yjmyzz.cnblogs.com
*/
public class JmsTemplateProducer {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
System.out.println("准备发送消息...");
int max = 100000;
Long start = System.currentTimeMillis();
for (int i = 0; i < max; i++) {
jmsTemplate.convertAndSend("message test:" + i);
}
Long end = System.currentTimeMillis();
Long elapse = end - start;
int perform = Double.valueOf(max / (elapse / 1000d)).intValue();
System.out.print("发送 " + max + " 条消息,耗时:" + elapse + "毫秒,平均" + perform + "条/秒");
}
}

  b) 利用activeMQ的Producer



package com.cnblogs.yjmyzz.activemq;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import javax.jms.*;
import java.io.IOException;
/**
* ActiveMQ消息发送示例(利用Producer)
* Author:菩提树下的杨过 http://yjmyzz.cnblogs.com
*/
public class ActiveMQProducer {
public static void main(String[] args) throws JMSException, IOException, InterruptedException {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
PooledConnectionFactory connectionFactory = context.getBean(PooledConnectionFactory.class);
ActiveMQQueue destination = context.getBean(ActiveMQQueue.class);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
System.out.println("准备发送消息...");
int max = 100000;
Long start = System.currentTimeMillis();
for (int i = 0; i < max; i++) {
TextMessage msg = session.createTextMessage("message test:" + i);
//msg.setIntProperty("id", i);
producer.send(msg);
}
Long end = System.currentTimeMillis();
Long elapse = end - start;
int perform = Double.valueOf(max / (elapse / 1000d)).intValue();
System.out.print("发送 " + max + " 条消息,耗时:" + elapse + "毫秒,平均" + perform + "条/秒");
//producer.send(session.createTextMessage("SHUTDOWN"));
//Thread.sleep(1000 * 3);
//connection.close();
System.exit(0);
}
}

  这二种方式在性能上差不多,4核8G的mac book pro上,大致每秒可以写入3k+条消息。但是从代码量来讲,明显JmsTemplate的代码量更少,推荐使用。
  4.3 消费者代码
  当然也可以用JmsTemplate接收消息,但是一般得自己去写while(true)循环,而且默认情况下,上下文如果不是同一个连接,JmsTemplate A发出的消息,JmsTemplate B是接收不到的,所以不建议这种方式。最好参考下面的示例,使用JMS的MessageLisenter去监听消息,这也是JMS规范建议的标准做法:



package com.cnblogs.yjmyzz.activemq;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import javax.jms.*;
import java.io.IOException;

/**
* ActiveMQ消息接收示例(使用MessageListener)
* Author:菩提树下的杨过 http://yjmyzz.cnblogs.com
*/
public class MessageListenerConsumer {
public static void main(String[] args) throws JMSException, IOException {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
PooledConnectionFactory connectionFactory = context.getBean(PooledConnectionFactory.class);
ActiveMQQueue destination = context.getBean(ActiveMQQueue.class);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new ActiveMQListener());
System.in.read();
}

static class ActiveMQListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
System.out.println(((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

  4.4 嵌入式Broker
  类似jetty、tombat之类可以内嵌到代码中启动一样,ActiveMQ也可以直接在代码中内嵌启动,这个很方便一些轻量级的使用场景,示例代码如下:



public class EmbbedBroker {
public static void main(String[] args) throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.start();
System.out.println("ActiveMQ 已启动!");
}
}

  关于嵌入式Broker的更多细节,可以参考 http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
  4.5 消息的自动确认与手动确认
  在接收消息时,如果Session使用的是 Session.AUTO_ACKNOWLEDGE,即:



Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  则消息一旦被接受,不论onMessage()里的业务逻辑执行成功与否,消息都将从ActiveMQ的队列里立刻删除。如果希望业务处理成功后,再通知ActiveMQ删除消息,可以改成:



Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

  然后onMessage方法调用message.acknowledge手动确认,参考以下代码:



    static class ActiveMQListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
System.out.println(((TextMessage) message).getText());
message.acknowledge(); //手动确认消息
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-348431-1-1.html 上篇帖子: HTTPS证书生成方法,也适用于APP 下篇帖子: WebSocket 介绍(一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表