设为首页 收藏本站
查看: 1101|回复: 0

[经验分享] 【转载】java实现rabbitmq消息的发送接受

[复制链接]

尚未签到

发表于 2017-7-4 22:03:51 | 显示全部楼层 |阅读模式
  原文地址:http://blog.csdn.net/sdyy321/article/details/9241445
  本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
  本文是基于spring-rabbit中间件来实现消息的发送接受功能
  see http://www.rabbitmq.com/tutorials/tutorial-one-Java.html
  see http://www.springsource.org/spring-amqp



[html] view plain copy
  

  • <!-- for rabbitmq -->  
  •     <dependency>  
  •         <groupId>com.rabbitmq</groupId>  
  •         <artifactId>amqp-client</artifactId>  
  •         <version>2.8.2</version>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>org.springframework.amqp</groupId>  
  •         <artifactId>spring-amqp</artifactId>  
  •         <version>1.1.1.RELEASE</version>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>org.springframework.amqp</groupId>  
  •         <artifactId>spring-rabbit</artifactId>  
  •         <version>1.1.1.RELEASE</version>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>com.caucho</groupId>  
  •         <artifactId>hessian</artifactId>  
  •         <version>4.0.7</version>  
  •     </dependency>  
  •   </dependencies>  
  首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象



[java] view plain copy
  

  • public class EventMessage implements Serializable{  

  •     private String queueName;  

  •     private String exchangeName;  

  •     private byte[] eventData;  

  •     public EventMessage(String queueName, String exchangeName, byte[] eventData) {  
  •         this.queueName = queueName;  
  •         this.exchangeName = exchangeName;  
  •         this.eventData = eventData;  
  •     }

  •     public EventMessage() {  
  •     }

  •     public String getQueueName() {  
  •         return queueName;  
  •     }

  •     public String getExchangeName() {  
  •         return exchangeName;  
  •     }

  •     public byte[] getEventData() {  
  •         return eventData;  
  •     }

  •     @Override  
  •     public String toString() {  
  •         return "EopEventMessage [queueName=" + queueName + ", exchangeName="  
  •                 + exchangeName + ", eventData=" + Arrays.toString(eventData)  
  •                 + "]";  
  •     }
  • }
  为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂



[java] view plain copy
  

  • public interface CodecFactory {  

  •     byte[] serialize(Object obj) throws IOException;  

  •     Object deSerialize(byte[] in) throws IOException;  

  • }
  下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式



[java] view plain copy
  

  • public class HessionCodecFactory implements CodecFactory {  

  •     private final Logger logger = Logger.getLogger(HessionCodecFactory.class);  

  •     @Override  
  •     public byte[] serialize(Object obj) throws IOException {  
  •         ByteArrayOutputStream baos = null;  
  •         HessianOutput output = null;  
  •         try {  
  •             baos = new ByteArrayOutputStream(1024);  
  •             output = new HessianOutput(baos);  
  •             output.startCall();
  •             output.writeObject(obj);
  •             output.completeCall();
  •         } catch (final IOException ex) {  
  •             throw ex;  
  •         } finally {  
  •             if (output != null) {  
  •                 try {  
  •                     baos.close();
  •                 } catch (final IOException ex) {  
  •                     this.logger.error("Failed to close stream.", ex);  
  •                 }
  •             }
  •         }
  •         return baos != null ? baos.toByteArray() : null;  
  •     }

  •     @Override  
  •     public Object deSerialize(byte[] in) throws IOException {  
  •         Object obj = null;  
  •         ByteArrayInputStream bais = null;  
  •         HessianInput input = null;  
  •         try {  
  •             bais = new ByteArrayInputStream(in);  
  •             input = new HessianInput(bais);  
  •             input.startReply();
  •             obj = input.readObject();
  •             input.completeReply();
  •         } catch (final IOException ex) {  
  •             throw ex;  
  •         } catch (final Throwable e) {  
  •             this.logger.error("Failed to decode object.", e);  
  •         } finally {  
  •             if (input != null) {  
  •                 try {  
  •                     bais.close();
  •                 } catch (final IOException ex) {  
  •                     this.logger.error("Failed to close stream.", ex);  
  •                 }
  •             }
  •         }
  •         return obj;  
  •     }

  • }
  接下来就先实现发送功能,新增一个接口专门用来实现发送功能



[java] view plain copy
  

  • public interface EventTemplate {  

  •     void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;  

  •     void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;  
  • }
  SendRefuseException是自定义的发送失败异常类
  下面是它的实现类,主要的任务就是将数据转换为EventMessage



[java] view plain copy
  

  • public class DefaultEventTemplate implements EventTemplate {  

  •     private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);  

  •     private AmqpTemplate eventAmqpTemplate;  

  •     private CodecFactory defaultCodecFactory;  

  • //  private DefaultEventController eec;  
  • //  
  • //  public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,  
  • //          CodecFactory defaultCodecFactory, DefaultEventController eec) {  
  • //      this.eventAmqpTemplate = eopAmqpTemplate;  
  • //      this.defaultCodecFactory = defaultCodecFactory;  
  • //      this.eec = eec;  
  • //  }  

  •     public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {  
  •         this.eventAmqpTemplate = eopAmqpTemplate;  
  •         this.defaultCodecFactory = defaultCodecFactory;  
  •     }

  •     @Override  
  •     public void send(String queueName, String exchangeName, Object eventContent)  
  •             throws SendRefuseException {  
  •         this.send(queueName, exchangeName, eventContent, defaultCodecFactory);  
  •     }

  •     @Override  
  •     public void send(String queueName, String exchangeName, Object eventContent,  
  •             CodecFactory codecFactory) throws SendRefuseException {  
  •         if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {  
  •             throw new SendRefuseException("queueName exchangeName can not be empty.");  
  •         }

  • //      if (!eec.beBinded(exchangeName, queueName))  
  • //          eec.declareBinding(exchangeName, queueName);  

  •         byte[] eventContentBytes = null;  
  •         if (codecFactory == null) {  
  •             if (eventContent == null) {  
  •                 logger.warn("Find eventContent is null,are you sure...");  
  •             } else {  
  •                 throw new SendRefuseException(  
  •                         "codecFactory must not be null ,unless eventContent is null");  
  •             }
  •         } else {  
  •             try {  
  •                 eventContentBytes = codecFactory.serialize(eventContent);
  •             } catch (IOException e) {  
  •                 throw new SendRefuseException(e);  
  •             }
  •         }

  •         // 构造成Message  
  •         EventMessage msg = new EventMessage(queueName, exchangeName,  
  •                 eventContentBytes);
  •         try {  
  •             eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
  •         } catch (AmqpException e) {  
  •             logger.error("send event fail. Event Message : [" + eventContent + "]", e);  
  •             throw new SendRefuseException("send event fail", e);  
  •         }
  •     }
  • }
  注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
  然后我们再实现接受消息
  首先我们需要一个消费接口,所有的消费程序都实现这个类



[java] view plain copy
  

  • public interface EventProcesser {  
  •     public void process(Object e);  
  • }
  为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器



[java] view plain copy
  

  • /**
  • * MessageListenerAdapter的Pojo
  • * <p>消息处理适配器,主要功能:</p>
  • * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
  • * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
  • *  
  • */  
  • public class MessageAdapterHandler {  

  •     private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);  

  •     private ConcurrentMap<String, EventProcessorWrap> epwMap;  

  •     public MessageAdapterHandler() {  
  •         this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();  
  •     }

  •     public void handleMessage(EventMessage eem) {  
  •         logger.debug("Receive an EventMessage: [" + eem + "]");  
  •         // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值  
  •         if (eem == null) {  
  •             logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");  
  •             return;  
  •         }
  •         if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {  
  •             logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");  
  •             return;  
  •         }
  •         // 解码,并交给对应的EventHandle执行  
  •         EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());  
  •         if (eepw == null) {  
  •             logger.warn("Receive an EopEventMessage, but no processor can do it.");  
  •             return;  
  •         }
  •         try {  
  •             eepw.process(eem.getEventData());
  •         } catch (IOException e) {  
  •             logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);  
  •             return;  
  •         }
  •     }

  •     protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {  
  •         if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {  
  •             throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");  
  •         }
  •         EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);  
  •         EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);  
  •         if (oldProcessorWrap != null) {  
  •             logger.warn("The processor of this queue and exchange exists, and the new one can't be add");  
  •         }
  •     }

  •     protected Set<String> getAllBinding() {  
  •         Set<String> keySet = epwMap.keySet();
  •         return keySet;  
  •     }

  •     protected static class EventProcessorWrap {  

  •         private CodecFactory codecFactory;  

  •         private EventProcesser eep;  

  •         protected EventProcessorWrap(CodecFactory codecFactory,  
  •                 EventProcesser eep) {
  •             this.codecFactory = codecFactory;  
  •             this.eep = eep;  
  •         }

  •         public void process(byte[] eventData) throws IOException{  
  •             Object obj = codecFactory.deSerialize(eventData);
  •             eep.process(obj);
  •         }
  •     }
  • }
  这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息



[java] view plain copy
  

  • public class MessageErrorHandler implements ErrorHandler{  

  •     private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);  

  •     @Override  
  •     public void handleError(Throwable t) {  
  •         logger.error("RabbitMQ happen a error:" + t.getMessage(), t);  
  •     }

  • }
  接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息



[java] view plain copy
  

  • public class EventControlConfig {  

  •     private final static int DEFAULT_PORT = 5672;  

  •     private final static String DEFAULT_USERNAME = "guest";  

  •     private final static String DEFAULT_PASSWORD = "guest";  

  •     private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;  

  •     private static final int PREFETCH_SIZE = 1;  

  •     private String serverHost ;  

  •     private int port = DEFAULT_PORT;  

  •     private String username = DEFAULT_USERNAME;  

  •     private String password = DEFAULT_PASSWORD;  

  •     private String virtualHost;  

  •     /**
  •      * 和rabbitmq建立连接的超时时间
  •      */  
  •     private int connectionTimeout = 0;  

  •     /**
  •      * 事件消息处理线程数,默认是 CPU核数 * 2
  •      */  
  •     private int eventMsgProcessNum;  

  •     /**
  •      * 每次消费消息的预取值
  •      */  
  •     private int prefetchSize;  

  •     public EventControlConfig(String serverHost) {  
  •         this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());  
  •     }

  •     public EventControlConfig(String serverHost, int port, String username,  
  •             String password, String virtualHost, int connectionTimeout,  
  •             int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {  
  •         this.serverHost = serverHost;  
  •         this.port = port>0?port:DEFAULT_PORT;  
  •         this.username = username;  
  •         this.password = password;  
  •         this.virtualHost = virtualHost;  
  •         this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;  
  •         this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;  
  •         this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;  
  •     }

  •     public String getServerHost() {  
  •         return serverHost;  
  •     }

  •     public int getPort() {  
  •         return port;  
  •     }

  •     public String getUsername() {  
  •         return username;  
  •     }

  •     public String getPassword() {  
  •         return password;  
  •     }

  •     public String getVirtualHost() {  
  •         return virtualHost;  
  •     }

  •     public int getConnectionTimeout() {  
  •         return connectionTimeout;  
  •     }

  •     public int getEventMsgProcessNum() {  
  •         return eventMsgProcessNum;  
  •     }

  •     public int getPrefetchSize() {  
  •         return prefetchSize;  
  •     }

  • }
  
具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信



[java] view plain copy
  

  • public interface EventController {  

  •     /**
  •      * 控制器启动方法
  •      */  
  •     void start();  

  •     /**
  •      * 获取发送模版
  •      */  
  •     EventTemplate getEopEventTemplate();

  •     /**
  •      * 绑定消费程序到对应的exchange和queue
  •      */  
  •     EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);

  •     /*in map, the key is queue name, but value is exchange name*/  
  •     EventController add(Map<String,String> bindings, EventProcesser eventProcesser);

  • }
  它的实现类如下:



[java] view plain copy
  

  • /**
  • * 和rabbitmq通信的控制器,主要负责:
  • * <p>1、和rabbitmq建立连接</p>
  • * <p>2、声明exChange和queue以及它们的绑定关系</p>
  • * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
  • * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
  • * @author yangyong
  • *
  • */  
  • public class DefaultEventController implements EventController {  

  •     private CachingConnectionFactory rabbitConnectionFactory;  

  •     private EventControlConfig config;  

  •     private RabbitAdmin rabbitAdmin;  

  •     private CodecFactory defaultCodecFactory = new HessionCodecFactory();  

  •     private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container  

  •     private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();  

  •     private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定  
  •     //queue cache, key is exchangeName  
  •     private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();  
  •     //queue cache, key is queueName  
  •     private Map<String, Queue> queues = new HashMap<String, Queue>();  
  •     //bind relation of queue to exchange cache, value is exchangeName | queueName  
  •     private Set<String> binded = new HashSet<String>();  

  •     private EventTemplate eventTemplate; // 给App使用的Event发送客户端  

  •     private AtomicBoolean isStarted = new AtomicBoolean(false);  

  •     private static DefaultEventController defaultEventController;  

  •     public synchronized static DefaultEventController getInstance(EventControlConfig config){  
  •         if(defaultEventController==null){  
  •             defaultEventController = new DefaultEventController(config);  
  •         }
  •         return defaultEventController;  
  •     }

  •     private DefaultEventController(EventControlConfig config){  
  •         if (config == null) {  
  •             throw new IllegalArgumentException("Config can not be null.");  
  •         }
  •         this.config = config;  
  •         initRabbitConnectionFactory();
  •         // 初始化AmqpAdmin  
  •         rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);  
  •         // 初始化RabbitTemplate  
  •         RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);  
  •         rabbitTemplate.setMessageConverter(serializerMessageConverter);
  •         eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);  
  •     }

  •     /**
  •      * 初始化rabbitmq连接
  •      */  
  •     private void initRabbitConnectionFactory() {  
  •         rabbitConnectionFactory = new CachingConnectionFactory();  
  •         rabbitConnectionFactory.setHost(config.getServerHost());
  •         rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
  •         rabbitConnectionFactory.setPort(config.getPort());
  •         rabbitConnectionFactory.setUsername(config.getUsername());
  •         rabbitConnectionFactory.setPassword(config.getPassword());
  •         if (!StringUtils.isEmpty(config.getVirtualHost())) {  
  •             rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
  •         }
  •     }

  •     /**
  •      * 注销程序
  •      */  
  •     public synchronized void destroy() throws Exception {  
  •         if (!isStarted.get()) {  
  •             return;  
  •         }
  •         msgListenerContainer.stop();
  •         eventTemplate = null;  
  •         rabbitAdmin = null;  
  •         rabbitConnectionFactory.destroy();
  •     }

  •     @Override  
  •     public void start() {  
  •         if (isStarted.get()) {  
  •             return;  
  •         }
  •         Set<String> mapping = msgAdapterHandler.getAllBinding();
  •         for (String relation : mapping) {  
  •             String[] relaArr = relation.split("\\|");  
  •             declareBinding(relaArr[1], relaArr[0]);  
  •         }
  •         initMsgListenerAdapter();
  •         isStarted.set(true);  
  •     }

  •     /**
  •      * 初始化消息监听器容器
  •      */  
  •     private void initMsgListenerAdapter(){  
  •         MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);  
  •         msgListenerContainer = new SimpleMessageListenerContainer();  
  •         msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
  •         msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
  •         msgListenerContainer.setMessageListener(listener);
  •         msgListenerContainer.setErrorHandler(new MessageErrorHandler());  
  •         msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值  
  •         msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
  •         msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数  
  •         msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));  
  •         msgListenerContainer.start();
  •     }

  •     @Override  
  •     public EventTemplate getEopEventTemplate() {  
  •         return eventTemplate;  
  •     }

  •     @Override  
  •     public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {  
  •         return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);  
  •     }

  •     public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {  
  •         msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
  •         if(isStarted.get()){  
  •             initMsgListenerAdapter();
  •         }
  •         return this;  
  •     }

  •     @Override  
  •     public EventController add(Map<String, String> bindings,  
  •             EventProcesser eventProcesser) {
  •         return add(bindings, eventProcesser,defaultCodecFactory);  
  •     }

  •     public EventController add(Map<String, String> bindings,  
  •             EventProcesser eventProcesser, CodecFactory codecFactory) {
  •         for(Map.Entry<String, String> item: bindings.entrySet())   
  •             msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
  •         return this;  
  •     }

  •     /**
  •      * exchange和queue是否已经绑定
  •      */  
  •     protected boolean beBinded(String exchangeName, String queueName) {  
  •         return binded.contains(exchangeName+"|"+queueName);  
  •     }

  •     /**
  •      * 声明exchange和queue已经它们的绑定关系
  •      */  
  •     protected synchronized void declareBinding(String exchangeName, String queueName) {  
  •         String bindRelation = exchangeName+"|"+queueName;  
  •         if (binded.contains(bindRelation)) return;  

  •         boolean needBinding = false;  
  •         DirectExchange directExchange = exchanges.get(exchangeName);
  •         if(directExchange == null) {  
  •             directExchange = new DirectExchange(exchangeName, true, false, null);  
  •             exchanges.put(exchangeName, directExchange);
  •             rabbitAdmin.declareExchange(directExchange);//声明exchange  
  •             needBinding = true;  
  •         }

  •         Queue queue = queues.get(queueName);
  •         if(queue == null) {  
  •             queue = new Queue(queueName, true, false, false);  
  •             queues.put(queueName, queue);
  •             rabbitAdmin.declareQueue(queue);    //声明queue  
  •             needBinding = true;  
  •         }

  •         if(needBinding) {  
  •             Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange  
  •             rabbitAdmin.declareBinding(binding);//声明绑定关系  
  •             binded.add(bindRelation);
  •         }
  •     }

  • }
  搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO



[java] view plain copy
  

  • @SuppressWarnings("serial")  
  • public class People implements Serializable{  
  •     private int id;  
  •     private String name;  
  •     private boolean male;  
  •     private People spouse;  
  •     private List<People> friends;  
  •     public int getId() {  
  •         return id;  
  •     }
  •     public void setId(int id) {  
  •         this.id = id;  
  •     }
  •     public String getName() {  
  •         return name;  
  •     }
  •     public void setName(String name) {  
  •         this.name = name;  
  •     }
  •     public boolean isMale() {  
  •         return male;  
  •     }
  •     public void setMale(boolean male) {  
  •         this.male = male;  
  •     }
  •     public People getSpouse() {  
  •         return spouse;  
  •     }
  •     public void setSpouse(People spouse) {  
  •         this.spouse = spouse;  
  •     }
  •     public List<People> getFriends() {  
  •         return friends;  
  •     }
  •     public void setFriends(List<People> friends) {  
  •         this.friends = friends;  
  •     }

  •     @Override  
  •     public String toString() {  
  •         // TODO Auto-generated method stub  
  •         return "People[id="+id+",name="+name+",male="+male+"]";  
  •     }
  • }
  建立单元测试
  





  • public class RabbitMqTest{  

  •     private String defaultHost = "127.0.0.1";  

  •     private String defaultExchange = "EXCHANGE_DIRECT_TEST";  

  •     private String defaultQueue = "QUEUE_TEST";  

  •     private DefaultEventController controller;  

  •     private EventTemplate eventTemplate;  

  •     @Before  
  •     public void init() throws IOException{  
  •         EventControlConfig config = new EventControlConfig(defaultHost);  
  •         controller = DefaultEventController.getInstance(config);
  •         eventTemplate = controller.getEopEventTemplate();
  •         controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());  
  •         controller.start();
  •     }

  •     @Test  
  •     public void sendString() throws SendRefuseException{  
  •         eventTemplate.send(defaultQueue, defaultExchange, "hello world");  
  •     }

  •     @Test  
  •     public void sendObject() throws SendRefuseException{  
  •         eventTemplate.send(defaultQueue, defaultExchange, mockObj());
  •     }

  •     @Test  
  •     public void sendTemp() throws SendRefuseException, InterruptedException{  
  •         String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange  
  •         String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue  
  •         eventTemplate.send(tempQueue, tempExchange, mockObj());
  •         //发送成功后此时不会接受到消息,还需要绑定对应的消费程序  
  •         controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());  
  •     }

  •     @After  
  •     public void end() throws InterruptedException{  
  •         Thread.sleep(2000);  
  •     }

  •     private People mockObj(){  
  •         People jack = new People();  
  •         jack.setId(1);  
  •         jack.setName("JACK");  
  •         jack.setMale(true);  

  •         List<People> friends = new ArrayList<>();  
  •         friends.add(jack);
  •         People hanMeiMei = new People();  
  •         hanMeiMei.setId(1);  
  •         hanMeiMei.setName("韩梅梅");  
  •         hanMeiMei.setMale(false);  
  •         hanMeiMei.setFriends(friends);

  •         People liLei = new People();  
  •         liLei.setId(2);  
  •         liLei.setName("李雷");  
  •         liLei.setMale(true);  
  •         liLei.setFriends(friends);
  •         liLei.setSpouse(hanMeiMei);
  •         hanMeiMei.setSpouse(liLei);
  •         return hanMeiMei;  
  •     }

  •     class ApiProcessEventProcessor implements EventProcesser{  
  •         @Override  
  •         public void process(Object e) {//消费程序这里只是打印信息  
  •             Assert.assertNotNull(e);
  •             System.out.println(e);
  •             if(e instanceof People){  
  •                 People people = (People)e;
  •                 System.out.println(people.getSpouse());
  •                 System.out.println(people.getFriends());
  •             }
  •         }
  •     }
  • }
  
源码地址请点击这里

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390806-1-1.html 上篇帖子: 46. Permutations 下篇帖子: c++ 图解快速排序算法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表