设为首页 收藏本站
查看: 650|回复: 0

[经验分享] spring 集成rabbitmq

[复制链接]

尚未签到

发表于 2017-12-9 17:41:01 | 显示全部楼层 |阅读模式
  mq.properties



mq.host=主机ip
mq.username=admin
mq.password=admin123
mq.port=5672
mq.queue.vip=test-queue
mq.exchange=test-exchange
mq.vhost=test
  spring-rabbitmq.xml


  <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-3.0.xsd
     http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
  <!-- RabbitMQ start -->
<!-- 连接配置 -->
<rabbit:connection-factory id="mqConnectionFactory" virtual-host="${mq.vhost}"
host="${mq.host}" username="${mq.username}" password="${mq.password}"
port="${mq.port}" />
  <rabbit:admin connection-factory="mqConnectionFactory" />
  <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.mq.util.FastJsonMessageConverter"></bean>
  
<!-- 消息队列客户端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.exchange}"
connection-factory="mqConnectionFactory"  message-converter="jsonMessageConverter" />
  <!-- queue 队列声明 -->
<!-- durable 是否持久化 exclusive 仅创建者可以使用的私有队列,断开后自动删除 auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
<rabbit:queue id="my_queue_vip" name="${mq.queue.vip}"
durable="true" auto-delete="false" exclusive="false" />
  <!-- 交换机定义 -->
<!-- 交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。 如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。
direct模式:消息与一个特定的路由器完全匹配,才会转发 topic模式:按规则转发消息,最灵活 -->
<rabbit:direct-exchange name="${mq.exchange}"
durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="my_queue_vip" key="vip_key"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 配置监听 消费者 -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="my_queue" ref="rabbitmqService" />
</rabbit:listener-container>
  
</beans>
  FastJsonMessageConverter



package com.pptv.ucm.mq.util;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.alibaba.fastjson.JSON;
/**
* @类名:FastJsonMessageConverter .
* @描述: *****  .
* @作者: yakunMeng .
* @创建时间: 2017年8月11日 上午10:08:00 .
* @版本号: V1.0 .
*/
public class FastJsonMessageConverter extends AbstractJsonMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
private static ClassMapper classMapper = new DefaultClassMapper();
public FastJsonMessageConverter() {
super();
}
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
byte[] bytes = null;
try {
String jsonString = JSON.toJSONString(object);
bytes = jsonString.getBytes(getDefaultCharset());
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(getDefaultCharset());
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
classMapper.fromClass(object.getClass(), messageProperties);
return new Message(bytes, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.contains("json")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = getDefaultCharset();
}
try {
Class<?> targetClass = getClassMapper().toClass(message.getMessageProperties());
content = convertBytesToObject(message.getBody(), encoding, targetClass);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
} else {
log.warn("Could not convert incoming message with content-type [" + contentType + "]");
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
throws UnsupportedEncodingException {
String contentAsString = new String(body, encoding);
return JSON.parseObject(contentAsString, clazz);
}
}
  RabbitmqService



package com.pptv.ucm.service.impl;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import com.pptv.ucm.common.util.ImportExecl;
import com.pptv.ucm.common.util.StringUtil;
import net.sf.json.JSONObject;
/**
* rabbitmq监听消息,消费者
*
* @author st
*
*/
public class RabbitmqService implements MessageListener {
private static Logger log = LoggerFactory.getLogger(RabbitmqService.class);
public void onMessage(Message message) {
log.info("消息消费者 = " + message);
String content = null;
try {
content = new String(message.getBody(), "utf-8");
if (StringUtil.isNotBlank(content)) {
JSONObject object = JSONObject.fromObject(content.toString());
String batch_code = !object.has("batch_code") ? "" : object.getString("batch_code");
String file_url = !object.has("file_url") ? "" : object.getString("file_url");
log.info("batch_code=" + batch_code + "file_url=" + file_url);
String localExcelPath = ImportExecl.getDiskPath(file_url);
System.out.println(localExcelPath);
ImportExecl poi = new ImportExecl();
List<List<String>> list = poi.read(localExcelPath);
if (list != null) {
for (int i = 0; i < list.size(); i++) {
System.out.print("第" + (i) + "行");
List<String> cellList = list.get(i);
for (int j = 0; j < cellList.size(); j++) {
System.out.print(" " + cellList.get(j));
}
System.out.println();
}
}
new File(localExcelPath).delete();//删除本地文件
            }
} catch (Exception e) {
log.info("报错了e= " + e.getMessage());
e.printStackTrace();
}
log.info(content);
}
}
  MQProducerImpl



package com.pptv.ucm.service.impl;
import javax.annotation.Resource;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.pptv.ucm.service.IMQProducer;
@Service
public class MQProducerImpl implements IMQProducer {
@Value(value = "${mq.queue}")
private String queueId;
@Value(value = "${mq.exchange}")
private String mqExchange;
@Value(value = "${mq.patt}")
private String mqPatt;
@Resource
private AmqpTemplate amqpTemplate;
public void sendQueue(Object object) {
// convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange
        amqpTemplate.convertAndSend(mqExchange, mqPatt, object);
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422475-1-1.html 上篇帖子: 服务器时间与互联网时间不一致,出现的问题 下篇帖子: 算法导论 第7章 高速排序
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表