设为首页 收藏本站
查看: 940|回复: 0

[经验分享] rabbitMQ教程(三) spring整合rabbitMQ代码实例

[复制链接]

尚未签到

发表于 2017-12-9 16:37:16 | 显示全部楼层 |阅读模式
一、开启rabbitMQ服务,导入MQ jar包和gson jar包(MQ默认的是jackson,但是效率不如Gson,所以我们用gson)

DSC0000.png DSC0001.png

二、发送端配置,在spring配置文件中配置



<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<!-- 连接服务配置 如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码  guest默认不允许远程登录-->  
<rabbit:connection-factory id="connectionFactory"  
host="localhost" username="guest" password="guest" port="5672"  
virtual-host="/" channel-cache-size="5" />  
<!-- 配置爱admin,自动根据配置文件生成交换器和队列,无需手动配置 -->
<rabbit:admin connection-factory="connectionFactory" />  
<!-- queue 队列声明 -->  
<rabbit:queue  durable="true"  
auto-delete="false" exclusive="false" name="spring.queue.tag" />  

<!-- exchange queue binging key 绑定 -->  
<rabbit:direct-exchange name="spring.queue.exchange"  
durable="true" auto-delete="false">  
<rabbit:bindings>  
<rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />  
</rabbit:bindings>  
</rabbit:direct-exchange>  
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->  
<bean id="jsonMessageConverter"    class="sendMQ.Gson2JsonMessageConverter" />  
<!-- spring template声明 -->  
<rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"  routing-key="spring.queue.tag.key"  
connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
  发送端代码:GSON配置



package sendMQ;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.google.gson.Gson;
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter{
private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);  
private static  ClassMapper classMapper =  new DefaultClassMapper();  
private static Gson gson = new Gson();  
public Gson2JsonMessageConverter() {  
super();  
}  
@Override  
protected Message createMessage(Object object,  
MessageProperties messageProperties) {  
byte[] bytes = null;  
try {  
String jsonString = gson.toJson(object);  
bytes = jsonString.getBytes(getDefaultCharset());  
}  
catch (IOException e) {  
throw new MessageConversionException(  
"Failed to convert Message content", e);  
}  
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);  
messageProperties.setContentEncoding(getDefaultCharset());  
if (bytes != null) {  
messageProperties.setContentLength(bytes.length);  
}  
classMapper.fromClass(object.getClass(),messageProperties);  
return new Message(bytes, messageProperties);  
}  
@Override  
public Object fromMessage(Message message)  
throws MessageConversionException {  
Object content = null;  
MessageProperties properties = message.getMessageProperties();  
if (properties != null) {  
String contentType = properties.getContentType();  
if (contentType != null && contentType.contains("json")) {  
String encoding = properties.getContentEncoding();  
if (encoding == null) {  
encoding = getDefaultCharset();  
}  
try {  
Class<?> targetClass = getClassMapper().toClass(  
message.getMessageProperties());  
content = convertBytesToObject(message.getBody(),  
encoding, targetClass);  
}  
catch (IOException e) {  
throw new MessageConversionException(  
"Failed to convert Message content", e);  
}  
}  
else {  
log.warn("Could not convert incoming message with content-type ["  
+ contentType + "]");  
}  
}  
if (content == null) {  
content = message.getBody();  
}  
return content;  
}  
private Object convertBytesToObject(byte[] body, String encoding,  
Class<?> clazz) throws UnsupportedEncodingException {  
String contentAsString = new String(body, encoding);  
return gson.fromJson(contentAsString, clazz);  
}  
}
  发送类接口:



public interface MQProducer {
/**
* 发送消息到指定队列
* @param queueKey
* @param object
*/
public void sendDataToQueue(String queueKey, Object object);
}
  实现类:test是测试用的。



package sendMQ;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:/spring-common.xml"})

@Component
public class MQProducerImpl implements MQProducer {
@Autowired
private  AmqpTemplate amqpTemplate;
@Override
public void sendDataToQueue(String queueKey, Object object) {
System.out.println("--"+amqpTemplate);
try {
amqpTemplate.convertAndSend(object);
System.out.println("------------消息发送成功");
} catch (Exception e) {
System.out.println(e);
}
}
@Test
public  void test() {  
Map<String,Object> msg = new HashMap<>();
msg.put("data","hello,456");
while(true){
amqpTemplate.convertAndSend(msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
                e.printStackTrace();
}
}
}  
}
  接收端配置:



  <!-- 连接服务配置  -->  
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"  
password="guest" port="5672" virtual-host="/"  channel-cache-size="5" />  
<rabbit:admin connection-factory="connectionFactory"/>  
<!-- queue 队列声明-->  
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>  

<!-- exchange queue binging key 绑定 -->  
<rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">  
<rabbit:bindings>  
<rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>  
</rabbit:bindings>  
</rabbit:direct-exchange>  
<bean id="receiveMessageListener"  
class="receiveMQ.QueueListenter" />  
<!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->  
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >  
<rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />  
</rabbit:listener-container>  
  接收端代码:



package receiveMQ;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueListenter implements MessageListener{
@Override
public void onMessage(Message msg) {
try {
System.out.print("-------------------"+new String(msg.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
// TODO 自动生成的 catch 块
            e.printStackTrace();
}
}
}
  接收端测试启动:



package receiveMQ;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerMain {
public static void main(String[] args) {  
new ClassPathXmlApplicationContext("spring-common.xml");   
}  
}
  上面代码均有注释,应该不难看懂,复制即可使用,实现了MQ的简单功能。
  说明:可以配置多个接收端,spring默认的是负载均衡机制,每个接收端接收一条的来,这些扩展功能待后面有时间再讲解

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422460-1-1.html 上篇帖子: spring boot实战(第十二篇)整合RabbitMQ 下篇帖子: rabbitmq shovel插件
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表