设为首页 收藏本站
查看: 1027|回复: 0

[经验分享] Hadoop日志文件分析系统

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-13 09:56:35 | 显示全部楼层 |阅读模式
  Hadoop日志分析系统
  项目需求:
  需要统计一下线上日志中某些信息每天出现的频率,举个简单的例子,统计线上每天的请求总数和异常请求数。线上大概几十台
  服务器,每台服务器大概每天产生4到5G左右的日志,假设有30台,每台5G的,一天产生的日志总量为150G。
  处理方案:
  方案1:传统的处理方式,写个JAVA日志分析代码,部署到每台服务器进行处理,这种方式部署起来耗时费力,又不好维护。
  方案2:采用Hadoop分布式处理,日志分析是Hadoop集群系统的拿手好戏。150G每天的日志也算是比较大的数据量了,搭个简
  单的Hadoop集群来处理这些日志是再好不过的了。
  Hadoop集群的搭建:
  参见这两篇文章:http://www.iyunv.com/cstar/archive/2012/12/16/2820209.html
  http://www.iyunv.com/cstar/archive/2012/12/16/2820220.html
  我们这里的集群就采用了两台机器,配置每台8核,32G内存,500G磁盘空间。
  日志准备工作:
  由于日志分散在各个服务器,所以我们先需要将所有的日志拷贝到我们的集群系统当中,这个可以通过linux服务器下rsync或者scp
  服务来执行。这里我们通过scp服务来拷贝,由于都是内网的机器,所以拷贝几个G的日志可以很快就完成。下面是拷贝日志的脚本,脚本
  还是有一些需要注意的地方,我们只需要拷贝前一天的数据,实际保存的数据可能是好几天的,所以我们只要把我们需要的这一天的数据
  SCP过去就可以了。



#!/bin/sh
workdir=/home/myproj/bin/log/
files=`ls $workdir`
pre1date=`date  +"%Y%m%d" -d  "-1 days"`
pre1date1=`date  +"%Y-%m-%d" -d  "-1 days"`
curdate=`date  +"%Y%m%d"`
hostname=`uname -n`
echo $pre1date $curdate
uploadpath="/home/hadoop/hadoop/mytest/log/"$pre1date1"/"$hostname
echo $uploadpath
cd $workdir
mintime=240000
secondmintime=0
for file in $files;do
filedate=`stat $file | grep Modify| awk '{print $2}' |sed -e 's/-//g'`
filetime=`stat $file | grep Modify| awk '{print $3}' |cut -d"." -f1 | sed -e 's/://g'| sed 's/^0\+//'`
if [ $filedate -eq $curdate ]; then
if [ $filetime -lt $mintime ]; then
secondmintime=$mintime
mintime=$filetime
fi
fi
done
echo "mintime:"$mintime
step=1000
mintime=`expr $mintime + $step`
echo "mintime+1000:"$mintime
for file in $files;do
filedate=`stat $file | grep Modify| awk '{print $2}' |sed -e 's/-//g'`
filetime=`stat $file | grep Modify| awk '{print $3}' |cut -d"." -f1 | sed -e 's/://g'| sed 's/^0\+//'`
filename=`echo $file | cut -c 1-8`
startchars="info.log"
#echo $filename
if [ $filename == $startchars ]; then
if [ $filedate -eq $pre1date ]; then
scp -rp $file dir@antix2:$uploadpath
#echo $file
elif [ $filedate -eq $curdate ]; then
if [ $filetime -lt $mintime ]; then
scp -rp $file dir@antix2:$uploadpath
#echo $file
fi
fi
fi
#echo $filedate $filetime
done

  MapReduce代码
  接下来就是编写MapReduce的代码了。使用Eclipse环境来编写,需要安装hadoop插件,我们hadoop机器采用的是1.1.1版本,所以插
  件使用hadoop-eclipse-plugin-1.1.1.jar,将插件拷贝到eclipse的plugins目录下就可以了。然后新建一个MapReduce项目:
DSC0000.jpg
  工程新建好了然后我们就可以编写我们的MapReduce代码了。



import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class LogAnalysis {
public static class LogMapper
extends Mapper{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private Text hourWord = new Text();
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
SimpleDateFormat formatter2 = new SimpleDateFormat("yy-MM-dd");
java.util.Date d1 =new Date();
d1.setTime(System.currentTimeMillis()-1*24*3600*1000);
String strDate =formatter2.format(d1);
if(line.contains(strDate)){
String[] strArr = line.split(",");
int len = strArr[0].length();
String time = strArr[0].substring(1,len-1);
String[] timeArr = time.split(":");
String strHour = timeArr[0];
String hour = strHour.substring(strHour.length()-2,strHour.length());
String hourKey = "";
if(line.contains("StartASocket")){
word.set("SocketCount");
context.write(word, one);
hourKey = "SocketCount:" + hour;
hourWord.set(hourKey);
context.write(hourWord, one);
word.clear();
hourWord.clear();
}
if(line.contains("SocketException")){
word.set("SocketExceptionCount");
context.write(word, one);
hourKey = "SocketExceptionCount:" + hour;
hourWord.set(hourKey);
context.write(hourWord, one);
word.clear();
hourWord.clear();
}
         
        }
}
public static class LogReducer
extends Reducer {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static int run(String[] args) throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: loganalysis  ");
System.exit(2);
}
FileSystem fileSys = FileSystem.get(conf);
String inputPath = "input/" + args[0];
fileSys.copyFromLocalFile(new Path(args[0]), new Path(inputPath));//将本地文件系统的文件拷贝到HDFS中
Job job = new Job(conf, "loganalysis");
job.setJarByClass(LogAnalysis.class);
job.setMapperClass(LogMapper.class);
job.setCombinerClass(LogReducer.class);
job.setReducerClass(LogReducer.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true)? 0 : 1;
fileSys.copyToLocalFile(new Path(otherArgs[1]), new Path(otherArgs[1]));
fileSys.delete(new Path(inputPath), true);  
fileSys.delete(new Path(otherArgs[1]), true);   
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
return ret;
}
public static void main(String[] args)
{
try
{
int ret = run(args);
System.exit(ret);
} catch (Exception e)
{
e.printStackTrace();
System.out.println(e.getMessage());
}
}
}
  
  部署到Hadoop集群:
  代码完成后测试没有问题后,部署到集群当中去执行,我们有几十台服务器,所以每台的服务器的日志当成一个任务来执行。



workdir="/home/hadoop/hadoop/mytest"
cd $workdir
pre1date=`date  +"%Y-%m-%d" -d  "-1 days"`
servers=(mach1 mach2 mach3 )
for i in ${servers[@]};do
inputPath="log/"$pre1date"/"$i
outputPath="output/log/"$pre1date"/"$i
echo $inputPath $outputPath
echo "start job "$i" date:"`date`
hadoop jar  LogAnalysis.jar loganalysis $inputPath $outputPath
echo "end job "$i" date:"`date`
done
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86163-1-1.html 上篇帖子: 【转载】各种sql语句在hadoop pig中的实现 下篇帖子: linux下hadoop安装笔记
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表