|
1.调用脚本:
#!/bin/bash
#
# Driver for the run_basic_data Pig job.
#
# Usage: <this script> [DATE]
#   DATE  day to process, as YYYY-MM-DD. Used only when exactly one argument
#         is given; otherwise DATE defaults to yesterday.
#
# The script changes into its own directory and runs run_basic_data.pig.
# Its exit status is the exit status of pig.

# Set up environment variables.
# shellcheck disable=SC1091
source /etc/bashrc
# NOTE(review): PYTHON, startTm and YESDAY are not referenced anywhere in the
# visible part of this script; they are kept in case later code relies on them.
PYTHON=/usr/local/bin/python

if [[ $# -eq 1 ]]; then
  DATE=$1
else
  DATE=$(date -d '1 day ago' +'%Y-%m-%d')
fi

# %M and %S are minutes and seconds; lowercase %m and %d would print the
# month and the day of month in the time-of-day position.
echo "start time:$(date +'%Y-%m-%d %H:%M:%S')"
startTm=$(date +%s)
# The day before DATE.
YESDAY=$(date -d "-1 day $DATE" +%Y-%m-%d)

# Run from the script's own directory so the relative .pig path resolves;
# stop here rather than launching pig from the wrong directory.
cd "$(dirname "$0")" || exit 1
pig run_basic_data.pig
2.pig程序处理:
-- Job-level settings: input bytes handled per reducer, and uncompressed output.
SET pig.exec.reducers.bytes.per.reducer 300000000;
SET mapred.output.compress false;

-- Extra jars made available to the job.
REGISTER /hadoop/pig/lib/piggybank.jar
REGISTER /hadoop/pig/lib/hive-exec-0.11.0.jar
REGISTER /hadoop/pig/lib/hive-common-0.11.0.jar

-- External streaming command; the script file is shipped to the task nodes.
DEFINE run_basic_data `run_basic_data.py` SHIP('run_basic_data.py');

-- Tab-separated call records.
-- NOTE(review): the partition glob below is hard-coded to dt=2014-05-04 .. 2014-05-09
-- and does not depend on any parameter -- confirm this is intended.
raw_calls = LOAD '/user/hive/warehouse/fact_call_data/dt=2014-05-0[4-9]'
    USING PigStorage('\t')
    AS (
        l_date, hour, province, city, calltype, state, callstate, depart, queue,
        call_times, link_times, quene_times, just_one,
        waittime, speaktime, tec_speaktime, linktime, dt
    );

-- Drop rows whose state is the empty string.
calls_with_state = FILTER raw_calls BY state != '';

-- Keep only the three columns the streaming script consumes, without duplicates.
date_province_state = FOREACH calls_with_state GENERATE l_date, province, state;
unique_rows = DISTINCT date_province_state;

-- Pipe the distinct rows through the shipped script and print what it returns.
streamed = STREAM unique_rows THROUGH run_basic_data
    AS (l_date:chararray, province:chararray, state:chararray);
DUMP streamed;
3.python脚本:
#!/usr/local/bin/python
#encoding:utf8
import sys
import os
def delPigChr(line):
    """Trim a raw input line and drop its first and last characters.

    Surrounding whitespace (including the trailing newline) is stripped
    first, then exactly one character is removed from each end.

    NOTE(review): presumably those two characters are the "(" and ")" of a
    Pig tuple rendering such as "(a,b,c)". They are removed unconditionally,
    whatever they are -- confirm the format actually fed to this script.

    Args:
        line: one raw line of input.

    Returns:
        The stripped line without its first and last character; an empty
        string when the stripped line has two characters or fewer.
    """
    return line.strip()[1:-1]
def main():
    """Group stdin rows by their first two columns and print the joined third.

    Every line read from stdin is passed through delPigChr and split on ",".
    A row with exactly three columns is stored under the key
    "<col0>_<col1>"; the third columns of rows sharing a key are joined
    with ",". Any other row is reported on stderr as "error:<row>".

    Once input is exhausted, one line per key is written to stdout: the key
    immediately followed by its joined values.
    """
    grouped = {}
    for lineStr in sys.stdin:
        line = delPigChr(lineStr)
        cols = line.split(",")
        if len(cols) != 3:
            # Diagnostics go to stderr so they cannot be mistaken for result
            # rows by whatever consumes this script's stdout.
            sys.stderr.write("error:" + line + "\n")
            continue
        key = cols[0] + "_" + cols[1]
        if key in grouped:
            grouped[key] = grouped[key] + "," + cols[2]
        else:
            grouped[key] = cols[2]
    for key in grouped:
        # NOTE(review): key and values are concatenated with no separator
        # between them; kept as-is because the expected output format is not
        # visible here -- confirm with the consumer.
        sys.stdout.write(key + grouped[key] + "\n")
# Run the aggregation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|