【转载】java实现rabbitmq消息的发送接受
原文地址:http://blog.csdn.net/sdyy321/article/details/9241445
本文不介绍AMQP和RabbitMQ的相关知识,请自行上网查阅
本文是基于spring-rabbit中间件来实现消息的发送和接收功能
see http://www.rabbitmq.com/tutorials/tutorial-one-Java.html
see http://www.springsource.org/spring-amqp
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
<dependencies>
    <!-- RabbitMQ client and Spring AMQP integration -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <!-- Hessian, used by HessionCodecFactory to encode event payloads -->
    <dependency>
        <groupId>com.caucho</groupId>
        <artifactId>hessian</artifactId>
        <version>4.0.7</version>
    </dependency>
</dependencies>
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Envelope carried between the application and RabbitMQ.
 *
 * <p>It records the queue and exchange the event was addressed to, plus the
 * already-encoded event payload. Instances are handed to the AMQP template's
 * message converter, so the class is {@link Serializable}.</p>
 */
public class EventMessage implements Serializable {

    /**
     * Pinned explicitly so that sender and receiver agree on the stream
     * version even when they were compiled separately.
     */
    private static final long serialVersionUID = 1L;

    private String queueName;

    private String exchangeName;

    private byte[] eventData;

    /**
     * Creates a fully populated message.
     *
     * @param queueName    name of the target queue
     * @param exchangeName name of the target exchange
     * @param eventData    encoded event payload, may be {@code null}
     */
    public EventMessage(String queueName, String exchangeName, byte[] eventData) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.eventData = eventData;
    }

    /** Creates an empty message; all fields are {@code null}. */
    public EventMessage() {
    }

    /** @return name of the target queue */
    public String getQueueName() {
        return queueName;
    }

    /** @return name of the target exchange */
    public String getExchangeName() {
        return exchangeName;
    }

    /** @return the encoded event payload (the internal array, not a copy) */
    public byte[] getEventData() {
        return eventData;
    }

    @Override
    public String toString() {
        return "EopEventMessage [queueName=" + queueName + ", exchangeName="
                + exchangeName + ", eventData=" + Arrays.toString(eventData)
                + "]";
    }
}
为了可以发送和接收这个消息持有对象,我们还需要一个用来序列化和反序列化的工厂
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Strategy for converting event payloads to bytes and back again.
 */
public interface CodecFactory {

    /**
     * Encodes an object into its byte representation.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws IOException if encoding fails
     */
    byte[] serialize(Object obj) throws IOException;

    /**
     * Decodes bytes back into an object.
     *
     * @param in the encoded bytes
     * @return the decoded object
     * @throws IOException if decoding fails
     */
    Object deSerialize(byte[] in) throws IOException;
}
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class HessionCodecFactory implements CodecFactory {
[*]
[*] private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
[*]
[*] @Override
[*] public byte[] serialize(Object obj) throws IOException {
[*] ByteArrayOutputStream baos = null;
[*] HessianOutput output = null;
[*] try {
[*] baos = new ByteArrayOutputStream(1024);
[*] output = new HessianOutput(baos);
[*] output.startCall();
[*] output.writeObject(obj);
[*] output.completeCall();
[*] } catch (final IOException ex) {
[*] throw ex;
[*] } finally {
[*] if (output != null) {
[*] try {
[*] baos.close();
[*] } catch (final IOException ex) {
[*] this.logger.error("Failed to close stream.", ex);
[*] }
[*] }
[*] }
[*] return baos != null ? baos.toByteArray() : null;
[*] }
[*]
[*] @Override
[*] public Object deSerialize(byte[] in) throws IOException {
[*] Object obj = null;
[*] ByteArrayInputStream bais = null;
[*] HessianInput input = null;
[*] try {
[*] bais = new ByteArrayInputStream(in);
[*] input = new HessianInput(bais);
[*] input.startReply();
[*] obj = input.readObject();
[*] input.completeReply();
[*] } catch (final IOException ex) {
[*] throw ex;
[*] } catch (final Throwable e) {
[*] this.logger.error("Failed to decode object.", e);
[*] } finally {
[*] if (input != null) {
[*] try {
[*] bais.close();
[*] } catch (final IOException ex) {
[*] this.logger.error("Failed to close stream.", ex);
[*] }
[*] }
[*] }
[*] return obj;
[*] }
[*]
[*]}
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public interface EventTemplate {
[*]
[*] void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
[*]
[*] void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
[*]}
SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class DefaultEventTemplate implements EventTemplate {
[*]
[*] private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
[*]
[*] private AmqpTemplate eventAmqpTemplate;
[*]
[*] private CodecFactory defaultCodecFactory;
[*]
[*]//private DefaultEventController eec;
[*]//
[*]//public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
[*]// CodecFactory defaultCodecFactory, DefaultEventController eec) {
[*]// this.eventAmqpTemplate = eopAmqpTemplate;
[*]// this.defaultCodecFactory = defaultCodecFactory;
[*]// this.eec = eec;
[*]//}
[*]
[*] public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
[*] this.eventAmqpTemplate = eopAmqpTemplate;
[*] this.defaultCodecFactory = defaultCodecFactory;
[*] }
[*]
[*] @Override
[*] public void send(String queueName, String exchangeName, Object eventContent)
[*] throws SendRefuseException {
[*] this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
[*] }
[*]
[*] @Override
[*] public void send(String queueName, String exchangeName, Object eventContent,
[*] CodecFactory codecFactory) throws SendRefuseException {
[*] if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
[*] throw new SendRefuseException("queueName exchangeName can not be empty.");
[*] }
[*]
[*]// if (!eec.beBinded(exchangeName, queueName))
[*]// eec.declareBinding(exchangeName, queueName);
[*]
[*] byte[] eventContentBytes = null;
[*] if (codecFactory == null) {
[*] if (eventContent == null) {
[*] logger.warn("Find eventContent is null,are you sure...");
[*] } else {
[*] throw new SendRefuseException(
[*] "codecFactory must not be null ,unless eventContent is null");
[*] }
[*] } else {
[*] try {
[*] eventContentBytes = codecFactory.serialize(eventContent);
[*] } catch (IOException e) {
[*] throw new SendRefuseException(e);
[*] }
[*] }
[*]
[*] // 构造成Message
[*] EventMessage msg = new EventMessage(queueName, exchangeName,
[*] eventContentBytes);
[*] try {
[*] eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
[*] } catch (AmqpException e) {
[*] logger.error("send event fail. Event Message : [" + eventContent + "]", e);
[*] throw new SendRefuseException("send event fail", e);
[*] }
[*] }
[*]}
注释掉的代码稍后会用到,主要是防止消息发送的目标exchange和queue没有事先声明
然后我们再实现接收消息
首先我们需要一个消费接口,所有的消费程序都实现这个类
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Consumer callback; every event consumer implements this interface.
 */
public interface EventProcesser {

    /**
     * Handles one decoded event.
     *
     * @param e the decoded event payload
     */
    void process(Object e);
}
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]/**
[*] * MessageListenerAdapter的Pojo
[*] * <p>消息处理适配器,主要功能:</p>
[*] * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
[*] * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
[*] *
[*] */
[*]public class MessageAdapterHandler {
[*]
[*] private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
[*]
[*] private ConcurrentMap<String, EventProcessorWrap> epwMap;
[*]
[*] public MessageAdapterHandler() {
[*] this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
[*] }
[*]
[*] public void handleMessage(EventMessage eem) {
[*] logger.debug("Receive an EventMessage: [" + eem + "]");
[*] // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
[*] if (eem == null) {
[*] logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
[*] return;
[*] }
[*] if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
[*] logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
[*] return;
[*] }
[*] // 解码,并交给对应的EventHandle执行
[*] EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
[*] if (eepw == null) {
[*] logger.warn("Receive an EopEventMessage, but no processor can do it.");
[*] return;
[*] }
[*] try {
[*] eepw.process(eem.getEventData());
[*] } catch (IOException e) {
[*] logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
[*] return;
[*] }
[*] }
[*]
[*] protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
[*] if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
[*] throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
[*] }
[*] EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
[*] EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
[*] if (oldProcessorWrap != null) {
[*] logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
[*] }
[*] }
[*]
[*] protected Set<String> getAllBinding() {
[*] Set<String> keySet = epwMap.keySet();
[*] return keySet;
[*] }
[*]
[*] protected static class EventProcessorWrap {
[*]
[*] private CodecFactory codecFactory;
[*]
[*] private EventProcesser eep;
[*]
[*] protected EventProcessorWrap(CodecFactory codecFactory,
[*] EventProcesser eep) {
[*] this.codecFactory = codecFactory;
[*] this.eep = eep;
[*] }
[*]
[*] public void process(byte[] eventData) throws IOException{
[*] Object obj = codecFactory.deSerialize(eventData);
[*] eep.process(obj);
[*] }
[*] }
[*]}
这是正常情况下的消息处理方式,如果rabbitmq消息接收发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class MessageErrorHandler implements ErrorHandler{
[*]
[*] private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
[*]
[*] @Override
[*] public void handleError(Throwable t) {
[*] logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
[*] }
[*]
[*]}
接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class EventControlConfig {
[*]
[*] private final static int DEFAULT_PORT = 5672;
[*]
[*] private final static String DEFAULT_USERNAME = "guest";
[*]
[*] private final static String DEFAULT_PASSWORD = "guest";
[*]
[*] private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
[*]
[*] private static final int PREFETCH_SIZE = 1;
[*]
[*] private String serverHost ;
[*]
[*] private int port = DEFAULT_PORT;
[*]
[*] private String username = DEFAULT_USERNAME;
[*]
[*] private String password = DEFAULT_PASSWORD;
[*]
[*] private String virtualHost;
[*]
[*] /**
[*] * 和rabbitmq建立连接的超时时间
[*] */
[*] private int connectionTimeout = 0;
[*]
[*] /**
[*] * 事件消息处理线程数,默认是 CPU核数 * 2
[*] */
[*] private int eventMsgProcessNum;
[*]
[*] /**
[*] * 每次消费消息的预取值
[*] */
[*] private int prefetchSize;
[*]
[*] public EventControlConfig(String serverHost) {
[*] this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());
[*] }
[*]
[*] public EventControlConfig(String serverHost, int port, String username,
[*] String password, String virtualHost, int connectionTimeout,
[*] int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
[*] this.serverHost = serverHost;
[*] this.port = port>0?port:DEFAULT_PORT;
[*] this.username = username;
[*] this.password = password;
[*] this.virtualHost = virtualHost;
[*] this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
[*] this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
[*] this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
[*] }
[*]
[*] public String getServerHost() {
[*] return serverHost;
[*] }
[*]
[*] public int getPort() {
[*] return port;
[*] }
[*]
[*] public String getUsername() {
[*] return username;
[*] }
[*]
[*] public String getPassword() {
[*] return password;
[*] }
[*]
[*] public String getVirtualHost() {
[*] return virtualHost;
[*] }
[*]
[*] public int getConnectionTimeout() {
[*] return connectionTimeout;
[*] }
[*]
[*] public int getEventMsgProcessNum() {
[*] return eventMsgProcessNum;
[*] }
[*]
[*] public int getPrefetchSize() {
[*] return prefetchSize;
[*] }
[*]
[*]}
具体的发送、接收程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public interface EventController {
[*]
[*] /**
[*] * 控制器启动方法
[*] */
[*] void start();
[*]
[*] /**
[*] * 获取发送模版
[*] */
[*] EventTemplate getEopEventTemplate();
[*]
[*] /**
[*] * 绑定消费程序到对应的exchange和queue
[*] */
[*] EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
[*]
[*] /*in map, the key is queue name, but value is exchange name*/
[*] EventController add(Map<String,String> bindings, EventProcesser eventProcesser);
[*]
[*]}
它的实现类如下:
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]/**
[*] * 和rabbitmq通信的控制器,主要负责:
[*] * <p>1、和rabbitmq建立连接</p>
[*] * <p>2、声明exChange和queue以及它们的绑定关系</p>
[*] * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
[*] * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
[*] * @author yangyong
[*] *
[*] */
[*]public class DefaultEventController implements EventController {
[*]
[*] private CachingConnectionFactory rabbitConnectionFactory;
[*]
[*] private EventControlConfig config;
[*]
[*] private RabbitAdmin rabbitAdmin;
[*]
[*] private CodecFactory defaultCodecFactory = new HessionCodecFactory();
[*]
[*] private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
[*]
[*] private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
[*]
[*] private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定
[*] //queue cache, key is exchangeName
[*] private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
[*] //queue cache, key is queueName
[*] private Map<String, Queue> queues = new HashMap<String, Queue>();
[*] //bind relation of queue to exchange cache, value is exchangeName | queueName
[*] private Set<String> binded = new HashSet<String>();
[*]
[*] private EventTemplate eventTemplate; // 给App使用的Event发送客户端
[*]
[*] private AtomicBoolean isStarted = new AtomicBoolean(false);
[*]
[*] private static DefaultEventController defaultEventController;
[*]
[*] public synchronized static DefaultEventController getInstance(EventControlConfig config){
[*] if(defaultEventController==null){
[*] defaultEventController = new DefaultEventController(config);
[*] }
[*] return defaultEventController;
[*] }
[*]
[*] private DefaultEventController(EventControlConfig config){
[*] if (config == null) {
[*] throw new IllegalArgumentException("Config can not be null.");
[*] }
[*] this.config = config;
[*] initRabbitConnectionFactory();
[*] // 初始化AmqpAdmin
[*] rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
[*] // 初始化RabbitTemplate
[*] RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
[*] rabbitTemplate.setMessageConverter(serializerMessageConverter);
[*] eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
[*] }
[*]
[*] /**
[*] * 初始化rabbitmq连接
[*] */
[*] private void initRabbitConnectionFactory() {
[*] rabbitConnectionFactory = new CachingConnectionFactory();
[*] rabbitConnectionFactory.setHost(config.getServerHost());
[*] rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
[*] rabbitConnectionFactory.setPort(config.getPort());
[*] rabbitConnectionFactory.setUsername(config.getUsername());
[*] rabbitConnectionFactory.setPassword(config.getPassword());
[*] if (!StringUtils.isEmpty(config.getVirtualHost())) {
[*] rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
[*] }
[*] }
[*]
[*] /**
[*] * 注销程序
[*] */
[*] public synchronized void destroy() throws Exception {
[*] if (!isStarted.get()) {
[*] return;
[*] }
[*] msgListenerContainer.stop();
[*] eventTemplate = null;
[*] rabbitAdmin = null;
[*] rabbitConnectionFactory.destroy();
[*] }
[*]
[*] @Override
[*] public void start() {
[*] if (isStarted.get()) {
[*] return;
[*] }
[*] Set<String> mapping = msgAdapterHandler.getAllBinding();
[*] for (String relation : mapping) {
[*] String[] relaArr = relation.split("\\|");
[*] declareBinding(relaArr, relaArr);
[*] }
[*] initMsgListenerAdapter();
[*] isStarted.set(true);
[*] }
[*]
[*] /**
[*] * 初始化消息监听器容器
[*] */
[*] private void initMsgListenerAdapter(){
[*] MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
[*] msgListenerContainer = new SimpleMessageListenerContainer();
[*] msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
[*] msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
[*] msgListenerContainer.setMessageListener(listener);
[*] msgListenerContainer.setErrorHandler(new MessageErrorHandler());
[*] msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值
[*] msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
[*] msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数
[*] msgListenerContainer.setQueues(queues.values().toArray(new Queue));
[*] msgListenerContainer.start();
[*] }
[*]
[*] @Override
[*] public EventTemplate getEopEventTemplate() {
[*] return eventTemplate;
[*] }
[*]
[*] @Override
[*] public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
[*] return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
[*] }
[*]
[*] public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
[*] msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
[*] if(isStarted.get()){
[*] initMsgListenerAdapter();
[*] }
[*] return this;
[*] }
[*]
[*] @Override
[*] public EventController add(Map<String, String> bindings,
[*] EventProcesser eventProcesser) {
[*] return add(bindings, eventProcesser,defaultCodecFactory);
[*] }
[*]
[*] public EventController add(Map<String, String> bindings,
[*] EventProcesser eventProcesser, CodecFactory codecFactory) {
[*] for(Map.Entry<String, String> item: bindings.entrySet())
[*] msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
[*] return this;
[*] }
[*]
[*] /**
[*] * exchange和queue是否已经绑定
[*] */
[*] protected boolean beBinded(String exchangeName, String queueName) {
[*] return binded.contains(exchangeName+"|"+queueName);
[*] }
[*]
[*] /**
[*] * 声明exchange和queue已经它们的绑定关系
[*] */
[*] protected synchronized void declareBinding(String exchangeName, String queueName) {
[*] String bindRelation = exchangeName+"|"+queueName;
[*] if (binded.contains(bindRelation)) return;
[*]
[*] boolean needBinding = false;
[*] DirectExchange directExchange = exchanges.get(exchangeName);
[*] if(directExchange == null) {
[*] directExchange = new DirectExchange(exchangeName, true, false, null);
[*] exchanges.put(exchangeName, directExchange);
[*] rabbitAdmin.declareExchange(directExchange);//声明exchange
[*] needBinding = true;
[*] }
[*]
[*] Queue queue = queues.get(queueName);
[*] if(queue == null) {
[*] queue = new Queue(queueName, true, false, false);
[*] queues.put(queueName, queue);
[*] rabbitAdmin.declareQueue(queue); //声明queue
[*] needBinding = true;
[*] }
[*]
[*] if(needBinding) {
[*] Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
[*] rabbitAdmin.declareBinding(binding);//声明绑定关系
[*] binded.add(bindRelation);
[*] }
[*] }
[*]
[*]}
搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO
view plain copy
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
/**
 * Simple persistent object used by the tests as an event payload.
 */
public class People implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;
    private String name;
    private boolean male;
    private People spouse;
    private List<People> friends;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isMale() {
        return male;
    }

    public void setMale(boolean male) {
        this.male = male;
    }

    public People getSpouse() {
        return spouse;
    }

    public void setSpouse(People spouse) {
        this.spouse = spouse;
    }

    public List<People> getFriends() {
        return friends;
    }

    public void setFriends(List<People> friends) {
        this.friends = friends;
    }

    /**
     * Describes this person by its scalar fields only. The spouse and the
     * friends are left out on purpose: they can refer back to this object,
     * and printing them would recurse without end.
     */
    @Override
    public String toString() {
        return "People [id=" + id + ", name=" + name + ", male=" + male + "]";
    }
}
建立单元测试
https://code.csdn.net/assets/CODE_ico.pnghttps://code.csdn.net/assets/ico_fork.svg
[*]public class RabbitMqTest{
[*]
[*] private String defaultHost = "127.0.0.1";
[*]
[*] private String defaultExchange = "EXCHANGE_DIRECT_TEST";
[*]
[*] private String defaultQueue = "QUEUE_TEST";
[*]
[*] private DefaultEventController controller;
[*]
[*] private EventTemplate eventTemplate;
[*]
[*] @Before
[*] public void init() throws IOException{
[*] EventControlConfig config = new EventControlConfig(defaultHost);
[*] controller = DefaultEventController.getInstance(config);
[*] eventTemplate = controller.getEopEventTemplate();
[*] controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
[*] controller.start();
[*] }
[*]
[*] @Test
[*] public void sendString() throws SendRefuseException{
[*] eventTemplate.send(defaultQueue, defaultExchange, "hello world");
[*] }
[*]
[*] @Test
[*] public void sendObject() throws SendRefuseException{
[*] eventTemplate.send(defaultQueue, defaultExchange, mockObj());
[*] }
[*]
[*] @Test
[*] public void sendTemp() throws SendRefuseException, InterruptedException{
[*] String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
[*] String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue
[*] eventTemplate.send(tempQueue, tempExchange, mockObj());
[*] //发送成功后此时不会接受到消息,还需要绑定对应的消费程序
[*] controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
[*] }
[*]
[*] @After
[*] public void end() throws InterruptedException{
[*] Thread.sleep(2000);
[*] }
[*]
[*] private People mockObj(){
[*] People jack = new People();
[*] jack.setId(1);
[*] jack.setName("JACK");
[*] jack.setMale(true);
[*]
[*] List<People> friends = new ArrayList<>();
[*] friends.add(jack);
[*] People hanMeiMei = new People();
[*] hanMeiMei.setId(1);
[*] hanMeiMei.setName("韩梅梅");
[*] hanMeiMei.setMale(false);
[*] hanMeiMei.setFriends(friends);
[*]
[*] People liLei = new People();
[*] liLei.setId(2);
[*] liLei.setName("李雷");
[*] liLei.setMale(true);
[*] liLei.setFriends(friends);
[*] liLei.setSpouse(hanMeiMei);
[*] hanMeiMei.setSpouse(liLei);
[*] return hanMeiMei;
[*] }
[*]
[*] class ApiProcessEventProcessor implements EventProcesser{
[*] @Override
[*] public void process(Object e) {//消费程序这里只是打印信息
[*] Assert.assertNotNull(e);
[*] System.out.println(e);
[*] if(e instanceof People){
[*] People people = (People)e;
[*] System.out.println(people.getSpouse());
[*] System.out.println(people.getFriends());
[*] }
[*] }
[*] }
[*]}
源码地址请点击这里
页:
[1]