设为首页 收藏本站
查看: 1298|回复: 0

[经验分享] hadoop日志数据分析开发步骤及代码

[复制链接]

尚未签到

发表于 2017-12-17 23:37:12 | 显示全部楼层 |阅读模式
  日志数据分析:
  1.背景
  1.1 hm论坛日志,数据分为两部分组成,原来是一个大文件,是56GB;以后每天生成一个文件,大约是150-200MB之间;
  1.2 日志格式是apache common日志格式;每行记录有5部分组成:访问ip、访问时间、访问资源、访问状态、本次流量;27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
  1.3 分析一些核心指标,供运营决策者使用;
  1.4 开发该系统的目的是分了获取一些业务相关的指标,这些指标在第三方工具中无法获得的;(第三方工具:百度统计)
  2.开发步骤
  2.1 把日志数据上传到HDFS中进行处理
  如果是日志服务器数据较小、压力较小,可以直接使用shell命令把数据上传到HDFS中;
  如果是日志服务器数据较大、压力较大,使用NFS在另一台服务器上上传数据;(NFS(Network File System)即网络文件系统,是FreeBSD支持的文件系统中的一种,它允许网络中的计算机之间通过TCP/IP网络共享资源。在NFS的应用中,本地NFS的客户端应用可以透明地读写位于远端NFS服务器上的文件,就像访问本地文件一样。)
  如果日志服务器非常多、数据量大,使用flume进行数据处理;
  2.2 使用MapReduce对HDFS中的原始数据进行清洗;
  2.3 使用Hive对清洗后的数据进行统计分析;
  2.4 使用Sqoop把Hive产生的统计结果导出到mysql中;指标查询--mysql
  2.5 如果用户需要查看详细数据的话,可以使用HBase进行展现;明细查询--HBase
  3.流程代码(具体实际操作步骤见下面)
  3.1 使用shell命令把数据从linux磁盘上传到HDFS中
  3.1.1 在hdfs中创建目录,命令如下
  $HADOOP_HOME/bin/hadoop fs -mkdir /hmbbs_logs
  3.1.2 写一个shell脚本,叫做upload_to_hdfs.sh,内容大体如下
  yesterday=`date --date='1 days ago' +%Y_%m_%d`
  hadoop fs -put /apache_logs/access_${yesterday}.log   /hmbbs_logs
  3.1.3 把脚本upload_to_hdfs.sh配置到crontab中,执行命令crontab -e, 写法如下
  * 1 * * * upload_to_hdfs.sh
  3.2 使用MapReduce对数据进行清洗,把原始处理清洗后,放到hdfs的/hmbbs_cleaned目录下,每天产生一个子目录。
  3.3 使用hive对清洗后的数据进行统计。
  3.3.1 建立一个外部分区表,脚本如下
  CREATE EXTERNAL TABLE hmbbs(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/hmbbs_cleaned';
  3.3.2 增加分区,脚本如下

  >  把代码增加到upload_to_hdfs.sh中,内容如下
  hive -e "ALTER TABLE hmbbs ADD PARTITION(logdate='${yesterday}') LOCATION '/hmbbs_cleaned/${yesterday}';"
  3.3.3 统计每日的pv,代码如下
  CREATE TABLE hmbbs_pv_2013_05_30 AS SELECT COUNT(1) AS PV FROM hmbbs WHERE logdate='2013_05_30';
  统计每日的注册用户数,代码如下
  CREATE TABLE hmbbs_reguser_2013_05_30 AS SELECT COUNT(1) AS REGUSER FROM hmbbs WHERE logdate='2013_05_30' AND INSTR(url,'member.php?mod=register')>0;
  统计每日的独立ip,代码如下
  CREATE TABLE hmbbs_ip_2013_05_30 AS SELECT COUNT(DISTINCT ip) AS IP FROM hmbbs WHERE logdate='2013_05_30';
  统计每日的跳出用户,代码如下
  CREATE TABLE hmbbs_jumper_2013_05_30 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM hmbbs WHERE logdate='2013_05_30' GROUP BY ip HAVING times=1) e;
  把每天统计的数据放入一张表
  CREATE TABLE hmbbs_2013_05_30 AS SELECT '2013_05_30', a.pv, b.reguser, c.ip, d.jumper FROM hmbbs_pv_2013_05_30 a JOIN hmbbs_reguser_2013_05_30 b ON 1=1 JOIN hmbbs_ip_2013_05_30 c ON 1=1 JOIN hmbbs_jumper_2013_05_30 d ON 1=1 ;
  3.4 使用sqoop把数据导出到mysql中
  *********************************************
  日志数据分析详细步骤(自己实际操作成功的步骤):
  1、使用shell把数据从Linux磁盘上上传到HDFS中
  在Linux上/usr/local/下创建一个目录:mkdir apache_logs/,然后复制两天的日志数据放到此文件夹下。
  在HDFS中创建存放数据的目录:hadoop fs -mkdir /hmbbs_logs
  hadoop fs -put /usr/local/apache_logs/* /hmbbs_logs
  上传结束了,在hadoop0:50070中观察到在/hmbbs/目录下有两个日志文件。
DSC0000.png

  在/apache_logs目录下创建一个上传数据的shell脚本:vi upload_to_hdfs.sh
  #!/bin/sh
  #get yesterday format string
  yesterday=`date --date='1 days ago' +%Y_%m_%d`
  #upload logs to hdfs
  hadoop fs -put /apache_logs/access_${yesterday}.log /hmbbs_logs
  把脚本upload_to_hdfs.sh配置到crontab中,执行命令crontab -e(在每天的1点钟会准时执行脚本文件)
  * 1 * * * upload_to_hdfs.sh
  2、在eclipse中书写代码,使用MapReduce清洗数据。打包cleaned.jar导出到linux下的/apache_logs目录下。
  

package hmbbs;  

  
import java.text.ParseException;
  
import java.text.SimpleDateFormat;
  
import java.util.Date;
  
import java.util.Locale;
  

  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.conf.Configured;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.LongWritable;
  
import org.apache.hadoop.io.NullWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.Mapper;
  
import org.apache.hadoop.mapreduce.Reducer;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.util.Tool;
  
import org.apache.hadoop.util.ToolRunner;
  
/**
  
  * 源数据的清洗
  
  * @author ahu_lichang
  
  *
  
  */

  
public>  
     public int run(String[] args) throws Exception {
  
         final Job job = new Job(new Configuration(),
  
                 HmbbsCleaner.class.getSimpleName());
  
         job.setJarByClass(HmbbsCleaner.class);
  
         FileInputFormat.setInputPaths(job, args[0]);
  
         job.setMapperClass(MyMapper.class);
  
         job.setMapOutputKeyClass(LongWritable.class);
  
         job.setMapOutputValueClass(Text.class);
  
         job.setReducerClass(MyReducer.class);
  
         job.setOutputKeyClass(Text.class);
  
         job.setOutputValueClass(NullWritable.class);
  
         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
         job.waitForCompletion(true);
  
         return 0;
  
     }
  

  
     public static void main(String[] args) throws Exception {
  
         ToolRunner.run(new HmbbsCleaner(), args);
  
     }
  


  
     static>  
             Mapper<LongWritable, Text, LongWritable, Text> {
  
         LogParser logParser = new LogParser();
  
         Text v2 = new Text();
  

  
         protected void map(
  
                 LongWritable key,
  
                 Text value,
  
                 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
  
                 throws java.io.IOException, InterruptedException {
  
             final String[] parsed = logParser.parse(value.toString());
  

  
             // 过滤掉静态信息
  
             if (parsed[2].startsWith("GET /static/")
  
                     || parsed[2].startsWith("GET /uc_server")) {
  
                 return;
  
             }
  

  
             // 过掉开头的特定格式字符串
  
             if (parsed[2].startsWith("GET /")) {
  
                 parsed[2] = parsed[2].substring("GET /".length());
  
             } else if (parsed[2].startsWith("POST /")) {
  
                 parsed[2] = parsed[2].substring("POST /".length());
  
             }
  

  
             // 过滤结尾的特定格式字符串
  
             if (parsed[2].endsWith(" HTTP/1.1")) {
  
                 parsed[2] = parsed[2].substring(0, parsed[2].length()
  
                         - " HTTP/1.1".length());
  
             }
  

  
             v2.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
  
             context.write(key, v2);
  
         };
  
     }
  


  
     static>  
             Reducer<LongWritable, Text, Text, NullWritable> {
  
         protected void reduce(
  
                 LongWritable k2,
  
                 java.lang.Iterable<Text> v2s,
  
                 org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
  
                 throws java.io.IOException, InterruptedException {
  
             for (Text v2 : v2s) {
  
                 context.write(v2, NullWritable.get());
  
             }
  
         };
  
     }
  


  
     static>  
         public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
  
                 "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
  
         public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
  
                 "yyyyMMddHHmmss");
  

  
         public static void main(String[] args) throws ParseException {
  
             final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
  
             LogParser parser = new LogParser();
  
             final String[] array = parser.parse(S1);
  
             System.out.println("样例数据: " + S1);
  
             System.out.format(
  
                     "解析结果:  ip=%s, time=%s, url=%s, status=%s, traffic=%s",
  
                     array[0], array[1], array[2], array[3], array[4]);
  
         }
  

  
         /**
  
          * 解析英文时间字符串
  
          *
  
          * @param string
  
          * @return
  
          * @throws ParseException
  
          */
  
         private Date parseDateFormat(String string) {
  
             Date parse = null;
  
             try {
  
                 parse = FORMAT.parse(string);
  
             } catch (ParseException e) {
  
                 e.printStackTrace();
  
             }
  
             return parse;
  
         }
  

  
         /**
  
          * 解析日志的行记录
  
          *
  
          * @param line
  
          * @return 数组含有5个元素,分别是ip、时间、url、状态、流量
  
          */
  
         public String[] parse(String line) {
  
             String ip = parseIP(line);
  
             String time = parseTime(line);
  
             String url = parseURL(line);
  
             String status = parseStatus(line);
  
             String traffic = parseTraffic(line);
  

  
             return new String[] { ip, time, url, status, traffic };
  
         }
  

  
         private String parseTraffic(String line) {
  
             final String trim = line.substring(line.lastIndexOf("\"") + 1)
  
                     .trim();
  
             String traffic = trim.split(" ")[1];
  
             return traffic;
  
         }
  

  
         private String parseStatus(String line) {
  
             final String trim = line.substring(line.lastIndexOf("\"") + 1)
  
                     .trim();
  
             String status = trim.split(" ")[0];
  
             return status;
  
         }
  

  
         private String parseURL(String line) {
  
             final int first = line.indexOf("\"");
  
             final int last = line.lastIndexOf("\"");
  
             String url = line.substring(first + 1, last);
  
             return url;
  
         }
  

  
         private String parseTime(String line) {
  
             final int first = line.indexOf("[");
  
             final int last = line.indexOf("+0800]");
  
             String time = line.substring(first + 1, last).trim();
  
             Date date = parseDateFormat(time);
  
             return dateformat1.format(date);
  
         }
  

  
         private String parseIP(String line) {
  
             String ip = line.split("- -")[0].trim();
  
             return ip;
  
         }
  
     }
  

  
}
  

  vi upload_to_hdfs.sh
  #!/bin/sh
  #get yesterday format string
  #yesterday=`date --date='1 days ago' +%Y_%m_%d`
  #testing cleaning data
  yesterday=$1
  #upload logs to hdfs
  hadoop fs -put /apache_logs/access_${yesterday}.log /hmbbs_logs
  #cleanning data
  hadoop jar cleaned.jar /hmbbs_logs/access_${yesterday}.log /hmbbs_cleaned/${yesterday}
  权限chmod u+x upload_to_hdfs.sh
  执行upload_to_hdfs.sh  2013_05_30
  然后在浏览器中hadoop0:50070中就能观察到上传到HDFS中的清洗过后的数据了。
DSC0001.png

  3、使用hive对清洗后的数据进行统计。
  建立一个外部分区表,脚本如下
  CREATE EXTERNAL TABLE hmbbs(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/hmbbs_cleaned';
DSC0002.png

  增加分区,脚本如下

  >
DSC0003.png

DSC0004.png

  把代码增加到upload_to_hdfs.sh中,内容如下(每天产生一个分区)
  #alter hive table and then add partition to existed table
  hive -e "ALTER TABLE hmbbs ADD PARTITION(logdate='${yesterday}') LOCATION '/hmbbs_cleaned/${yesterday}';"
  ------hive -e "执行语句;"  hive -e的作用就是不用在hive命令行下,可以在外面执行。
  可以在外面执行hive -e "ALTER TABLE hmbbs ADD PARTITION(logdate='2013_05_31') LOCATION '/hmbbs_cleaned/2013_05_31';"
DSC0005.png

  这样在/hmbbs表下面就多了一个2013_05_31文件
  select count(1) form hmbbs   -----通过观察数字大小变化,就可判断出是否添加成功。
DSC0006.png

DSC0007.png

  统计每日的pv,代码如下
  CREATE TABLE hmbbs_pv_2013_05_30 AS SELECT COUNT(1) AS PV FROM hmbbs WHERE logdate='2013_05_30';
  执行hive -e "SELECT COUNT(1) FROM hmbbs WHERE logdate='2013_05_30';"  得到表中的数据大小,待后面验证用。
  执行hive -e "CREATE TABLE hmbbs_pv_2013_05_30 AS SELECT COUNT(1) AS PV FROM hmbbs WHERE logdate='2013_05_30';"   将查询到的PV(别名)数据存到表hmbbs_pv_2013_05_30中。
  验证表中是否添加成功了数据:hive -e "select * from hmbbs_pv_2013_05_30;"
  统计每日的注册用户数,代码如下
  CREATE TABLE hmbbs_reguser_2013_05_30 AS SELECT COUNT(1) AS REGUSER FROM hmbbs WHERE logdate='2013_05_30' AND INSTR(url,'member.php?mod=register')>0;
  INSTR(url,'member.php?mod=register')是一个函数,用来判断url字符串中所包含的子串member.php?mod=register的个数
  执行hive -e "SELECT COUNT(1) AS REGUSER FROM hmbbs WHERE logdate='2013_05_30' AND INSTR(url,'member.php?mod=register')>0;"  可以统计出其中一天的用户注册数。这个数字肯定比之前的pv数小!
  统计每日的独立ip(去重),代码如下
  CREATE TABLE hmbbs_ip_2013_05_30 AS SELECT COUNT(DISTINCT ip) AS IP FROM hmbbs WHERE logdate='2013_05_30';
  在hive中查询有多少个独立ip:SELECT COUNT(DISTINCT ip) AS IP FROM hmbbs WHERE logdate='2013_05_30';
  执行hive -e "CREATE TABLE hmbbs_ip_2013_05_30 AS SELECT COUNT(DISTINCT ip) AS IP FROM hmbbs WHERE logdate='2013_05_30';"
  统计每日的跳出用户,代码如下
  CREATE TABLE hmbbs_jumper_2013_05_30 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM hmbbs WHERE logdate='2013_05_30' GROUP BY ip HAVING times=1) e;
  在hive下查询登录次数只有一次的ip有哪些:SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM hmbbs WHERE logdate='2013_05_30' GROUP BY ip HAVING times=1) e;  ---e是别名
  执行hive -e "CREATE TABLE hmbbs_jumper_2013_05_30 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM hmbbs WHERE logdate='2013_05_30' GROUP BY ip HAVING times=1) e;"
  把每天统计的数据放入一张表 (表连接)
  CREATE TABLE hmbbs_2013_05_30 AS SELECT '2013_05_30', a.pv, b.reguser, c.ip, d.jumper FROM hmbbs_pv_2013_05_30 a JOIN hmbbs_reguser_2013_05_30 b ON 1=1 JOIN hmbbs_ip_2013_05_30 c ON 1=1 JOIN hmbbs_jumper_2013_05_30 d ON 1=1 ;
  创建完了,查看一下:
  show tables;
  select * from hmbbs_2013_05_30 ;
DSC0008.png

  使用sqoop把hmbbs_2013_05_30表中数据导出到mysql中。(数据导出成功了以后,就可以删除掉之前的5个表了)
  在MySQL第三方工具上连接hadoop0,在里面创建一个数据库hmbbs,再创建一个表hmbbs_logs_stat,表中有导出数据的5个字段:logdate varchar 非空 ,pv int, reguser int, ip int, jumper int
DSC0009.png

DSC00010.png

  注意:创建数据库时,出现错误:远程登录权限问题!
DSC00011.png

  sqoop export --connect jdbc:mysql://hadoop0:3306/hmbbs--username root --password admin --table hmbbs_logs_stat --fields-terminated-by '\001'--export-dir ‘/hive/hmbbs_2013_05_30’
  ----'\001'是默认的列分隔符     /user/hive/warehouse/hmbbs_2013_05_30这个目录根据自己的设置,不一定都是这样的!
  导出成功以后,可以在工具中刷新表,就能观察到表中的数据了。
DSC00012.png

  统计数据和导出操作也都应该放在脚本文件中:
  vi upload_to_hdfs.sh
  #create hive table everyday
  hive -e "CREATE TABLE hmbbs_pv_${yesterday} AS SELECT COUNT(1) AS PV FROM hmbbs WHERE logdate='${yesterday}';"
  hive -e "SELECT COUNT(1) AS REGUSER FROM hmbbs WHERE logdate='${yesterday}' AND INSTR(url,'member.php?mod=register')>0;"
  hive -e "CREATE TABLE hmbbs_ip_${yesterday} AS SELECT COUNT(DISTINCT ip) AS IP FROM hmbbs WHERE logdate='${yesterday}';"
  hive -e "CREATE TABLE hmbbs_jumper_${yesterday} AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM hmbbs WHERE logdate='${yesterday}' GROUP BY ip HAVING times=1) e;"
  hive -e "CREATE TABLE hmbbs_${yesterday} AS SELECT '${yesterday}', a.pv, b.reguser, c.ip, d.jumper FROM hmbbs_pv_${yesterday} a JOIN hmbbs_reguser_${yesterday} b ON 1=1 JOIN hmbbs_ip_${yesterday} c ON 1=1 JOIN hmbbs_jumper_${yesterday} d ON 1=1 ;"
  #delete hive tables
  hive -e "drop table hmbbs_pv_${yesterday}"
  hive -e "drop table hmbbs_reguser_${yesterday}"
  hive -e "drop table hmbbs_ip_${yesterday}"
  hive -e "drop table hmbbs_jumper_${yesterday}"
  #sqoop export to mysql
  sqoop export --connect jdbc:mysql://hadoop0:3306/hmbbs--username root --password admin --table hmbbs_logs_stat --fields-terminated-by '\001'--export-dir ‘/hive/hmbbs_${yesterday}’
  #delete hive tables
  hive -e "drop table hmbbs_${yesterday}"
  完善执行的shell脚本:
  1、初始化数据的脚本(历史数据)
  2、每日执行的脚本
  mv upload_to_hdfs.sh hmbbs_core.sh
  vi hmbbs_daily.sh
  #!/bin/sh
  yesterday=`date --date='1 days ago' +%Y_%m_%d`
  hmbbs_core.sh  $yesterday
  chmod u+x hmbbs_daily.sh
  crontab -e
  * 1 * * * /apache_logs/hmbbs_daily.sh
  vi hmbbs_init.sh
  #!/bin/sh
  #hive -e "CREATE EXTERNAL TABLE hmbbs(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION   '/hmbbs_cleaned';"
  s1=`date --date="$1" +%s`
  s2=`date +%s`
  s3=$(((s2-s1)/3600/24))
  for ((i=$s3;i>0;i--))
  do
  tmp=`date --date="$i days ago" +%Y_%m_%d`
  echo $tmp
  done

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425202-1-1.html 上篇帖子: 【大数据系列】Hadoop DataNode读写流程 下篇帖子: 使用 Hadoop 进行语料处理(面试题)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表