设为首页 收藏本站
查看: 939|回复: 0

[经验分享] RabbitMQ学习笔记二:Java实现RabbitMQ

[复制链接]

尚未签到

发表于 2017-7-4 07:07:26 | 显示全部楼层 |阅读模式
  本地安装好RabbitMQ Server后,就可以在Java语言中使用RabbitMQ了。
  RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一queue接收message。
  几个关键概念:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

由Exchange,Queue,RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。

  现在,可以上代码了。首先,是在项目中加入需要的jar包,我使用的是maven项目,直接配置maven及可:



<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
  后面还会用到的jar包,配置如下



<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
  先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者, 连接队列的代码都是一样的,这样可以通用一些。



package cn.com.shopec.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public abstract class EndPoint {
protected Channel channel;
protected Connection connection;
protected String endPointName;
public EndPoint(String endpointName) throws IOException
{
this.endPointName = endpointName;
// Create a connection factory
ConnectionFactory factory = new ConnectionFactory();
// 与RabbitMQ Server建立连接  
// 连接到的broker在本机localhost上
factory.setHost("localhost");
// getting a connection
connection = factory.newConnection();
// creating a channel
channel = connection.createChannel();
// declaring a queue for this channel. If queue does not exist,
// it will be created on the server.
// queueDeclare的参数:queue 队列名;durable true为持久化;exclusive 是否排外,true为队列只可以在本次的连接中被访问,
// autoDelete true为connection断开队列自动删除;arguments 用于拓展参数
channel.queueDeclare(endpointName, false, false, false, null);
}
/**
* 关闭channel和connection。并非必须,因为隐含是自动调用的。
* @throws IOException
*/
public void close() throws IOException
{
this.channel.close();
this.connection.close();
}
}
  生产者类的任务是向队列里写一条消息



package cn.com.shopec.rabbitmq;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
public class Producer extends EndPoint {
public Producer(String endPointName) throws IOException
{
super(endPointName);
}
public void sendMessage(Serializable object) throws IOException
{
channel.basicPublish("", endPointName, null, SerializationUtils.serialize(object));
}
}
  消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。



package cn.com.shopec.rabbitmq;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
public class QueueConsumer extends EndPoint implements Runnable, Consumer {
public QueueConsumer(String endPointName) throws IOException
{
super(endPointName);
}
public void run()
{
try
{
// start consuming messages. Auto acknowledge messages.
channel.basicConsume(endPointName, true, this);
}
catch (IOException e)
{
e.printStackTrace();
}
}
/**
* Called when consumer is registered.
*/
public void handleConsumeOk(String consumerTag)
{
System.out.println("Consumer " + consumerTag + " registered");
}
/**
* Called when new message is available.
*/
public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException
{
Map map = (HashMap) SerializationUtils.deserialize(body);
System.out.println("Message Number " + map.get("message number") + " received.");
}
public void handleCancel(String consumerTag)
{
}
public void handleCancelOk(String consumerTag)
{
}
public void handleRecoverOk(String consumerTag)
{
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1)
{
}
}
  测试类中,先运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走。



package cn.com.shopec.rabbitmq;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
public class Main {
public Main() throws Exception
{
// 创建消费者,即消息接收者,并启动线程
QueueConsumer consumer = new QueueConsumer("queue");
Thread consumerThread = new Thread(consumer);
consumerThread.start();
// 创建生产者,即消息发送者
Producer producer = new Producer("queue");
// 循环发送消息
for (int i = 0; i < 20; i++)
{
HashMap message = new HashMap();
message.put("message number", i);
producer.sendMessage(message);
System.out.println("Message Number " + i + " sent.");
}
}
/**
* @param args
* @throws SQLException
* @throws IOException
*/
public static void main(String[] args) throws Exception
{
new Main();
}
}
  运行结果:
  Consumer amq.ctag-8TFduKUwrE1I8iT2L5DaZg registered
Message Number 0 sent.
Message Number 1 sent.
Message Number 2 sent.
Message Number 3 sent.
Message Number 4 sent.
Message Number 5 sent.
Message Number 6 sent.
Message Number 7 sent.
Message Number 8 sent.
Message Number 9 sent.
Message Number 10 sent.
Message Number 11 sent.
Message Number 12 sent.
Message Number 13 sent.
Message Number 14 sent.
Message Number 15 sent.
Message Number 16 sent.
Message Number 17 sent.
Message Number 18 sent.
Message Number 19 sent.
Message Number 0 received.
Message Number 1 received.
Message Number 2 received.
Message Number 3 received.
Message Number 4 received.
Message Number 5 received.
Message Number 6 received.
Message Number 7 received.
Message Number 8 received.
Message Number 9 received.
Message Number 10 received.
Message Number 11 received.
Message Number 12 received.
Message Number 13 received.
Message Number 14 received.
Message Number 15 received.
Message Number 16 received.
Message Number 17 received.
Message Number 18 received.
Message Number 19 received.

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390696-1-1.html 上篇帖子: win7 rabbitMQ 安装配置命令(需配置环境变量) 下篇帖子: .NET开源MSSQL、Redis监控产品Opserver之安全配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表