设为首页 收藏本站
查看: 1495|回复: 0

[经验分享] flume与Mosquitto的集成

[复制链接]

尚未签到

发表于 2015-9-17 06:47:42 | 显示全部楼层 |阅读模式
  文章来自:http://www.cnblogs.com/hark0623/p/4173714.html   转发请注明
  
  因业务需求,需要flume收集MQTT(Mosquitto)的数据。  方法就是flume自定义source,source中来订阅(subscribe)MQTT
  
  flume source的java代码如下:



package com.yhx.sensor.flume.source;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class MQTTSource extends AbstractSource implements EventDrivenSource,
Configurable {
/**
* The initialization method for the Source. The context contains all the
* Flume configuration info, and can be used to retrieve any configuration
* values necessary to set up the Source.
*/
@Override
public void configure(Context arg0) {
// TODO Auto-generated method stub

}
SimpleMqttClient client = null;
/**
* Start any dependent systems and begin processing events.
*/
@Override
public void start() {
// TODO Auto-generated method stub
// super.start();
client = new SimpleMqttClient();
client.runClient();
}
/**
* Stop processing events and shut any dependent systems down.
*/
@Override
public void stop() {
// TODO Auto-generated method stub
// super.stop();
if (client != null) {
client.closeConn();
}
}
// public static void main(String[] args) {
// SimpleMqttClient smc = new SimpleMqttClient();
// smc.runClient();
// }
public class SimpleMqttClient implements MqttCallback {
MqttClient myClient;
MqttConnectOptions connOpt;
String BROKER_URL = "tcp://192.168.116.128:1883";
String M2MIO_DOMAIN = "192.168.116.128";
String M2MIO_STUFF = "yhx";
String M2MIO_THING = "yhx_flume";
// String M2MIO_USERNAME = "<m2m.io username>";
// String M2MIO_PASSWORD_MD5 =
// "<m2m.io password (MD5 sum of password)>";

Boolean subscriber = true;
Boolean publisher = false;
/**
*
* connectionLost This callback is invoked upon losing the MQTT
* connection.
*
*/
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
// code to reconnect to the broker would go here if desired
        }
public void closeConn() {
if (myClient != null) {
if (myClient.isConnected()) {
try {
myClient.disconnect();
} catch (MqttException e) {
// TODO Auto-generated catch block
                        e.printStackTrace();
}
}
}
}
/**
*
* deliveryComplete This callback is invoked when a message published by
* this client is successfully received by the broker.
*
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("Pub complete" + new
// String(token.getMessage().getPayload()));
        }
/**
*
* messageArrived This callback is invoked when a message is received on
* a subscribed topic.
*
*/
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
// System.out
// .println("-------------------------------------------------");
// // System.out.println("| Topic:" + topic.getName());
// System.out.println("| Topic:" + topic);
// System.out
// .println("| Message: " + new String(message.getPayload()));
// System.out
// .println("-------------------------------------------------");

Map<String, String> headers = new HashMap<String, String>();
//headers.put("curDate", df.format(new Date()));

Event flumeEvent = EventBuilder.withBody(message.getPayload(),
headers);
try {
getChannelProcessor().processEvent(flumeEvent);
} catch (Exception e) {
// TODO: handle exception
                e.printStackTrace();
}

}
/**
*
* runClient The main functionality of this simple example. Create a
* MQTT client, connect to broker, pub/sub, disconnect.
*
*/
public void runClient() {
// setup MQTT Client
String clientID = M2MIO_THING;
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(3000);
// connOpt.setUserName(M2MIO_USERNAME);
// connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, clientID);
myClient.setCallback(this);
myClient.connect(connOpt);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
// setup topic
// topics on m2m.io are in the form <domain>/<stuff>/<thing>
String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/"
+ M2MIO_THING;
System.out.println("myTopic:" + myTopic);
MqttTopic topic = myClient.getTopic(myTopic);
// subscribe to topic if subscriber
if (subscriber) {
try {
int subQoS = 0;
myClient.subscribe(myTopic, subQoS);
} catch (Exception e) {
e.printStackTrace();
}
}
// publish messages if publisher
if (publisher) {
for (int i = 1; i <= 10; i++) {
String pubMsg = "{\"pubmsg\":" + i + "}";
int pubQoS = 0;
MqttMessage message = new MqttMessage(pubMsg.getBytes());
message.setQos(pubQoS);
message.setRetained(false);
// Publish the message
System.out.println("Publishing to topic \"" + topic
+ "\" qos " + pubQoS);
MqttDeliveryToken token = null;
try {
// publish message to broker
token = topic.publish(message);
// Wait until the message has been delivered to the
// broker
                        token.waitForCompletion();
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// disconnect
try {
// wait to ensure subscribed messages are delivered
if (subscriber) {
while (true) {
Thread.sleep(5000);
}
}
// myClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
}
  打JAR包注意要把Class-Path写上,如下:



Manifest-Version: 1.0
Class-Path: flume-ng-configuration-1.5.2.jar flume-ng-core-1.5.2.jar flume-ng-node-1.5.2.jar flume-ng-sdk-1.5.2.jar org.eclipse.paho.client.mqttv3-1.0.0.jar
  
  将打好的JAR包放到flume的lib目录(注意,class-path说明的jar包在lib一定要有。 如果没有,则放上去)
  
  接着修改一下flume的配置文件,如下(主要是sourceMqtt ,看这个。  因为我这块同时还监听了UDP):



a1.sources = sourceMqtt sourceUdp
a1.sinks = sinkMqtt sinkUdp
a1.channels = channelMqtt channelUdp
# Describe/configure the source
a1.sources.sourceMqtt.type = com.yhx.sensor.flume.source.MQTTSource
# Describe the sink
a1.sinks.sinkMqtt.type = logger
# Use a channel which buffers events in memory
a1.channels.channelMqtt.type = memory
a1.channels.channelMqtt.capacity = 1000
a1.channels.channelMqtt.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.sourceMqtt.channels = channelMqtt
a1.sinks.sinkMqtt.channel = channelMqtt

# a2.sources = sourceUdp
# a2.sinks = sinkUdp
# a2.channels = channelUdp
# Describe/configure the source
a1.sources.sourceUdp.type = syslogudp
a1.sources.sourceUdp.host = 0.0.0.0
a1.sources.sourceUdp.port = 12459
a1.sources.sourceUdp.interceptors=interceptorUdp
a1.sources.sourceUdp.interceptors.interceptorUdp.type=com.yhx.sensor.flume.intercepter.UDPIntercepter$Builder
# Describe the sink
a1.sinks.sinkUdp.type = logger
# Use a channel which buffers events in memory
a1.channels.channelUdp.type = memory
a1.channels.channelUdp.capacity = 1000
a1.channels.channelUdp.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.sourceUdp.channels = channelUdp
a1.sinks.sinkUdp.channel = channelUdp
  
  配置文件保存至flume目录下的conf,叫flume.conf
  然后flume启动命令如下



bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114552-1-1.html 上篇帖子: C++ Thrift Client 与 Flume Thrift Source 对接 下篇帖子: Flume Spooldir 源的一些问题
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表