设为首页 收藏本站
查看: 992|回复: 0

[经验分享] 使用 IBM Lotus Expeditor micro broker MQTT 客户机发布消息

[复制链接]

尚未签到

发表于 2015-10-4 05:33:11 | 显示全部楼层 |阅读模式
  消息传递中间件提供了业务集成解决方案上下文中可靠灵活的连接服务。MQ Telemetry Transport (MQTT) 是 IBM 众多的消息传递中间件技术之一,是受 Lotus Expeditor micro broker 支持的一种协议。MQTT 是基于 TCP/IP 的发布/订阅消息传递协议,针对低负荷网络的通信而设计。micro broker 是一种小型的消息代理(只有不足 2 MB 的 Java 代码),主要在小型的应用程序型设备上部署,通常设备都在离企业数据中心很远的位置。在本文中,将创建一个能够连接到 Lotus Expeditor micro broker 的示例发布程序,并用它向一个主题发布消息和验证代理对该消息的接收。学习完本文后,您应该具备了按自己的业务需要创建简单的 MQTT 发布程序所需的知识。
  有关向兼容 MQTT 的代理进行订阅的历史、设计动机、协议特点和操作指导,可以在另一篇文章 “配合使用 WebSphere Business Integration Message Broker 和 MQ Telemetry Transport,第 1 部分: 订阅” 中找到。本文扩展了上述文章中所触及的概念并继续介绍了如何使用 MQTT 发布消息。上述文章涵盖了订阅方面的内容,使用的是 IBM Websphere Message Broker;本文将探究消息发布,使用的是更新的 Lotus Expeditor micro broker。本文所涵盖的方方面面都可以直接应用到 WebSphere Message Broker,因为二者都支持 MQTT 协议。
  本文是用 Java 编写 MQTT 发布程序的指南。本文所详细介绍的这个示例应用程序模仿了机场上有航班抵达时的通知消息发布。为了简化此示例,我们将此场景限制为只包括两个航空公司(Air Freedom 和 Northern Air)和两个机场(Raleigh-Durham/RDU 和 London Heathrow/LHR)。要完成本示例的编译,Lotus Expeditor micro broker 附带的 MQTT 库必须可用。此外,为了用示例 MQTT 客户机连接和发布消息,还要安装和运行 Lotus Expeditor micro broker。
  主题
  在消息的发布/订阅中,消息的目的地称为主题。MQTT 协议具有分层的主题空间,这意味着主题可以按这样一种方式构成,即订阅者和发布者可以使用不同精度指定目的主题。MQTT 对于主题空间有一些强制规定,但设计适合您自己应用程序的逻辑信息空间则由您负责。
  经验表明,详细描述的主题空间要比简单扼要的空间更可取。例如,不建议使用 a、b 或 c 作为主题。使用简短的主题虽然可以节约带宽,但一个设计良好的主题空间则应该更具描述性且允许在订阅应用程序中使用通配符,因此更有实际意义和益处。
  主题是无格式的字符串,可以包含任何单一字节的字符。但字符 /、+ 和 # 具有特定意义,在本文稍后会详细讨论。主题长度最长可达 32,767 个字符。
  图 1 的示例展示了为飞机抵达和起飞消息所设计的逻辑主题空间。此主题空间将贯穿全文。

图 1. 主题空间
DSC0000.gif
  该主题空间的层次为:Flight Times - Airport - Airline - Arrivals or Departures - Flight Number。要将主题层次转变成字符串,可以使用前斜线 (/) 字符分隔主题层次中的每个部分,这又增加了额外的逻辑层精度。例如,要发布有关 Air Freedom 1326 次航班抵达 RDU 的消息, 消息主题应该是 Flight Times/RDU/Air Freedom/Arrivals/Flight 1326。这种逻辑主题空间允许不同的订阅者组可以只订阅他们所感兴趣的消息。而设计糟糕的主题空间则会导致客户的过度订阅,即客户订阅了超出自身所需的主题空间,继而会收到很多无关的消息。这些无关消息需要客户应用程序提供额外的消息过滤以决定消息是否是所感兴趣的。过度订阅会导致带宽浪费和客户机效率的降低。
  接下来,我们将描述三组截然不同的订阅者,他们均对全球范围内航班的抵达和起飞时间感兴趣。

  • 一个人在机场等待飞机抵达
  • 机场显示航班信息
  • 航空公司跟踪准时航班的百分比
  在第一个场景中,假设此人的电话上安装了 MQTT 客户应用程序,他在机场等待接机。在本例中,此人只对特定机场(LHR)的某一个航班(Air Freedom 1024 次航班)感兴趣。为了接收航班抵达的通知,客户应用程序订阅了 Flight Times/LHR/Air Freedom/Arrivals/Flight 1024。
  在第二个订阅场景中,希思罗机场 (LHR) 显示其航班抵达和起飞的信息。在本例中,客户只对 LHR 机场的航班抵达和起飞信息感兴趣。此逻辑主题空间应该允许对 LHR 机场的航班抵达和起飞信息进行单个订阅。所需的订阅主题字符串是 Flight Times/LHR/#。# 字符是个特殊的通配符,可匹配主题空间内合适位置中的所有主题,可以将其视为是订阅主题空间的一个子树。
  最后一个场景是航空公司跟踪其准点航班的数据。我们的示例航空公司是 Northern Air,该公司很关注其全球范围的准点航班的百分比。因此 Northern Air 需要单一订阅全球范围内的航班抵达时间。在本例中,Northern Air 只关心航班抵达,不关心航班起飞。Northern Air 这一特定需求的主题字符串是 Flight Times/+/Northern Air/Arrivals/#。此主题字符串使用了特殊的通配符 + 字符,它让 Northern Air 得以订阅所有机场的航班抵达信息。与 # 通配符不同,它只匹配主题空间层次中的某一层,并不试图匹配层次中的所有稍低的层。
  在创建 MQTT 应用程序时,必须认真将主题空间设计得符合逻辑,并且支持灵活订阅。这种方式确保了发布的大多数客户都无需多次订阅和过度订阅。

回页首
  发布
  MQTT 成功连接之后,就可以发布消息了。应用程序通过 MQTT 客户对象发布消息。发布消息的方法签名是 int publish(String, MqttPayload, byte, Boolean)。这四个参数详细解释如下:

  • String:主题参数的类型是字符串,代理用此字符串来针对订阅者的兴趣(使用之前描述的订阅主题语法指定)匹配发布。
  • MqttPayload:第二个参数是一个 MqttPayload 对象。这个 MqttPayload 对象包含应用程序数据和任何此发布的协议头。此外,还提供了一个偏移量以便应用程序能够决定数据在 MqttPayload 中从何处开始。这实现了对底层字节数组的访问,而无需在数据写到网络后创建额外的副本。提供这种访问是为了在对象构成后和进行传输之前直接在载荷内操纵数据。
  • Byte:第三个参数,是此发布的服务质量(Quality of Service,QoS)。QoS 有三级:0、1 或 2:

    • QoS 为 0 表明发布程序和代理都只尝试消息的一次传递,不会采取任何额外的步骤,亦不会超越 TCP/IP 去确保消息的传递。这一级有时又称作 “发后即忘”,原因是向目的地发送消息后并不验证消息的接收。
    • QoS 为 1 表明消息会被确保传递到代理;但消息可能会传递多次。
    • QoS 为 2 则告知 MQTT 传递消息且只传递一次。
    QoS 级别增加会导致额外的处理器和网络开销。QoS 的设置会影响到消息传递解决方案的整体可扩展性。而且还会增加客户机存储未传递消息的负担。因而,在为每个所发布的消息选择合适的 QoS 级别时要格外注意。总的来说,若非迫切的传递保证所需,尽量使用较低级别的 QoS。随消息提供的 QoS 值指定了在客户机和代理之间发布消息的服务质量。此外,该值还决定了代理用来向其订阅者发布消息的最大 QoS 级别。
    订阅者可以基于每个主题指定消息传递的最大 QoS,所以若消息在 QoS 2 发布,此消息有可能不是在该级别发布给其订阅者。订阅者可以请求为它所收到的消息提供降级 QoS。也许您会觉得消息的端到端 QoS 不由发布程序控制多少有些奇怪,但这样做会提高消息使用者的灵活性。当发布后的消息发送给订阅者时,代理会以订阅过程中由此订阅者指定的最高的 QoS 级别或是以此发布消息的 QoS 级别(如果此级别更低)传递此消息。例如,以 QoS 2 向订阅者发布的消息,若订阅者指定此主题的级别为 QoS 1,那么主题就会以 QoS 1 传递。对于同样级别的标题,若向相同的订阅者发布 QoS 0 级别的消息,那么此消息就会以 QoS 0 级别发送给订阅者。
  • Boolean:第四个参数是 Boolean 标志,该标志表明它是否是保留发布。保留发布存在于代理之内,作为针对给定主题收到的最后一个消息。保留发布让后续订阅者可以在订阅之后立即收到关于某主题的最新消息,即使是他们在此消息发布之后才连接。对于启动之后即刻填充某个显示应用程序并随后用对此信息的后续更改更新它,这种方式十分有效。如果此标志设为 false,只有当前订阅到该主题的订阅者可以收到消息。清单 2 中所示的示例使用的是一个非保留发布。
  这种发布方法会返回一个整型消息 ID。它可以与已注册的 MqttAdvancedCallback 方法结合使用来检查消息是否已由代理接收。
  清单 2 中的代码发布的消息表明 Air Freedom 的 1024 次航班已经抵达伦敦希思罗机场 (LHR)。

清单 2. 发布
/** * Invoke from the command line with a single parameter, the broker URI, * e.g. tcp://mybroker:1883. */ public static void main(String args[]) {      MqttPublisher publisher = null;      try {          publisher = new MqttPublisher(args[0]);          /**         * Connect the newly created publisher to the supplied broker.         */         publisher.connect();          /**         * Publish an "Arrival" message.         */                          publisher.publishMessage(             "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",             (byte) 2, "Arrived");          /**         * Sleep for 1 second waiting to receive notification of         * publication. Real applications should use appropriate         * inter-thread signaling mechanisms such as wait/notify,          * cyclic barriers or latches.         */         Thread.sleep(1000);      }     catch (MqttException exception) {         System.err.println("Exception occurred during either instantiation,          connection, or publication: "             + exception.getMessage());     }     catch (InterruptedException exception) {         System.err.println("Interrupted while waiting for publication: "             + exception.getMessage());     }     finally {         try {             /**             * Close the publisher if instantiated.             */             if (publisher != null) {                 publisher.disconnectClient();             }         }         catch (MqttException exception) {             System.err.println("Exception occurred closing publisher: "                     + exception.getMessage());         }     } }   /** * Construct a new MqttPublisher containing an unconnected MqttClient. *  * @param brokerURL *            Broker URL to (eventually) connect to. * @throws MqttException *             If an underlying MQTT error occurs instantiating the client *             object. */ private MqttPublisher(String brokerURL) throws MqttException {     this.brokerURL = brokerURL;     this.client = createClient(); }   /** * Publish a string as a message in byte form with the given quality of * service to the given topic. */ public void publishMessage(String topic, byte qos, String message)  throws MqttException {     client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,             false); }


回页首
  结束语
  MQTT 是面向发布/订阅消息范型的一个功能强大的传送机制。在需要小型客户机和低网络负荷的情况下,它能提供在其他发布/订阅协议上增强了的实用工具。本文介绍了如何创建功能全面的 MQTT 发布程序。示例客户机连接到代理并向主题发布消息。此示例还展示了 MqttAdvancedCallback 接口,以用来通知消息已发送给代理。

  参考资料
  学习

  • 您可以参阅本文在 developerWorks 全球网站上的 英文原文。
  • 阅读 developerWorks 文章 “配合使用 WebSphere Business Integration Message Broker 和 MQ Telemetry Transport,第 1 部分”。
  • 阅读 developerWorks Lotus 文章 “使用普及消息传递和 IBM Lotus Expeditor 微代理(micro broker)实现端到端的集成”。
  • 阅读 developerWorks Lotus 文章 “在 IBM Lotus Expeditor 中构建脱机应用程序”。
  • 阅读 developerWorks Lotus 文章 “使用 IBM Lotus Expeditor V6.1 在桌面和移动设备上开发并部署富客户机应用程序”。
  • 阅读 developerWorks Lotus 文章 “Migrating Eclipse RCP applications to IBM Lotus Expeditor”。
  • 阅读 developerWorks Lotus 文章 “为 IBM Lotus Expeditor Property Broker 创建协作组件”。
  • 阅读 developerWorks Lotus 文章 “开发 OSGi 服务作为 IBM Lotus Expeditor 中的 Web 服务”。
  • 阅读 developerWorks Lotus 文章 “在 IBM Lotus Expeditor 中构建并部署简单的 Web 服务资源”。
  • 阅读 IBM 红皮书 Building Composite Applications in Lotus Expeditor V6.1
  • 阅读 developerWorks Lotus Expeditor 页面。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-122289-1-1.html 上篇帖子: IBM收购Rational一年总结 下篇帖子: 推荐引擎概述(转自IBM worker)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表