[Experience Share] Connecting Flume to MetaQ to feed data into Spark for real-time computation

  Previously, with Hadoop, we pulled logs from the production servers using shell scripts (in the small hours, fetch the logs generated the day before). That works while the log volume is small; once it grows large, say 1 TB a day, shell scripts can no longer cope, for two reasons:
  1. Pulling is slow. Fetching logs from production servers crosses machine rooms, and the inter-room bandwidth is limited; an unthrottled transfer would eat all of it and leave very little for the online services, so the shell-script approach had to be dropped.
  2. Shell-script pulls fail often; the approach is simply not stable.
  The tool we use now is Flume, deployed as many clients pushing logs to a single server.
  [Figure: multiple Flume clients forwarding logs to one Flume server]


  Clients and the server are deployed from the same package; only the configuration files differ. The package can be downloaded here:
  http://download.csdn.net/detail/aaa1117a8w5s6d/7973839
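
  For reference, a client-side agent could be configured roughly as follows (a minimal sketch: the exec source, file path, and agent name are illustrative; only the avro sink target matches the server source configured further down):

# hypothetical client agent: tail a local log file and forward it
# to the server's avro source (10.0.3.19:58001, see server config below)
agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /data1/logs/app/info.log

agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 10.0.3.19
agent1.sinks.k1.port = 58001

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 100000
agent1.channels.c1.transactionCapacity = 1000

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1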
  For the real-time computation we need an MQ in between; we chose Taobao's MetaQ. Flume, however, cannot write to MetaQ out of the box, so we have to modify its source: add the following class to the org.apache.flume.sink package of the flume-ng-core module (Flume's core code):

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.sink;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

/**
 * A Flume sink that forwards events to MetaQ. Each call to process() drains up
 * to threadNum * batchSize events from the channel, with one channel
 * transaction and one MetaQ transaction per worker thread.
 */
public class MetaQSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MetaQSink.class);

    private MessageSessionFactory sessionFactory;
    private MessageProducer producer;
    private String zkConnect;
    private String zkRoot;
    private String topic;
    private int batchSize;
    private int threadNum;
    private ExecutorService executor;

    @Override
    public void configure(Context context) {
        // Keys are relative to the sink name, i.e. "<agent>.sinks.<name>.sink.zkConnect".
        this.zkConnect = context.getString("sink.zkConnect");
        this.zkRoot = context.getString("sink.zkRoot");
        this.topic = context.getString("sink.topic");
        this.batchSize = context.getInteger("sink.batchSize", 10000);
        this.threadNum = context.getInteger("sink.threadNum", 50);
        executor = Executors.newCachedThreadPool();

        MetaClientConfig metaClientConfig = new MetaClientConfig();
        ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkConnect = zkConnect;
        zkConfig.zkRoot = zkRoot;
        metaClientConfig.setZkConfig(zkConfig);
        try {
            sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        } catch (MetaClientException e) {
            logger.error("failed to create MetaQ session factory", e);
            throw new RuntimeException("init error", e);
        }
        producer = sessionFactory.createProducer();
        logger.info("zkConnect:" + zkConnect + ", zkRoot:" + zkRoot
                + ", topic:" + topic);
    }

    @Override
    public Status process() throws EventDeliveryException {
        long start = System.currentTimeMillis();
        // Registering the topic is idempotent; it could also be done once in configure().
        producer.publish(topic);
        Status result = Status.READY;
        final Channel channel = getChannel();
        final AtomicInteger sent = new AtomicInteger(0);
        final CountDownLatch cdl = new CountDownLatch(threadNum);

        for (int t = 0; t < threadNum; t++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    Transaction transaction = null;
                    boolean inMetaQTransaction = false;
                    int i = 0;
                    try {
                        transaction = channel.getTransaction();
                        transaction.begin();
                        for (i = 0; i < batchSize; i++) {
                            Event event = channel.take();
                            if (event == null) {
                                // Channel drained; stop this batch early.
                                break;
                            }
                            if (i == 0) {
                                // Open the MetaQ transaction lazily, only once we have data.
                                producer.beginTransaction();
                                inMetaQTransaction = true;
                            }
                            SendResult sendResult = producer.sendMessage(
                                    new Message(topic, event.getBody()));
                            // check result
                            if (!sendResult.isSuccess()) {
                                throw new RuntimeException(
                                        "Send message failed, error message: "
                                                + sendResult.getErrorMessage());
                            }
                            logger.debug("Send message successfully, sent to "
                                    + sendResult.getPartition());
                        }
                        if (inMetaQTransaction) {
                            producer.commit();
                        }
                        sent.addAndGet(i);
                        transaction.commit();
                    } catch (Exception ex) {
                        logger.error("error while sending batch, rolling back:", ex);
                        try {
                            if (inMetaQTransaction) {
                                producer.rollback();
                            }
                        } catch (Exception e) {
                            logger.error("MetaQ rollback failed:", e);
                        }
                        if (transaction != null) {
                            transaction.rollback();
                        }
                    } finally {
                        cdl.countDown();
                        if (transaction != null) {
                            transaction.close();
                        }
                    }
                }
            });
        }

        try {
            cdl.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (sent.get() == 0) {
            // Nothing taken from the channel; ask the sink runner to back off.
            result = Status.BACKOFF;
        }
        logger.info("metaqSink_new,process:{},time:{},queue_size:{}",
                new Object[] { sent.get(), System.currentTimeMillis() - start,
                        channel.getSize() });
        return result;
    }
}
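
To build this, the class is compiled into flume-ng-core with the MetaQ client jar (from the MetaQ package linked at the end of this post) on the classpath; the rebuilt flume-ng-core jar plus the MetaQ client jars then go into the agent's lib directory, and the sink is referenced from the config by its fully qualified class name, as shown next.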


Then configure Flume as follows. Note that Flume hands a sink only the properties after its own name, so the keys MetaQSink reads in configure() (sink.zkConnect, sink.topic, and so on) show up in the file with the extra "sink." segment:
# example.conf: configuration for three Flume agents (info, nginx, usage) on the collector node

# ----- agent "info": avro source fanned out to a file_roll sink and the MetaQ sink -----

# Name the components on this agent
info.sources = info_source
info.sinks = info_sink info_sink_to_metaq
info.channels = info_channel info_channel_to_metaq

# Describe/configure the source
info.sources.info_source.type = avro
info.sources.info_source.bind = 10.0.3.19
info.sources.info_source.port = 58001
info.sources.info_source.threads = 24

info.sinks.info_sink.type = file_roll
info.sinks.info_sink.sink.directory = /data1/logs/flume/info
info.sinks.info_sink.sink.name = info
info.sinks.info_sink.sink.batchSize = 20000

info.sinks.info_sink_to_metaq.type = org.apache.flume.sink.MetaQSink
info.sinks.info_sink_to_metaq.sink.zkConnect = 10.0.5.108:2181,10.0.5.109:2181,10.0.5.110:2181
info.sinks.info_sink_to_metaq.sink.zkRoot = /meta
info.sinks.info_sink_to_metaq.sink.topic = info
info.sinks.info_sink_to_metaq.sink.batchSize = 20000

# Describe the channels
info.channels.info_channel.type = memory
info.channels.info_channel.capacity = 10000000
info.channels.info_channel.transactionCapacity = 10000000

info.channels.info_channel_to_metaq.type = memory
info.channels.info_channel_to_metaq.capacity = 10000000
info.channels.info_channel_to_metaq.transactionCapacity = 10000000

# Bind the source and sinks to the channels
info.sources.info_source.channels = info_channel info_channel_to_metaq
info.sinks.info_sink.channel = info_channel
info.sinks.info_sink_to_metaq.channel = info_channel_to_metaq

# ----- agent "nginx": avro source to a file_roll sink over a spillable memory channel -----

nginx.sources = nginx_source
nginx.sinks = nginx_sink
nginx.channels = nginx_channel

# Describe/configure the source
nginx.sources.nginx_source.type = avro
nginx.sources.nginx_source.bind = 10.0.3.19
nginx.sources.nginx_source.port = 58002

nginx.sinks.nginx_sink.type = file_roll
nginx.sinks.nginx_sink.sink.directory = /data1/logs/flume/nginx
nginx.sinks.nginx_sink.sink.name = nginx
nginx.sinks.nginx_sink.sink.batchSize = 2000

# Describe the channel: in-memory capacity (events) with overflow to disk
nginx.channels.nginx_channel.type = SPILLABLEMEMORY
nginx.channels.nginx_channel.memoryCapacity = 20000000
nginx.channels.nginx_channel.overflowCapacity = 200000000
nginx.channels.nginx_channel.checkpointDir = /data1/logs/flume/nginx/check
nginx.channels.nginx_channel.dataDirs = /data1/logs/flume/nginx/data

# Bind the source and sink to the channel
nginx.sources.nginx_source.channels = nginx_channel
nginx.sinks.nginx_sink.channel = nginx_channel

# ----- agent "usage": same layout as "nginx", different port and paths -----

usage.sources = usage_source
usage.sinks = usage_sink
usage.channels = usage_channel

# Describe/configure the source
usage.sources.usage_source.type = avro
usage.sources.usage_source.bind = 10.0.3.19
usage.sources.usage_source.port = 58003

usage.sinks.usage_sink.type = file_roll
usage.sinks.usage_sink.sink.directory = /data1/logs/flume/usage
usage.sinks.usage_sink.sink.name = usage
usage.sinks.usage_sink.sink.batchSize = 1000

# Describe the channel
usage.channels.usage_channel.type = SPILLABLEMEMORY
usage.channels.usage_channel.memoryCapacity = 20000000
usage.channels.usage_channel.overflowCapacity = 200000000
usage.channels.usage_channel.checkpointDir = /data1/logs/flume/usage/check
usage.channels.usage_channel.dataDirs = /data1/logs/flume/usage/data

# Bind the source and sink to the channel
usage.sources.usage_source.channels = usage_channel
usage.sinks.usage_sink.channel = usage_channel

  Note: the file above configures three Flume agents (info, nginx, usage).
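  Each agent runs as its own process and is started by name with the standard flume-ng launcher (the conf directory and file name here are illustrative):

bin/flume-ng agent -n info -c conf -f conf/example.conf
bin/flume-ng agent -n nginx -c conf -f conf/example.conf
bin/flume-ng agent -n usage -c conf -f conf/example.conf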
  ---------------------------------------------------------------------------------------------------------------
  MetaQ installation package:
  http://download.csdn.net/detail/aaa1117a8w5s6d/7974039


  For getting the data from MetaQ into Spark, see this project:
  http://download.csdn.net/detail/aaa1117a8w5s6d/7974053
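
  For a rough idea of that side, the sketch below shows one way to wrap a MetaQ consumer in a Spark Streaming custom Receiver. This is an illustrative sketch, not the code of the linked project: the class name, consumer group, fetch size, and storage level are assumptions; it relies on the metamorphosis-client consumer API and Spark Streaming's Receiver base class.

// Illustrative sketch of a MetaQ -> Spark Streaming bridge (class and group
// names are assumptions, not taken from the linked project).
import java.util.concurrent.Executor;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class MetaQReceiver extends Receiver<String> {

    private final String zkConnect;
    private final String zkRoot;
    private final String topic;
    private final String group;
    private transient MessageConsumer consumer;

    public MetaQReceiver(String zkConnect, String zkRoot, String topic, String group) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.zkConnect = zkConnect;
        this.zkRoot = zkRoot;
        this.topic = topic;
        this.group = group;
    }

    @Override
    public void onStart() {
        try {
            // Same ZooKeeper bootstrap as in MetaQSink above.
            MetaClientConfig config = new MetaClientConfig();
            ZKConfig zkConfig = new ZKConfig();
            zkConfig.zkConnect = zkConnect;
            zkConfig.zkRoot = zkRoot;
            config.setZkConfig(zkConfig);
            consumer = new MetaMessageSessionFactory(config)
                    .createConsumer(new ConsumerConfig(group));
            consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
                // note: "recieveMessages" is the method name as spelled in the MetaQ API
                @Override
                public void recieveMessages(Message message) {
                    // Hand each message body to Spark's block manager.
                    store(new String(message.getData()));
                }
                @Override
                public Executor getExecutor() {
                    return null; // let MetaQ use its own delivery threads
                }
            });
            consumer.completeSubscribe();
        } catch (Exception e) {
            restart("MetaQ consumer initialization failed", e);
        }
    }

    @Override
    public void onStop() {
        try {
            if (consumer != null) {
                consumer.shutdown();
            }
        } catch (Exception ignored) {
            // best-effort shutdown
        }
    }
}

  The receiver is then hooked up with jssc.receiverStream(new MetaQReceiver(zkConnect, zkRoot, topic, group)) to obtain a DStream of message bodies for the actual computation.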
