[Experience Share] (5) flume ---- Work Log, January 2015



1. Modify flume-conf.properties: change the sink type to File Roll Sink and save the formatted data locally under [/data/logs/flume/{type}/%Y%m%d/].

The file naming rules stay the same; file rolling changes from one new file every 10 minutes to one new file every 300 MB;

2. Once the log files under /data/logs/flume/{type}/%Y%m%d/ are finished, compress them into tar.gz format;

3. Flume keeps a rolling 3 days of data (the tar.gz files) locally;

4. Configure rsync to periodically transfer the tar.gz files to the corresponding directory on the namenode [already configured; the following command runs on a schedule]:

/usr/bin/rsync -avz --progress --password-file=/etc/rsyncd.passwd /data/logs/flume root@10.240.15.4::backup

5. Run a scheduled decompression script on the namenode; once decompressed, load the data from the namenode's local disk into HDFS and then delete the local files;

6. Add the date partitions to the Hive tables.
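For reference, the scheduled jobs that drive steps 2 through 6 are gathered here; each one is described in its own section below, and the split between the Flume host (collection side) and the namenode is exactly as listed there.

# On the Flume host:
*/10 * * * * /usr/local/scripts/nginx_compress.sh          # step 2: pack finished .log files into tar.gz
*/10 * * * * /usr/local/scripts/nginx_rsyc_detar.sh        # step 4: rsync the tar.gz files to the namenode, then delete them locally
# On the namenode:
*/10 * * * * /usr/local/scripts/unCompressTargz.sh         # step 5: unpack the tar.gz files
*/10 * * * * /usr/local/scripts/uploadFileToHDFS.sh        # step 5: load the unpacked .log files into HDFS
1 0 * * * sh /usr/local/hive-0.12.0/bash/addPartitionEveryDay.sh   # step 6: add the day's Hive partitions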




-------------------------------------------------------------------------------------------------------------------------

  


  1. Modify flume-conf.properties: change the sink type to File Roll Sink and save the formatted data locally under [/data/logs/flume/{type}/%Y%m%d/].
  The file naming rules stay the same; file rolling changes from one new file every 10 minutes to one new file every 300 MB;
  


  Notes:


File Roll Sink


Stores events on the local filesystem. Required properties are marked (required) in the table below.


Property Name       Default   Description
channel             -         (required)
type                -         The component type name, needs to be file_roll. (required)
sink.directory      -         The directory where files will be stored. (required)
sink.rollInterval   30        Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer     TEXT      Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize           100
Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
  
  Previously Flume pushed the data to HDFS in real time. Because real-time transfer puts fairly high demands on the network, the approach was changed: write the files locally and pack them, sync the packed files to the node server, and from there push them into HDFS.
  Flume's support for the local filesystem is quite poor: there is only the File Roll sink, which can only split logs at a fixed time interval, rather than the day/hour/minute based splitting we normally want.
  

To meet our requirements it has to be modified somehow:


One option is to modify its RollingFileSink.java and PathManager.java to get the behaviour we want.


The other option is to implement our own Sink directly on top of its API.


The requirement implemented here:


The local save path is generated dynamically, and the file names follow a fixed convention.


[/data/logs/flume/{type}/%Y%m%d/]    where {type} is the log type and %Y%m%d is the date (year/month/day)




filePrefix = flume_bjxd04.%Y%m%d%H%M    (%Y%m%d%H%M is the date and time, down to the minute)

fileSuffix = .log



--------------------------------------------


Setting up the environment for a custom sink was a bit confusing at first. Create a new Java project, add the Flume jars to the build path, and write your own sink. When it is finished, package it without the libs and drop the jar into Flume's lib directory. In the configuration file, reference the sink by its fully qualified class name, like this:


agent.sinks.hdfssinkNsf27.type = com.cccc.bigdata.flume.sink.RollingFileSinkExtraBypjm
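A minimal build-and-deploy sketch of the packaging steps described above (the source layout, jar name and Flume install path are assumptions for illustration, not taken from the original project):

# compile against the Flume jars already shipped with the agent
javac -cp "/usr/local/flume/lib/*" -d build src/com/cccc/bigdata/flume/sink/*.java
# package only your own classes ("without lib")
jar cf custom-rolling-file-sink.jar -C build .
# drop the jar into Flume's lib directory so the agent can load the sink class
cp custom-rolling-file-sink.jar /usr/local/flume/lib/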







// A sink modeled on RollingFileSink.java

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.RollingFileSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class RollingFileSinkExtraBypjm extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(RollingFileSink.class);
    private static final long defaultRollInterval = 30;
    private static final int defaultBatchSize = 100;

    private int batchSize = defaultBatchSize;
    // RollingFileSink declares this as `private File directory;`
    // here it is a String because the %Y%m%d placeholders still have to be substituted
    private String directory;
    private long rollInterval;
    private OutputStream outputStream;
    private ScheduledExecutorService rollService;
    private String serializerType;
    private Context serializerContext;
    private EventSerializer serializer;
    private SinkCounter sinkCounter;
    private PathManagerBypjm pathController;
    private volatile boolean shouldRotate;
    private String pefix;
    private String suffix;

    public RollingFileSinkExtraBypjm() {
        pathController = new PathManagerBypjm();
        shouldRotate = false;
    }

    @Override
    public void configure(Context context) {
        // Read the configuration parameters: sink.directory, sink.rollInterval,
        // sink.filePrefix, sink.fileSuffix
        directory = context.getString("sink.directory");
        String rollInterval = context.getString("sink.rollInterval");
        pefix = context.getString("sink.filePrefix");
        suffix = context.getString("sink.fileSuffix");
        serializerType = context.getString("sink.serializer", "TEXT");
        serializerContext = new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX));
        Preconditions.checkArgument(directory != null, "Directory may not be null");
        Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
        if (rollInterval == null) {
            this.rollInterval = defaultRollInterval;
        } else {
            this.rollInterval = Long.parseLong(rollInterval);
        }
        batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
    }

    @Override
    public void start() {
        logger.info("Starting {}...", this);
        sinkCounter.start();
        super.start();
        pathController.setBaseDirectory(directory);
        pathController.setPefix(pefix);
        pathController.setSuffix(suffix);
        if (rollInterval > 0) {
            rollService = Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder().setNameFormat(
                    "rollingFileSink-roller-" + Thread.currentThread().getId() + "-%d").build());
            /*
             * Every N seconds, mark that it's time to rotate. We purposefully
             * do NOT touch anything other than the indicator flag to avoid
             * error handling issues (e.g. IO exceptions occurring in two
             * different threads). Resist the urge to actually perform rotation
             * in a separate thread!
             */
            rollService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Marking time to rotate file {}", pathController.getCurrentFile());
                    shouldRotate = true;
                }
            }, rollInterval, rollInterval, TimeUnit.SECONDS);
        } else {
            logger.info("RollInterval is not valid, file rolling will not happen.");
        }
        logger.info("RollingFileSink {} started.", getName());
    }

    @Override
    public Status process() throws EventDeliveryException {
        // When shouldRotate is true, stop writing to the current file and roll over to a new one
        if (shouldRotate) {
            logger.debug("Time to rotate {}", pathController.getCurrentFile());
            if (outputStream != null) {
                logger.debug("Closing file {}", pathController.getCurrentFile());
                try {
                    serializer.flush();
                    serializer.beforeClose();
                    outputStream.close();
                    sinkCounter.incrementConnectionClosedCount();
                    shouldRotate = false;
                } catch (IOException e) {
                    sinkCounter.incrementConnectionFailedCount();
                    throw new EventDeliveryException("Unable to rotate file "
                        + pathController.getCurrentFile() + " while delivering event", e);
                } finally {
                    serializer = null;
                    outputStream = null;
                }
                // Strip the file suffix: a .tmp suffix is appended while the file is being
                // written; once writing is finished the suffix has to be removed.
                File ff = pathController.getCurrentFile();
                try {
                    FileUtils.moveFile(ff, new File(
                        ff.getAbsolutePath().substring(0, ff.getAbsolutePath().indexOf(".tmp"))));
                } catch (IOException e) {
                    e.printStackTrace();
                }
                pathController.rotate();
            }
        }
        if (outputStream == null) {
            File currentFile = pathController.getCurrentFile();
            logger.debug("Opening output stream for file {}", currentFile);
            try {
                outputStream = new BufferedOutputStream(new FileOutputStream(currentFile));
                serializer = EventSerializerFactory.getInstance(serializerType,
                    serializerContext, outputStream);
                serializer.afterCreate();
                sinkCounter.incrementConnectionCreatedCount();
            } catch (IOException e) {
                sinkCounter.incrementConnectionFailedCount();
                throw new EventDeliveryException("Failed to open file "
                    + pathController.getCurrentFile() + " while delivering event", e);
            }
        }
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        Status result = Status.READY;
        try {
            transaction.begin();
            int eventAttemptCounter = 0;
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {
                    sinkCounter.incrementEventDrainAttemptCount();
                    eventAttemptCounter++;
                    serializer.write(event);
                    /*
                     * FIXME: Feature: Rotate on size and time by checking bytes
                     * written and setting shouldRotate = true if we're past a
                     * threshold.
                     */
                    /*
                     * FIXME: Feature: Control flush interval based on time or
                     * number of events. For now, we're super-conservative and
                     * flush on each write.
                     */
                } else {
                    // No events found, request back-off semantics from runner
                    result = Status.BACKOFF;
                    break;
                }
            }
            serializer.flush();
            outputStream.flush();
            transaction.commit();
            sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to process transaction", ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    @Override
    public void stop() {
        logger.info("RollingFile sink {} stopping...", getName());
        sinkCounter.stop();
        super.stop();
        if (outputStream != null) {
            logger.debug("Closing file {}", pathController.getCurrentFile());
            try {
                serializer.flush();
                serializer.beforeClose();
                outputStream.close();
                sinkCounter.incrementConnectionClosedCount();
            } catch (IOException e) {
                sinkCounter.incrementConnectionFailedCount();
                logger.error("Unable to close output stream. Exception follows.", e);
            } finally {
                outputStream = null;
                serializer = null;
            }
        }
        if (rollInterval > 0) {
            rollService.shutdown();
            while (!rollService.isTerminated()) {
                try {
                    rollService.awaitTermination(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for roll service to stop. "
                        + "Please report this.", e);
                }
            }
        }
        logger.info("RollingFile sink {} stopped. Event metrics: {}", getName(), sinkCounter);
    }

    public String getDirectory() {
        return directory;
    }

    public void setDirectory(String directory) {
        this.directory = directory;
    }

    public long getRollInterval() {
        return rollInterval;
    }

    public void setRollInterval(long rollInterval) {
        this.rollInterval = rollInterval;
    }
}



  

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

public class PathManagerBypjm {

    private long seriesTimestamp;
    private String baseDirectory;
    private AtomicInteger fileIndex;
    private File currentFile;
    private String pefix;
    private String suffix;

    public PathManagerBypjm() {
        seriesTimestamp = System.currentTimeMillis();
        fileIndex = new AtomicInteger();
    }

    public File nextFile() {
        // (1) Replace %Y%m%d in the base directory (e.g. /usr/local/flume/xxxxpjmLog/%Y%m%d)
        //     with the current date. For simplicity the whole token is replaced, so the
        //     configuration file must also use the literal %Y%m%d.
        String dirStr = SinkPjmDefinedUtils.getRealPath(baseDirectory);
        // (2) Replace %Y%m%d%H%M in the prefix (e.g. flume_bjxd02.%Y%m%d%H%M)
        //     with the current date and time, down to the minute.
        String pefixStr = SinkPjmDefinedUtils.getRealPathFilePrefix(pefix);
        // (3) Build the full file path, e.g.
        //     /data/logs/flume/allpjm/20150115/flume_bjxd02.201501151029.1421288975655.log
        //     (a .tmp suffix is appended while the file is being written)
        String filePath = dirStr + pefixStr + "." + System.currentTimeMillis() + suffix + ".tmp";
        currentFile = SinkPjmDefinedUtils.CreateFolderAndFile(dirStr, filePath);
        return currentFile;
    }

    /* The original implementation in PathManager.java:
    public File nextFile() {
        currentFile = new File(baseDirectory, seriesTimestamp + "-" + fileIndex.incrementAndGet());
        return currentFile;
    }
    */

    public File getCurrentFile() {
        if (currentFile == null) {
            return nextFile();
        }
        return currentFile;
    }

    public void rotate() {
        currentFile = null;
    }

    public String getBaseDirectory() {
        return baseDirectory;
    }

    public void setBaseDirectory(String baseDirectory) {
        this.baseDirectory = baseDirectory;
    }

    public long getSeriesTimestamp() {
        return seriesTimestamp;
    }

    public AtomicInteger getFileIndex() {
        return fileIndex;
    }

    public String getPefix() {
        return pefix;
    }

    public void setPefix(String pefix) {
        this.pefix = pefix;
    }

    public String getSuffix() {
        return suffix;
    }

    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }
}


import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Class:   SinkPjmDefinedUtils <br />
 *
 * Purpose: utility class for the custom sink
 *
 * @author pjm <br />
 * Created: 2015-1-15 09:44:49 <br />
 * @version 2015-1-15
 */
public class SinkPjmDefinedUtils {

    /**
     * Replace %Y%m%d in a directory path with the current date. <br/>
     *
     * @author pjm <br/>
     * @version 2015-1-15 09:44:46 <br/>
     */
    public static String getRealPath(String path) {
        if (path.contains("%Y%m%d")) {
            Date today = new Date();
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            String formattedDate = formatter.format(today);
            System.out.println(formattedDate);
            path = path.replace("%Y%m%d", formattedDate);
        }
        return path;
    }

    /**
     * Replace %Y%m%d%H%M in a file prefix with the current date and time. <br/>
     *
     * @author pjm <br/>
     * @version 2015-1-15 09:45:32 <br/>
     */
    public static String getRealPathFilePrefix(String path) {
        if (path.contains("%Y%m%d%H%M")) {
            Date today = new Date();
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
            String formattedDate = formatter.format(today);
            System.out.println(formattedDate);
            path = path.replace("%Y%m%d%H%M", formattedDate);
        }
        return path;
    }

    /**
     * Create the directory if necessary and return the target File. <br/>
     *
     * @author pjm <br/>
     * @version 2015-1-15 09:45:48 <br/>
     */
    public static File CreateFolderAndFile(String dirpath, String filepath) {
        //String dirpath  = "/data/logs/flume/All/20150115/";
        //String filepath = "/data/logs/flume/All/20150115/flume_bjxd04.201501150900.1421283612463.log";
        //String dirpath  = "/usr/local/flume/AllLog/20150115/";
        //String filepath = "/usr/local/flume/AllLog/20150115/flume_bjxd04.201501150900.1421283612463.log";
        File dirFile = new File(dirpath);
        // create the directory if it does not exist
        if (!dirFile.exists()) {
            dirFile.mkdirs();
        }
        File f = new File(filepath);
        /*// create the file itself (not needed; the FileOutputStream creates it)
        if (!f.exists()) {
            try {
                f.createNewFile();
                //f.createTempFile("kkk2", ".java", dirFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }*/
        return f;
    }
}
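What the two substitutions amount to, shown in shell purely for illustration (the Java utility performs the equivalent replacement each time a new file is rolled):

date +%Y%m%d      # e.g. 20150115      -> substituted for %Y%m%d in sink.directory
date +%Y%m%d%H%M  # e.g. 201501151029  -> substituted for %Y%m%d%H%M in sink.filePrefix
# so /data/logs/flume/lvi/%Y%m%d/  becomes  /data/logs/flume/lvi/20150115/
# and flume_bjxd04.%Y%m%d%H%M      becomes  flume_bjxd04.201501151029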

  


  Using the custom sink in Flume

##lvi 24
agent.sinks.hdfssinkLvi24.type = com.cntv.bigdata.flume.sink.RollingFileSinkExtraBypjm
agent.sinks.hdfssinkLvi24.sink.directory = /data/logs/flume/lvi/%Y%m%d/
agent.sinks.hdfssinkLvi24.sink.filePrefix = flume_bjxd04.%Y%m%d%H%M
agent.sinks.hdfssinkLvi24.sink.fileSuffix = .log
agent.sinks.hdfssinkLvi24.sink.rollInterval = 600
agent.sinks.hdfssinkLvi24.channel = memoryChannelLvi24
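With the settings above, the files on disk look roughly like this (the names are illustrative; the millisecond timestamp comes from System.currentTimeMillis(), and the .tmp suffix marks the file currently being written, which is stripped when it rolls):

# ls /data/logs/flume/lvi/20150115/
flume_bjxd04.201501151010.1421288975655.log        # finished roll, will be packed by the compress script
flume_bjxd04.201501151020.1421289575655.log.tmp    # still being written by the sink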

-------------------------------------------------------------------------------------------------------------------------

2. Once the log files under /data/logs/flume/{type}/%Y%m%d/ are finished, compress them into tar.gz format; files with size 0 are deleted directly instead of being compressed;

  This is done with a scheduled script. Finished files end in .log, while files still being written end in .log.tmp.
  Files ending in .log with a non-zero size are packed; the original .log files are deleted after packing so they are not packed again on the next run.
  

#!/bin/bash
# cd to the root directory to operate on
cd /data/logs/flume
# loop over every finished .log file under this directory
for i in $(find . -name "*.log");
do
    # file name (last component of the path)
    context=`echo $i | awk -F/ '{print $NF}'`
    # path of the file, relative to /data/logs/flume
    path=$i
    # cd into the directory that contains the file
    cd /data/logs/flume/${path%/*}
    # file size in bytes
    FILE_SIZE=`ls -l $context | awk '{print $5}'`
    # if the size is non-zero, pack the file; size-0 files are simply deleted
    if [ $FILE_SIZE -ne 0 ]; then
        # compress the file
        tar -czf ${context%.*}.tar.gz $context
    fi
    # delete the original .log file
    rm -rf $context
done

crontab  
  

*/10 * * * * /usr/local/scripts/nginx_compress.sh

-------------------------------------------------------------------------------------------------------------------------
  
  3. Keeping a rolling 3 days of tar.gz files locally: deprecated.



-------------------------------------------------------------------------------------------------------------------------

4. Configure rsync to periodically transfer the tar.gz files to the corresponding directory on the namenode [already configured; the following command runs on a schedule]:

  /usr/bin/rsync -avz --progress --password-file=/etc/rsyncd.passwd /data/logs/flume root@10.240.15.4::backup
  


  Run the sync script to push the local log files to the specified path on the node machine; when it finishes, delete the local tar.gz files so they are not synced again next time.
  

#! /bin/bash
#1 rsync the packed files to the node machine
###### deprecated 2015-01-19 15:31:46
######/usr/bin/rsync -avz --progress --password-file=/etc/rsyncd.passwd /data/logs/flume root@10.240.15.4::backup
/usr/bin/rsync -avz --progress --exclude-from '/usr/local/scripts/exclude.txt' --password-file=/etc/rsyncd.passwd /data/logs/flume root@10.240.15.4::backup
#2 delete the tar.gz files that have just been synced
# cd to the root directory to operate on
cd /data/logs/flume/
# loop over every tar.gz file under this directory
for i in $(find . -name "*.tar.gz");
do
    # delete the *.tar.gz file
    rm -rf ${i}
done

[iyunv@zk1 scripts]# cat exclude.txt
*.tmp
*.log
  
  


  crontab

*/10 * * * * /usr/local/scripts/nginx_rsyc_detar.sh

-------------------------------------------------------------------------------------------------------------------------
  5. Run a scheduled decompression script on the namenode; once decompressed, load the data from the namenode's local disk into HDFS and then delete the local files;
  


  Decompression script
  

#!/bin/bash
# Recursively extract every archive under the given root directory into the
# directory it lives in, then delete the original archive.
# Global variable: root directory of the data files
base_dir=/data/hadoop/data/flume
function targz()
{
    for file in `ls -A $1`
    do
        #echo "current entry: "$file
        if [ -f $1/$file ]; then
            cd $1
            #echo "current directory: "; pwd
            #echo "$1/$file is a file"
            if [ "${file:0-6}" = "tar.gz" ]; then
                #echo "extracting: "$file
                tar zxvf $file
                #echo "deleting: "$file
                rm -f $file
            elif [ "${file:0-3}" = "tmp" ]; then
                # stray .tmp files are of no use on the namenode, delete them
                rm -f $file
            fi
        else
            #echo "$1/$file is not a file"
            targz $1/$file
        fi
    done
}
# invoke the function
targz $base_dir

Script to upload the files to HDFS
  

#!/bin/bash
# Recursively upload every .log file under the given root directory to HDFS,
# then delete the local source file.
# Environment variables
export HADOOP_HOME=/usr/local/hadoop-2.3.0-cdh5.1.0
PATH=.:$PATH:$HADOOP_HOME/bin
export PATH
# Global variable: root directory of the data files
base_dir=/data/hadoop/data/flume
function upload()
{
    for file in `ls -A $1`
    do
        if [ -f $1/$file ]; then
            cd $1
            #echo "$1/$file is a file"
            if [ "${file:0-3}" = "log" ]; then
                # absolute path of the file being uploaded
                echo $1/$file
                # target location in HDFS: the local path with everything up to "/hadoop" stripped
                #echo ${1#*/hadoop}/
                # create (or refresh) the target directory in HDFS
                hadoop fs -mkdir -p ${1#*/hadoop}/
                # upload the file
                hadoop fs -copyFromLocal -f -p $1/$file ${1#*/hadoop}/
                # delete the local file
                rm -f $1/$file
            fi
        else
            #echo "$1/$file is not a file"
            upload $1/$file
        fi
    done
}
upload $base_dir
echo "OK!"


crontab  
  

*/10 * * * * /usr/local/scripts/unCompressTargz.sh
*/10 * * * * /usr/local/scripts/uploadFileToHDFS.sh
  

-------------------------------------------------------------------------------------------------------------------------
  6. Add the date partitions to the Hive tables.


       The node machine simply runs a script once a day.
  

$HIVE_HOME/bin/hive -e "${hql1}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql2}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hqlother}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql8}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql9}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql10}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql11}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql25}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql4}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql16}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql7}">>/usr/local/hive-0.12.0/bash/a.log
$HIVE_HOME/bin/hive -e "${hql18}">>/usr/local/hive-0.12.0/bash/a.log
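The script above only shows how the statements are submitted; the ${hqlN} variables themselves are defined elsewhere and are not included in the post. As a hedged sketch, each one is presumably an ADD PARTITION statement along these lines (the table name, partition column and HDFS location below are assumptions, not taken from the original script):

#!/bin/bash
# hypothetical example of one of the ${hqlN} variables
day=$(date +%Y%m%d)
hql1="ALTER TABLE log_type1 ADD IF NOT EXISTS PARTITION (dt='${day}') LOCATION '/data/hadoop/data/flume/type1/${day}/'"
$HIVE_HOME/bin/hive -e "${hql1}" >> /usr/local/hive-0.12.0/bash/a.log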

crontab -l  
  

1 0 * * * sh /usr/local/hive-0.12.0/bash/addPartitionEveryDay.sh


  
  -------------------------------------------------------------------------------------------------------------------------

Summary:

  At this point Flume formats the raw logs: different log types go to different files, and within a file the fields are laid out in a fixed order and separated by \t.
  The generated log files are shipped to the Hadoop node server and uploaded from there to HDFS.
  The next step is to parse the data with Hive, write the results to local files, and then import those files into the corresponding MySQL tables.
  -------------------------------------------------------------------------------------------------------------------------
  Problems:
  (1)
  Flume is started from an xshell session; every time the xshell window is closed, Flume stops as well.
  The start command that was being used:

bin/flume-ng agent --conf conf --conf-file ./conf/flume-rollfile_branch_all.properties --name agent -Dflume.root.logger=INFO,console,LOGFILE


It needs to be changed to:
/usr/local/flume/bin/flume-ng agent -n agent -c conf -f /usr/local/flume/conf/flume-conf.properties &
  
  

( -Dflume.root.logger=INFO,console,LOGFILE is only for debugging; do not add it to the startup command in production. )
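A common alternative (a sketch, not taken from the original post) is to detach the agent from the terminal explicitly with nohup, which survives the SSH session being closed just like the bare & form above:

nohup /usr/local/flume/bin/flume-ng agent -n agent -c conf \
  -f /usr/local/flume/conf/flume-conf.properties > /dev/null 2>&1 &
# stdout is discarded here (an assumption); the agent still writes its normal log4j logs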

  
  -------------------------------------------------------------------------------------------------------------------------

(2)
  

Finding, analysing, and fixing Flume stability problems:
Check how Flume is running every two hours; if it is running normally, record "normal", otherwise record the exception details in "FLUME健康状态统计表.xlsx".

The file "FLUME健康状态统计表.xlsx" lives under flume-hadoop2/docs/ in the project.

  
  A script runs every two hours and appends the result to a log file (so a week of Flume health can be reviewed):
  0 */2 * * *  /home/panjinming/flumeHealth/recordFlumeHearth.sh
  #! /bin/bash
count=`ps -ef | grep flume | grep -v "grep" | wc -l`
date=`date +'%Y-%m-%d %H:%M:%S'`
if [ "$count" -eq 0 ]; then
    echo "${date} BAD!!!!!!!!  And Restart !!!! View log please!!!!" >> /home/panjinming/flumeHealth/recordRun109.log
    # no flume process found: restart the agent
    /usr/local/flume/bin/flume-ng agent -n agent -c conf -f /usr/local/flume/conf/flume-rollfile_branch_all.properties &
else
    echo "${date} OK!!" >> /home/panjinming/flumeHealth/recordRun109.log
fi
