shaerzzr 发表于 2017-3-3 08:44:50

activemq-5.13 在windows下部署应用

  一、下载windows压缩包
  解压并双击F:\server\activemq-5.13.2\bin\win64\activemq.bat 启动,32位的系统为F:\server\activemq-5.13.2\bin\win32\activemq.bat;
  二、配置访问权限
  编辑F:\server\activemq-5.13.2\conf\activemq.xml 文件,
  添加plugins节点:



<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
   <plugins>
<simpleAuthenticationPlugin>         
<users>            
<authenticationUser username="system" password="123" groups="users,admins"/>            
<authenticationUser username="user" password="321" groups="users"/>
<authenticationUser username="guest" password="111" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
  三、访问http://localhost:8161/控制台;
  四、示例代码:



package activemq;
import java.io.Serializable;
public class User implements Serializable {
private static final long serialVersionUID = 1L;
private int id;
private String name;
private String pass;
public User(int id, String name, String pass){
this.id = id;
this.name = name;
this.pass = pass;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}

}



package activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsSender {
public void send() {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://127.0.0.1:61616");
try {
Connection connection = connectionFactory.createConnection("user", "321");
connection.start();
Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("Test4.foo");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 100; i++) {
int id = i + 1;
ObjectMessage message = session.createObjectMessage();
message.setObject(new User(id, "李四" + id, "123456"));
/**
//文本消息   
TextMessage textMessage = session.createTextMessage("文本消息");   
//键值对消息   
MapMessage mapMessage = session.createMapMessage();   
mapMessage.setLong("age", new Long(32));   
mapMessage.setDouble("sarray", new Double(5867.15));   
mapMessage.setString("username", "键值对消息");   
//流消息   
StreamMessage streamMessage = session.createStreamMessage();   
streamMessage.writeString("streamMessage流消息");   
streamMessage.writeLong(55);
//字节消息   
String s = "BytesMessage字节消息";   
BytesMessage bytesMessage = session.createBytesMessage();   
bytesMessage.writeBytes(s.getBytes());
**/
producer.send(message);
}
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
            e.printStackTrace();
}
}
public static void main(String[] args){
new JmsSender().send();
}
}


package activemq;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsReceiver {
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://127.0.0.1:61616");
connectionFactory.setTrustAllPackages(true);
try {
Connection connection = connectionFactory.createConnection("guest",
"111");
connection.start();
final Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("Test4.foo");
MessageConsumer consumer = session.createConsumer(destination);
// listener 方式
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
ObjectMessage message = (ObjectMessage) msg;
// TODO something....
try {
User user = (User) message.getObject();
System.out.println("收到消息:" + user.getName());
} catch (JMSException e1) {
// TODO Auto-generated catch block
                        e1.printStackTrace();
}
try {
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
                        e.printStackTrace();
}
}
});
TimeUnit.MINUTES.sleep(1);
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
            e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
            e.printStackTrace();
}
}
public void onMessage(Message m) {
try {
if (m instanceof TextMessage) { // 接收文本消息
TextMessage message = (TextMessage) m;
System.out.println(message.getText());
} else if (m instanceof MapMessage) { // 接收键值对消息
MapMessage message = (MapMessage) m;
System.out.println(message.getLong("age"));
System.out.println(message.getDouble("sarray"));
System.out.println(message.getString("username"));
} else if (m instanceof StreamMessage) { // 接收流消息
StreamMessage message = (StreamMessage) m;
System.out.println(message.readString());
System.out.println(message.readLong());
} else if (m instanceof BytesMessage) { // 接收字节消息
byte[] b = new byte;
int len = -1;
BytesMessage message = (BytesMessage) m;
while ((len = message.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
} else if (m instanceof ObjectMessage) { // 接收对象消息
ObjectMessage message = (ObjectMessage) m;
User user = (User) message.getObject();
} else {
System.out.println(m);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
页: [1]
查看完整版本: activemq-5.13 在windows下部署应用