lx86 发表于 2017-7-4 22:03:51

【转载】Java实现RabbitMQ消息的发送与接收

  原文地址:http://blog.csdn.net/sdyy321/article/details/9241445
  本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
  本文是基于spring-rabbit中间件来实现消息的发送和接收功能
  see http://www.rabbitmq.com/tutorials/tutorial-one-Java.html
  see http://www.springsource.org/spring-amqp



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
<dependencies>
    <!-- for rabbitmq -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-amqp</artifactId>
      <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>1.1.1.RELEASE</version>
    </dependency>
    <!-- Hessian is used by the sample codec to serialize the event payload -->
    <dependency>
      <groupId>com.caucho</groupId>
      <artifactId>hessian</artifactId>
      <version>4.0.7</version>
    </dependency>
</dependencies>
  首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Envelope passed between the application and RabbitMQ.
 *
 * <p>It records the queue and exchange the event was addressed to, together with
 * the already-encoded event payload.</p>
 */
public class EventMessage implements Serializable {

    /** Pinned so that independently compiled senders and receivers agree on the class version. */
    private static final long serialVersionUID = 1L;

    private String queueName;

    private String exchangeName;

    private byte[] eventData;

    /**
     * @param queueName    name of the target queue
     * @param exchangeName name of the target exchange
     * @param eventData    encoded payload; may be {@code null}
     */
    public EventMessage(String queueName, String exchangeName, byte[] eventData) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.eventData = eventData;
    }

    /** Creates an empty message; every field is {@code null}. */
    public EventMessage() {
    }

    /** @return name of the target queue */
    public String getQueueName() {
        return queueName;
    }

    /** @return name of the target exchange */
    public String getExchangeName() {
        return exchangeName;
    }

    /** @return the encoded payload (the internal array, not a copy); may be {@code null} */
    public byte[] getEventData() {
        return eventData;
    }

    @Override
    public String toString() {
        return "EopEventMessage [queueName=" + queueName + ", exchangeName="
                + exchangeName + ", eventData=" + Arrays.toString(eventData)
                + "]";
    }
}
  为了可以发送和接收这个消息持有对象,我们还需要一个用来序列化和反序列化的工厂



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Strategy for turning event content into bytes and back again.
 */
public interface CodecFactory {

    /**
     * Encodes an object into its byte representation.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws IOException if the object cannot be encoded
     */
    byte[] serialize(Object obj) throws IOException;

    /**
     * Decodes bytes previously produced by {@link #serialize(Object)}.
     *
     * @param in the encoded bytes
     * @return the decoded object
     * @throws IOException if the bytes cannot be decoded
     */
    Object deSerialize(byte[] in) throws IOException;

}
  下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class HessionCodecFactory implements CodecFactory {
[*]
[*]    private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
[*]
[*]    @Override
[*]    public byte[] serialize(Object obj) throws IOException {
[*]      ByteArrayOutputStream baos = null;
[*]      HessianOutput output = null;
[*]      try {
[*]            baos = new ByteArrayOutputStream(1024);
[*]            output = new HessianOutput(baos);
[*]            output.startCall();
[*]            output.writeObject(obj);
[*]            output.completeCall();
[*]      } catch (final IOException ex) {
[*]            throw ex;
[*]      } finally {
[*]            if (output != null) {
[*]                try {
[*]                  baos.close();
[*]                } catch (final IOException ex) {
[*]                  this.logger.error("Failed to close stream.", ex);
[*]                }
[*]            }
[*]      }
[*]      return baos != null ? baos.toByteArray() : null;
[*]    }
[*]
[*]    @Override
[*]    public Object deSerialize(byte[] in) throws IOException {
[*]      Object obj = null;
[*]      ByteArrayInputStream bais = null;
[*]      HessianInput input = null;
[*]      try {
[*]            bais = new ByteArrayInputStream(in);
[*]            input = new HessianInput(bais);
[*]            input.startReply();
[*]            obj = input.readObject();
[*]            input.completeReply();
[*]      } catch (final IOException ex) {
[*]            throw ex;
[*]      } catch (final Throwable e) {
[*]            this.logger.error("Failed to decode object.", e);
[*]      } finally {
[*]            if (input != null) {
[*]                try {
[*]                  bais.close();
[*]                } catch (final IOException ex) {
[*]                  this.logger.error("Failed to close stream.", ex);
[*]                }
[*]            }
[*]      }
[*]      return obj;
[*]    }
[*]
[*]}
  接下来就先实现发送功能,新增一个接口专门用来实现发送功能



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public interface EventTemplate {
[*]
[*]    void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
[*]
[*]    void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
[*]}
  SendRefuseException是自定义的发送失败异常类
  下面是它的实现类,主要的任务就是将数据转换为EventMessage



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class DefaultEventTemplate implements EventTemplate {
[*]
[*]    private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
[*]
[*]    private AmqpTemplate eventAmqpTemplate;
[*]
[*]    private CodecFactory defaultCodecFactory;
[*]
[*]//private DefaultEventController eec;
[*]//
[*]//public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
[*]//          CodecFactory defaultCodecFactory, DefaultEventController eec) {
[*]//      this.eventAmqpTemplate = eopAmqpTemplate;
[*]//      this.defaultCodecFactory = defaultCodecFactory;
[*]//      this.eec = eec;
[*]//}
[*]
[*]    public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
[*]      this.eventAmqpTemplate = eopAmqpTemplate;
[*]      this.defaultCodecFactory = defaultCodecFactory;
[*]    }
[*]
[*]    @Override
[*]    public void send(String queueName, String exchangeName, Object eventContent)
[*]            throws SendRefuseException {
[*]      this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
[*]    }
[*]
[*]    @Override
[*]    public void send(String queueName, String exchangeName, Object eventContent,
[*]            CodecFactory codecFactory) throws SendRefuseException {
[*]      if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
[*]            throw new SendRefuseException("queueName exchangeName can not be empty.");
[*]      }
[*]
[*]//      if (!eec.beBinded(exchangeName, queueName))
[*]//          eec.declareBinding(exchangeName, queueName);
[*]
[*]      byte[] eventContentBytes = null;
[*]      if (codecFactory == null) {
[*]            if (eventContent == null) {
[*]                logger.warn("Find eventContent is null,are you sure...");
[*]            } else {
[*]                throw new SendRefuseException(
[*]                        "codecFactory must not be null ,unless eventContent is null");
[*]            }
[*]      } else {
[*]            try {
[*]                eventContentBytes = codecFactory.serialize(eventContent);
[*]            } catch (IOException e) {
[*]                throw new SendRefuseException(e);
[*]            }
[*]      }
[*]
[*]      // 构造成Message
[*]      EventMessage msg = new EventMessage(queueName, exchangeName,
[*]                eventContentBytes);
[*]      try {
[*]            eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
[*]      } catch (AmqpException e) {
[*]            logger.error("send event fail. Event Message : [" + eventContent + "]", e);
[*]            throw new SendRefuseException("send event fail", e);
[*]      }
[*]    }
[*]}
  注释的地方稍后会用到,主要是防止消息发送的目标(exchange、queue及其绑定关系)没有事先声明
  然后我们再实现接受消息
  首先我们需要一个消费接口,所有的消费程序都实现这个接口



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Consumer callback: every message-consuming component implements this interface.
 */
public interface EventProcesser {

    /**
     * Handles one decoded event.
     *
     * @param e the decoded event content
     */
    void process(Object e);
}
  为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]/**
[*] * MessageListenerAdapter的Pojo
[*] * <p>消息处理适配器,主要功能:</p>
[*] * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
[*] * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
[*] *
[*] */
[*]public class MessageAdapterHandler {
[*]
[*]    private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
[*]
[*]    private ConcurrentMap<String, EventProcessorWrap> epwMap;
[*]
[*]    public MessageAdapterHandler() {
[*]      this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
[*]    }
[*]
[*]    public void handleMessage(EventMessage eem) {
[*]      logger.debug("Receive an EventMessage: [" + eem + "]");
[*]      // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
[*]      if (eem == null) {
[*]            logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
[*]            return;
[*]      }
[*]      if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
[*]            logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
[*]            return;
[*]      }
[*]      // 解码,并交给对应的EventHandle执行
[*]      EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
[*]      if (eepw == null) {
[*]            logger.warn("Receive an EopEventMessage, but no processor can do it.");
[*]            return;
[*]      }
[*]      try {
[*]            eepw.process(eem.getEventData());
[*]      } catch (IOException e) {
[*]            logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
[*]            return;
[*]      }
[*]    }
[*]
[*]    protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
[*]      if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
[*]            throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
[*]      }
[*]      EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
[*]      EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
[*]      if (oldProcessorWrap != null) {
[*]            logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
[*]      }
[*]    }
[*]
[*]    protected Set<String> getAllBinding() {
[*]      Set<String> keySet = epwMap.keySet();
[*]      return keySet;
[*]    }
[*]
[*]    protected static class EventProcessorWrap {
[*]
[*]      private CodecFactory codecFactory;
[*]
[*]      private EventProcesser eep;
[*]
[*]      protected EventProcessorWrap(CodecFactory codecFactory,
[*]                EventProcesser eep) {
[*]            this.codecFactory = codecFactory;
[*]            this.eep = eep;
[*]      }
[*]
[*]      public void process(byte[] eventData) throws IOException{
[*]            Object obj = codecFactory.deSerialize(eventData);
[*]            eep.process(obj);
[*]      }
[*]    }
[*]}
  这是正常情况下的消息处理方式。如果rabbitmq消息接收发生异常,也需要能够监控到,因此新增一个类专门用来处理消息接收过程中的异常



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class MessageErrorHandler implements ErrorHandler{
[*]
[*]    private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
[*]
[*]    @Override
[*]    public void handleError(Throwable t) {
[*]      logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
[*]    }
[*]
[*]}
  接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class EventControlConfig {
[*]
[*]    private final static int DEFAULT_PORT = 5672;
[*]
[*]    private final static String DEFAULT_USERNAME = "guest";
[*]
[*]    private final static String DEFAULT_PASSWORD = "guest";
[*]
[*]    private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
[*]
[*]    private static final int PREFETCH_SIZE = 1;
[*]
[*]    private String serverHost ;
[*]
[*]    private int port = DEFAULT_PORT;
[*]
[*]    private String username = DEFAULT_USERNAME;
[*]
[*]    private String password = DEFAULT_PASSWORD;
[*]
[*]    private String virtualHost;
[*]
[*]    /**
[*]   * 和rabbitmq建立连接的超时时间
[*]   */
[*]    private int connectionTimeout = 0;
[*]
[*]    /**
[*]   * 事件消息处理线程数,默认是 CPU核数 * 2
[*]   */
[*]    private int eventMsgProcessNum;
[*]
[*]    /**
[*]   * 每次消费消息的预取值
[*]   */
[*]    private int prefetchSize;
[*]
[*]    public EventControlConfig(String serverHost) {
[*]      this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());
[*]    }
[*]
[*]    public EventControlConfig(String serverHost, int port, String username,
[*]            String password, String virtualHost, int connectionTimeout,
[*]            int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
[*]      this.serverHost = serverHost;
[*]      this.port = port>0?port:DEFAULT_PORT;
[*]      this.username = username;
[*]      this.password = password;
[*]      this.virtualHost = virtualHost;
[*]      this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
[*]      this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
[*]      this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
[*]    }
[*]
[*]    public String getServerHost() {
[*]      return serverHost;
[*]    }
[*]
[*]    public int getPort() {
[*]      return port;
[*]    }
[*]
[*]    public String getUsername() {
[*]      return username;
[*]    }
[*]
[*]    public String getPassword() {
[*]      return password;
[*]    }
[*]
[*]    public String getVirtualHost() {
[*]      return virtualHost;
[*]    }
[*]
[*]    public int getConnectionTimeout() {
[*]      return connectionTimeout;
[*]    }
[*]
[*]    public int getEventMsgProcessNum() {
[*]      return eventMsgProcessNum;
[*]    }
[*]
[*]    public int getPrefetchSize() {
[*]      return prefetchSize;
[*]    }
[*]
[*]}
  
具体的发送、接收程序已经写好了,接下来也是最重要的一步,就是管理和控制与rabbitmq的通信



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public interface EventController {
[*]
[*]    /**
[*]   * 控制器启动方法
[*]   */
[*]    void start();
[*]
[*]    /**
[*]   * 获取发送模版
[*]   */
[*]    EventTemplate getEopEventTemplate();
[*]
[*]    /**
[*]   * 绑定消费程序到对应的exchange和queue
[*]   */
[*]    EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
[*]
[*]    /*in map, the key is queue name, but value is exchange name*/
[*]    EventController add(Map<String,String> bindings, EventProcesser eventProcesser);
[*]
[*]}
  它的实现类如下:



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]/**
[*] * 和rabbitmq通信的控制器,主要负责:
[*] * <p>1、和rabbitmq建立连接</p>
[*] * <p>2、声明exChange和queue以及它们的绑定关系</p>
[*] * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
[*] * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
[*] * @author yangyong
[*] *
[*] */
[*]public class DefaultEventController implements EventController {
[*]
[*]    private CachingConnectionFactory rabbitConnectionFactory;
[*]
[*]    private EventControlConfig config;
[*]
[*]    private RabbitAdmin rabbitAdmin;
[*]
[*]    private CodecFactory defaultCodecFactory = new HessionCodecFactory();
[*]
[*]    private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
[*]
[*]    private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
[*]
[*]    private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定
[*]    //queue cache, key is exchangeName
[*]    private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
[*]    //queue cache, key is queueName
[*]    private Map<String, Queue> queues = new HashMap<String, Queue>();
[*]    //bind relation of queue to exchange cache, value is exchangeName | queueName
[*]    private Set<String> binded = new HashSet<String>();
[*]
[*]    private EventTemplate eventTemplate; // 给App使用的Event发送客户端
[*]
[*]    private AtomicBoolean isStarted = new AtomicBoolean(false);
[*]
[*]    private static DefaultEventController defaultEventController;
[*]
[*]    public synchronized static DefaultEventController getInstance(EventControlConfig config){
[*]      if(defaultEventController==null){
[*]            defaultEventController = new DefaultEventController(config);
[*]      }
[*]      return defaultEventController;
[*]    }
[*]
[*]    private DefaultEventController(EventControlConfig config){
[*]      if (config == null) {
[*]            throw new IllegalArgumentException("Config can not be null.");
[*]      }
[*]      this.config = config;
[*]      initRabbitConnectionFactory();
[*]      // 初始化AmqpAdmin
[*]      rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
[*]      // 初始化RabbitTemplate
[*]      RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
[*]      rabbitTemplate.setMessageConverter(serializerMessageConverter);
[*]      eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
[*]    }
[*]
[*]    /**
[*]   * 初始化rabbitmq连接
[*]   */
[*]    private void initRabbitConnectionFactory() {
[*]      rabbitConnectionFactory = new CachingConnectionFactory();
[*]      rabbitConnectionFactory.setHost(config.getServerHost());
[*]      rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
[*]      rabbitConnectionFactory.setPort(config.getPort());
[*]      rabbitConnectionFactory.setUsername(config.getUsername());
[*]      rabbitConnectionFactory.setPassword(config.getPassword());
[*]      if (!StringUtils.isEmpty(config.getVirtualHost())) {
[*]            rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
[*]      }
[*]    }
[*]
[*]    /**
[*]   * 注销程序
[*]   */
[*]    public synchronized void destroy() throws Exception {
[*]      if (!isStarted.get()) {
[*]            return;
[*]      }
[*]      msgListenerContainer.stop();
[*]      eventTemplate = null;
[*]      rabbitAdmin = null;
[*]      rabbitConnectionFactory.destroy();
[*]    }
[*]
[*]    @Override
[*]    public void start() {
[*]      if (isStarted.get()) {
[*]            return;
[*]      }
[*]      Set<String> mapping = msgAdapterHandler.getAllBinding();
[*]      for (String relation : mapping) {
[*]            String[] relaArr = relation.split("\\|");
[*]            declareBinding(relaArr, relaArr);
[*]      }
[*]      initMsgListenerAdapter();
[*]      isStarted.set(true);
[*]    }
[*]
[*]    /**
[*]   * 初始化消息监听器容器
[*]   */
[*]    private void initMsgListenerAdapter(){
[*]      MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
[*]      msgListenerContainer = new SimpleMessageListenerContainer();
[*]      msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
[*]      msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
[*]      msgListenerContainer.setMessageListener(listener);
[*]      msgListenerContainer.setErrorHandler(new MessageErrorHandler());
[*]      msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值
[*]      msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
[*]      msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数
[*]      msgListenerContainer.setQueues(queues.values().toArray(new Queue));
[*]      msgListenerContainer.start();
[*]    }
[*]
[*]    @Override
[*]    public EventTemplate getEopEventTemplate() {
[*]      return eventTemplate;
[*]    }
[*]
[*]    @Override
[*]    public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
[*]      return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
[*]    }
[*]
[*]    public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
[*]      msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
[*]      if(isStarted.get()){
[*]            initMsgListenerAdapter();
[*]      }
[*]      return this;
[*]    }
[*]
[*]    @Override
[*]    public EventController add(Map<String, String> bindings,
[*]            EventProcesser eventProcesser) {
[*]      return add(bindings, eventProcesser,defaultCodecFactory);
[*]    }
[*]
[*]    public EventController add(Map<String, String> bindings,
[*]            EventProcesser eventProcesser, CodecFactory codecFactory) {
[*]      for(Map.Entry<String, String> item: bindings.entrySet())   
[*]            msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
[*]      return this;
[*]    }
[*]
[*]    /**
[*]   * exchange和queue是否已经绑定
[*]   */
[*]    protected boolean beBinded(String exchangeName, String queueName) {
[*]      return binded.contains(exchangeName+"|"+queueName);
[*]    }
[*]
[*]    /**
[*]   * 声明exchange和queue已经它们的绑定关系
[*]   */
[*]    protected synchronized void declareBinding(String exchangeName, String queueName) {
[*]      String bindRelation = exchangeName+"|"+queueName;
[*]      if (binded.contains(bindRelation)) return;
[*]
[*]      boolean needBinding = false;
[*]      DirectExchange directExchange = exchanges.get(exchangeName);
[*]      if(directExchange == null) {
[*]            directExchange = new DirectExchange(exchangeName, true, false, null);
[*]            exchanges.put(exchangeName, directExchange);
[*]            rabbitAdmin.declareExchange(directExchange);//声明exchange
[*]            needBinding = true;
[*]      }
[*]
[*]      Queue queue = queues.get(queueName);
[*]      if(queue == null) {
[*]            queue = new Queue(queueName, true, false, false);
[*]            queues.put(queueName, queue);
[*]            rabbitAdmin.declareQueue(queue);    //声明queue
[*]            needBinding = true;
[*]      }
[*]
[*]      if(needBinding) {
[*]            Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
[*]            rabbitAdmin.declareBinding(binding);//声明绑定关系
[*]            binded.add(bindRelation);
[*]      }
[*]    }
[*]
[*]}
  搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO



view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Simple persistent-object style bean used by the tests to exercise object transfer,
 * including references to other {@code People} instances.
 */
@SuppressWarnings("serial")
public class People implements Serializable {

    private int id;
    private String name;
    private boolean male;
    private People spouse;
    private List<People> friends;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isMale() {
        return male;
    }

    public void setMale(boolean male) {
        this.male = male;
    }

    public People getSpouse() {
        return spouse;
    }

    public void setSpouse(People spouse) {
        this.spouse = spouse;
    }

    public List<People> getFriends() {
        return friends;
    }

    public void setFriends(List<People> friends) {
        this.friends = friends;
    }

    /**
     * Returns a fixed label. Fields are deliberately not included: spouses may reference
     * each other, so printing them recursively would never terminate.
     */
    @Override
    public String toString() {
        return "People";
    }
}
  建立单元测试
  




https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class RabbitMqTest{
[*]
[*]    private String defaultHost = "127.0.0.1";
[*]
[*]    private String defaultExchange = "EXCHANGE_DIRECT_TEST";
[*]
[*]    private String defaultQueue = "QUEUE_TEST";
[*]
[*]    private DefaultEventController controller;
[*]
[*]    private EventTemplate eventTemplate;
[*]
[*]    @Before
[*]    public void init() throws IOException{
[*]      EventControlConfig config = new EventControlConfig(defaultHost);
[*]      controller = DefaultEventController.getInstance(config);
[*]      eventTemplate = controller.getEopEventTemplate();
[*]      controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
[*]      controller.start();
[*]    }
[*]
[*]    @Test
[*]    public void sendString() throws SendRefuseException{
[*]      eventTemplate.send(defaultQueue, defaultExchange, "hello world");
[*]    }
[*]
[*]    @Test
[*]    public void sendObject() throws SendRefuseException{
[*]      eventTemplate.send(defaultQueue, defaultExchange, mockObj());
[*]    }
[*]
[*]    @Test
[*]    public void sendTemp() throws SendRefuseException, InterruptedException{
[*]      String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
[*]      String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue
[*]      eventTemplate.send(tempQueue, tempExchange, mockObj());
[*]      //发送成功后此时不会接受到消息,还需要绑定对应的消费程序
[*]      controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
[*]    }
[*]
[*]    @After
[*]    public void end() throws InterruptedException{
[*]      Thread.sleep(2000);
[*]    }
[*]
[*]    private People mockObj(){
[*]      People jack = new People();
[*]      jack.setId(1);
[*]      jack.setName("JACK");
[*]      jack.setMale(true);
[*]
[*]      List<People> friends = new ArrayList<>();
[*]      friends.add(jack);
[*]      People hanMeiMei = new People();
[*]      hanMeiMei.setId(1);
[*]      hanMeiMei.setName("韩梅梅");
[*]      hanMeiMei.setMale(false);
[*]      hanMeiMei.setFriends(friends);
[*]
[*]      People liLei = new People();
[*]      liLei.setId(2);
[*]      liLei.setName("李雷");
[*]      liLei.setMale(true);
[*]      liLei.setFriends(friends);
[*]      liLei.setSpouse(hanMeiMei);
[*]      hanMeiMei.setSpouse(liLei);
[*]      return hanMeiMei;
[*]    }
[*]
[*]    class ApiProcessEventProcessor implements EventProcesser{
[*]      @Override
[*]      public void process(Object e) {//消费程序这里只是打印信息
[*]            Assert.assertNotNull(e);
[*]            System.out.println(e);
[*]            if(e instanceof People){
[*]                People people = (People)e;
[*]                System.out.println(people.getSpouse());
[*]                System.out.println(people.getFriends());
[*]            }
[*]      }
[*]    }
[*]}
  
源码地址请点击这里
页: [1]
查看完整版本: 【转载】java实现rabbitmq消息的发送接受