快速搭建MQTT服务器(MQTTnet和Apache Apollo)
前言MQTT协议是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分,http://mqtt.org/。
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also>
通过https://github.com/mqtt/mqtt.github.io/wiki/servers 找到官方推荐的服务端软件,比如:Apache Apollo,
通过https://github.com/mqtt/mqtt.github.io/wiki/libraries可以找到推荐的客户端类库,比如:Eclipse Paho Java
MQTTnet
MQTTnet 是MQTT协议的.NET 开源类库。
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/.
通过Nuget搜索MQTT找到了MQTTnet,它不是下载量最多的,也不在官方推荐列表中,主要是因为同时支持客户端和服务端,所以开始下载试用,结果证明有坑,源码在vs2015中不能打开,客户端示例接收不到消息。
开源地址:https://github.com/chkr1011/MQTTnet
首先把官方的控制台程序改成winform的,界面如下:
public partial>
{
private MqttServer mqttServer = null;
private MqttClient mqttClient = null;
public Form1()
{
InitializeComponent();
}
private void button_启动服务端_Click(object sender, EventArgs e)
{
MqttTrace.TraceMessagePublished
+= MqttTrace_TraceMessagePublished;
if (this.mqttServer == null)
{
try
{
var options = new MqttServerOptions
{
ConnectionValidator
= p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}
return MqttConnectReturnCode.ConnectionAccepted;
}
};
mqttServer
= new MqttServerFactory().CreateMqttServer(options);
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
return;
}
}
mqttServer.Start();
this.txt_服务器.AppendText( $">> 启动成功..." + Environment.NewLine);
}
private void MqttTrace_TraceMessagePublished(object sender, MqttTraceMessagePublishedEventArgs e)
{
this.Invoke(new Action(() =>
{
this.txt_服务器.AppendText($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}" + Environment.NewLine);
if (e.Exception != null)
{
this.txt_服务器.AppendText( e.Exception + Environment.NewLine);
}
}));
}
private void button_停止服务端_Click(object sender, EventArgs e)
{
if (mqttServer != null)
{
mqttServer.Stop();
}
this.txt_服务器.AppendText( $">> 停止成功" + Environment.NewLine);
}
private async void button_启动客户端_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
var options = new MqttClientOptions
{
Server
= "192.168.2.54",
ClientId
= "zbl",
CleanSession
= true
};
this.mqttClient = new MqttClientFactory().CreateMqttClient(options);
this.mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
this.mqttClient.Connected += MqttClient_Connected;
this.mqttClient.Disconnected += MqttClient_Disconnected;
}
try
{
await this.mqttClient.ConnectAsync();
}
catch(Exception ex)
{
this.txt_客户端.AppendText( $"### CONNECTING FAILED ###" + Environment.NewLine);
}
}
private void MqttClient_Connected(object sender, EventArgs e)
{
this.Invoke(new Action( async () =>
{
this.txt_客户端.AppendText( $"### CONNECTED WITH SERVER ###" + Environment.NewLine);
await this.mqttClient.SubscribeAsync(new List<TopicFilter>{
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
});
this.txt_客户端.AppendText($"### SUBSCRIBED###" + Environment.NewLine);
}));
}
private void MqttClient_Disconnected(object sender, EventArgs e)
{
this.Invoke(new Action(() =>
{
this.txt_客户端.AppendText( $"### DISCONNECTED FROM SERVER ###" + Environment.NewLine);
}));
}
private void MqttClient_ApplicationMessageReceived(object sender, MQTTnet.Core.MqttApplicationMessageReceivedEventArgs e)
{
this.Invoke(new Action(() =>
{
this.txt_客户端.AppendText( $">> Topic:{e.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} QoS:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain}" + Environment.NewLine);
}));
}
private async void button_停止客户端_Click(object sender, EventArgs e)
{
if (this.mqttClient != null)
{
await this.mqttClient.DisconnectAsync();
}
this.txt_客户端.AppendText( $">> 停止成功" + Environment.NewLine);
}
private async void button_发送_Click(object sender, EventArgs e)
{
//var options = new MqttClientOptions
//{
// Server = "localhost"
//};
//var client = new MqttClientFactory().CreateMqttClient(options);
//await client.ConnectAsync();
var applicationMessage = new MqttApplicationMessage(this.txt_topic.Text,
Encoding.UTF8.GetBytes(this.txt_message.Text), MqttQualityOfServiceLevel.AtMostOnce, false);
await this.mqttClient.PublishAsync(applicationMessage);
}
private void button_清空服务端_Click(object sender, EventArgs e)
{
this.txt_服务器.Text = "";
}
}
Form1.cs 需要注意的是按照作者说明是
while (true)
{
Console.ReadLine();
var applicationMessage = new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes("Hello World"),
MqttQualityOfServiceLevel.AtLeastOnce,
false
);
await client.PublishAsync(applicationMessage);
}
客户端死活都收不到消息,改成 MqttQualityOfServiceLevel.AtMostOnce 就可以了,找问题时尝试下载源码调试因为vs2015打不开项目也折腾了一会。这个问题提交了issues,期待作者回复。
Apache Apollo
1.下载Apollo服务器,我这里用的是Binaries for Windows。下载后解压到一个文件夹,注意路径不要包含中文,安装手册
2.创建Broker Instance,命令行cd到bin目录,执行/bin/apollo create mybroker,执行后就会在bin目录下创建mybroker文件夹。
3.运行Broker Instance,命令行cd到mybroker/bin目录,执行mybroker/bin/apollo-broker.cmd run
4.Web Administrator,地址 http://127.0.0.1:61680/ or https://127.0.0.1:61681/,默认账号 admin,密码 password
参考
[*] MQTT协议的简单介绍和服务器的安装,
[*]编写和MQTT服务器通信的Android客户端程序,
[*] MQTT】在Windows下搭建MQTT服务器
搭建了MQTT服务端之后需要在Esp8266模块和手机App中分别实现客户端功能,稍后待续。。。。
页:
[1]