|
1 简介
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
详细介绍可参考:RabbitMQ技术详解
2 配置
2.1 配置文件
与Spring整合,通常使用 applicationContext-springmq.xml 作为RabbitMQ配置文件的命名,web.xml会自动扫描并加载
2.2 配置说明
2.2.1 rabbit:connection-factory
连接工厂
addresses:服务器地址
virtual-host:虚拟主机
username:用户名
password:密码
2.2.2 rabbit:admin
管理
connection-factory:连接工厂ID
2.2.3 rabbit:queue
队列
name:队列名称
durable:是否持久化
auto-delete:是否自动删除
2.2.4 rabbit:topic-exchange
交换机
name:交换机名称
durable:是否持久化
auto-delete:是否自动删除
2.2.5 rabbit:bindings,rabbit:binding
交换机绑定队列
queue:队列ID
pattern:规则
2.2.6 rabbit:template
消息模板
exchange:交换机ID
connection-factory:连接工厂ID
message-converter:消息转换类
2.3 示例
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<rabbit:connection-factory id="rabbitConnectionFactory" addresses="${rabbitmq.address}" virtual-host="${rabbitmq.vhost}" username="${rabbitmq.username}" password="${rabbitmq.password}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!-- OTAMessage的转换器 -->
<bean id="otaMessageConvertor" class="org.springframework.amqp.support.converter.JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="com.zxq.iov.cloud.sp.vp.api.ServiceMessage"/>
</bean>
</property>
</bean>
<!-- 存放发往tbox app消息的queue -->
<rabbit:queue id="AppTBOXServiceQueue.app.v.1.0" name="AppTBOXServiceQueue.app.v.1.0" durable="true" auto-delete="false"/>
<!-- 发往tbox app消息的exchange -->
<rabbit:topic-exchange id="tboxAppServiceExchange" name="tboxAppServiceExchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="AppTBOXServiceQueue.app.v.1.0" pattern="AppTBOXServiceQueue.AV.1.*"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 发往tbox app消息的template-->
<rabbit:template id="tboxAppServiceTemplate" exchange="tboxAppServiceExchange"
connection-factory="rabbitConnectionFactory" message-converter="otaMessageConvertor"/>
</beans> 延时队列
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<rabbit:connection-factory id="rabbitConnectionFactory" addresses="${rabbitmq.addresses}"
virtual-host="${rabbitmq.vhost}" username="${rabbitmq.username}"
password="${rabbitmq.password}"/>
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
<!-- OTAMessage的转换器 -->
<bean id="otaMessageConvertor" class="org.springframework.amqp.support.converter.JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="com.zxq.iov.cloud.gw.tbox.api.msg.OTAMessage"/>
</bean>
</property>
</bean>
<!-- 存放tbox下行消息的queue -->
<rabbit:queue id="GWTBOXQueue.MessageSend" name="GWTBOXQueue.MessageSend" durable="true" auto-declare="true"
auto-delete="false"/>
<!-- tbox下行消息的exchange -->
<rabbit:direct-exchange id="messageSendExchange" name="messageSendExchange" durable="true" auto-declare="true"
auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="GWTBOXQueue.MessageSend" key="GWTBOXQueue.MessageSend"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 发送下行TBOX消息的queue template-->
<rabbit:template id="messageSendTemplate" exchange="messageSendExchange" routing-key="GWTBOXQueue.MessageSend"
connection-factory="rabbitConnectionFactory" message-converter="otaMessageConvertor"/>
<!-- 消费下行tbox消息的listener-->
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto" message-converter="otaMessageConvertor" concurrency="10">
<rabbit:listener queues="GWTBOXQueue.MessageSend" method="onMessage" ref="clientBootstrap" />
</rabbit:listener-container>
<!-- 存放发往tbox app消息的queue -->
<rabbit:queue id="AppTBOXQueue.app.v.1.0" name="AppTBOXQueue.app.v.1.0" durable="true" auto-declare="true"
auto-delete="false"/>
<!-- 发往tbox app消息的exchange -->
<rabbit:topic-exchange id="tboxAppExchange" name="tboxAppExchange" durable="true" auto-declare="true"
auto-delete="false">
<rabbit:bindings>
<!-- route key规则, PV为协议版本, AV为应用数据版本 -->
<rabbit:binding queue="AppTBOXQueue.app.v.1.0" pattern="AppTBOXQueue.PV.2.*.AV.*.*"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 发往tbox app消息的template-->
<rabbit:template id="tboxAppTemplate" exchange="tboxAppExchange" connection-factory="rabbitConnectionFactory"
message-converter="otaMessageConvertor"/>
<!-- 存放tbox下行消息超时检测的queue -->
<rabbit:queue id="GwTimeoutDetectDelayQueue.MessageSend" name="GwTimeoutDetectDelayQueue.MessageSend" durable="true"
auto-declare="true" auto-delete="false">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="GwTimeoutDetectExchange"/>
<entry key="x-dead-letter-routing-key" value="GwTimeoutDetectQueue.MessageSend"/>
<entry key="x-message-ttl">
<value type="java.lang.Long">3000</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue id="GwTimeoutDetectQueue.MessageSend" name="GwTimeoutDetectQueue.MessageSend" durable="true"
auto-declare="true" auto-delete="false"/>
<!-- tbox下行消息超时检测的exchange -->
<rabbit:direct-exchange id="GwTimeoutDetectDelayExchange" name="GwTimeoutDetectDelayExchange" durable="true"
auto-declare="true"
auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="GwTimeoutDetectDelayQueue.MessageSend" key="GwTimeoutDetectDelayQueue.MessageSend"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:direct-exchange id="GwTimeoutDetectExchange" name="GwTimeoutDetectExchange" durable="true"
auto-declare="true"
auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="GwTimeoutDetectQueue.MessageSend" key="GwTimeoutDetectQueue.MessageSend"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 发送下行TBOX消息超时检测的queue tgemplate-->
<rabbit:template id="gwTimeoutDetectTemplate" exchange="GwTimeoutDetectDelayExchange"
routing-key="GwTimeoutDetectDelayQueue.MessageSend"
connection-factory="rabbitConnectionFactory" message-converter="otaMessageConvertor"/>
<!-- 消费下行tbox消息超时检测的listener-->
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto" message-converter="otaMessageConvertor" concurrency="10">
<rabbit:listener queues="GwTimeoutDetectQueue.MessageSend" method="detectTimeout" ref="clientBootstrap" />
</rabbit:listener-container>
</beans>3 使用
// 注入rabbitmq
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
public void demo() {
// 发送消息
rabbitTemplate.convertAndSend("routeKey", messageObject);
}4 监控
队列1监控地址 |
|
|
|
|
|
|