|
一、生产者
public static void Write(string ExchangeName,string QueueName)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.56.2",
UserName = "mqt",
Password = "123456",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
try
{
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
//将队列设置为持久化之后,还需要将消息也设为可持久化的,并不会受服务器重启影响
var props = channel.CreateBasicProperties();
props.Persistent = true;
props.DeliveryMode = 2;
for (int i = 0; i < 50; i++)
{
Thread.Sleep(2000);
string message = QueueName + ";内容:" + i.ToString() + ";时间:" + DateTime.Now.ToString("yyyyMMdd hh:mm:ss");
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);
Console.WriteLine("Producter Sent: {0}", message);
}
}
catch (Exception)
{
throw;
}
Console.Read();
}
}
}
多线程调用
static void Main(string[] args)
{
var log1 = WriteAsync("mqt.exchange", "mqt.queue.log");
var sms1 = WriteAsync("mqt.exchange", "mqt.queue.sms");
Task.WaitAll(log1, sms1);
}
static async Task WriteAsync(string ExchangeName, string QueueName)
{
await Task.Run(()=> {
Write(ExchangeName, QueueName);
});
}
二、消费者,开启2个消费者,目前只能做到写2个winform
namespace ServiceSMS
{
class Program
{
static void Main(string[] args)
{
BroadRecevice.Read("mqt.exchange", "mqt.queue.sms");
}
}
}
namespace ServiceLog
{
class Program
{
static void Main(string[] args)
{
BroadRecevice.Read("mqt.exchange", "mqt.queue.log");
}
}
}
public class BroadRecevice
{
public void Test()
{
//BroadRecevice.Read();
}
static void CreateChannel(string ExchangeName, string QueueName, IModel channel)
{
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
ExcuteWriteFile(message, QueueName);
Console.WriteLine("Receiver Received {0}", message);
};
channel.BasicConsume(queue: QueueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
public static void Read(string ExchangeName,string QueueName)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.56.2",
UserName = "mqt",
Password = "123456",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
CreateChannel(ExchangeName, QueueName, channel);
}
}
}
public static void ExcuteWriteFile(string message, string name)
{
string fileName = DateTime.Now.ToString("yyyyMMdd") + ".txt";
string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "log\\");
if (!Directory.Exists(path))
{
Directory.CreateDirectory(path);
}
using (FileStream fs = new FileStream(path + fileName, FileMode.Append))
{
using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode))
{
sw.WriteLine(string.Format("日期:{0};内容:{1};队列名称:{2}", DateTime.Now, message, name));
}
}
}
} |
|