# -*- coding:utf-8 -*-
#
#!/usr/bin/env python
import sys
import json
import pickle
# Python 2 only: switch the interpreter-wide default string encoding to UTF-8.
# reload(sys) is called first because sys.setdefaultencoding is removed from
# the sys module at interpreter start-up and only reappears after a reload.
reload(sys)
sys.setdefaultencoding('utf-8')
def read_from_input(file, separator, columns):
    """Match each input record against the column names and yield it as a dict.

    Args:
        file: iterable of text lines (``main`` passes ``sys.stdin``).
        separator: field delimiter, forwarded to ``mapper_input``.
        columns: ordered list of column names.

    Yields:
        dict mapping column name to the raw string field. Blank lines, lines
        that cannot be split, and records whose field count is neither
        ``len(columns)`` nor ``len(columns) - 3`` are skipped.
    """
    # The last 3 columns (year, month, day) are the partition key and are not
    # needed, so a record is accepted with or without them.
    accepted_lengths = (len(columns), len(columns) - 3)
    for line in file:
        if line is None:
            continue
        # Lines read from a file keep their terminator. Strip it so blank
        # lines are really skipped and the last field does not carry '\n'.
        line = line.rstrip('\r\n')
        if line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        if len(data_list) not in accepted_lengths:
            continue
        # zip() stops at the shorter sequence, so a record without the
        # partition columns simply yields a dict without those keys.
        yield dict(zip(columns, data_list))
def index_columns(db, table):
    """Return the column names listed in the file ``desc.<db>.<table>``.

    The file is read from the current working directory and decoded with
    ``deserialize``; the ``col_name`` entry of every element is collected
    in order.
    """
    desc_path = 'desc.%s.%s' % (db, table)
    with open(desc_path, 'r') as desc_file:
        structure_list = deserialize(desc_file.read())
    names = []
    for column in structure_list:
        names.append(column.get('col_name'))
    return names
def main(separator, columns):
    """Map entry point.

    Reads every record from stdin, lets both mapper plugins accumulate their
    statistics into one nested dict (route key -> bucket key -> stats), then
    emits one ``route_total`` record per bucket via ``mapper_output``.
    """
    mapper_result = {}
    for item in read_from_input(sys.stdin, separator, columns):
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

    for route_key, route_value in mapper_result.items():
        for key, value in route_value.items():
            # Flatten the two grouping levels into the emitted record,
            # followed by the bucket's own statistics.
            ret_dict = {}
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)
def mapper_plugin_1(item, mapper_result):
    """Accumulate per area/district order statistics under route ``'route1'``.

    Args:
        item: record dict; reads ``area``, ``district`` and ``order_id``.
        mapper_result: nested dict updated in place. The bucket
            ``mapper_result['route1']['<area>_<district>']`` receives a
            running ``count`` and up to 10 sample values in ``order_id``.

    Records missing any of the three fields are ignored.
    """
    # Original author's note: in real use the key could be e.g. a different
    # appkey. It is what routes data to reducers, and identical routes are
    # sent to the same reducer.
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        bucket_key = '_'.join([area, district])
    except TypeError:
        # Non-string area/district: skip this record (best effort) rather
        # than abort the whole run.
        return
    # Totals for this area/district pair.
    bucket = mapper_result.setdefault(key, {}).setdefault(bucket_key, {})
    bucket.setdefault('count', 0)
    bucket.setdefault('order_id', [])
    bucket['count'] += 1
    # Keep only the first 10 order ids as samples.
    if len(bucket['order_id']) < 10:
        bucket['order_id'].append(order_id)
def mapper_plugin_2(item, mapper_result):
    """Accumulate order statistics per time bucket under route ``'route2'``.

    Args:
        item: record dict; reads ``strive_time`` and ``order_id``.
        mapper_result: nested dict updated in place. The bucket key is the
            text of ``strive_time`` before its first ``':'`` (called
            ``day_hour`` here; presumably a date plus hour, but the exact
            format is not visible in this file). Each bucket receives a
            running ``count`` and up to 10 sample values in ``order_id``.

    Records missing either field are ignored.
    """
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
    except (AttributeError, TypeError):
        # strive_time is not a splittable string: skip this record (best
        # effort) rather than abort the whole run.
        return
    # Totals for this time bucket.
    bucket = mapper_result.setdefault(key, {}).setdefault(day_hour, {})
    bucket.setdefault('count', 0)
    bucket.setdefault('order_id', [])
    bucket['count'] += 1
    # Keep only the first 10 order ids as samples.
    if len(bucket['order_id']) < 10:
        bucket['order_id'].append(order_id)
def serialize(data, type='json'):
    """Encode *data* with the chosen format, returning ``''`` on any failure.

    Args:
        data: object to encode.
        type: ``'json'`` (default) or ``'pickle'``. The name shadows the
            builtin but is kept because callers may pass it by keyword.

    Returns:
        The encoded payload, or ``''`` if the format is unknown or the
        encoder raises.
    """
    if type == 'json':
        encode = json.dumps
    elif type == 'pickle':
        encode = pickle.dumps
    else:
        return ''
    try:
        return encode(data)
    except Exception:
        # Deliberately broad: callers rely on '' as the failure value for
        # whatever the encoder raises.
        return ''
def deserialize(data, type='json'):
    """Decode *data* with the chosen format, returning ``[]`` on any failure.

    Args:
        data: encoded payload.
        type: ``'json'`` (default) or ``'pickle'``. The name shadows the
            builtin but is kept because callers may pass it by keyword.

    Returns:
        The decoded object, or ``[]`` if the format is unknown or the
        decoder raises.
    """
    if type == 'json':
        decode = json.loads
    elif type == 'pickle':
        # SECURITY: pickle.loads can execute arbitrary code while decoding.
        # Only use this branch on data from a trusted source.
        decode = pickle.loads
    else:
        return []
    try:
        return decode(data)
    except Exception:
        # Deliberately broad: callers rely on [] as the failure value for
        # whatever the decoder raises.
        return []
def mapper_input(line, separator='\t'):
    """Split one input line into its fields.

    Args:
        line: the raw text line.
        separator: field delimiter (default: tab).

    Returns:
        List of field strings, or ``None`` when the line cannot be split
        (``line`` is not a string, or the separator is empty or of the
        wrong type).
    """
    try:
        return line.split(separator)
    except (AttributeError, TypeError, ValueError):
        # AttributeError: line has no split(); TypeError: incompatible
        # separator type; ValueError: empty separator.
        return None
def mapper_output(key, data, separator='\t'):
    """Print one record to stdout as ``<key><separator><serialized data>``.

    ``key`` is converted with ``str`` and ``data`` is encoded with
    ``serialize`` (JSON by default) before printing.
    """
    record = '%s%s%s' % (str(key), separator, serialize(data))
    # A single parenthesised argument prints the same on Python 2 and 3.
    print(record)
# Script entry point: expects the database and table names as arguments and
# reads the matching ``desc.<db>.<table>`` file for the column layout.
if __name__ == '__main__':
    if len(sys.argv) < 3:
        # Fail with a usage message instead of an IndexError traceback.
        sys.exit('usage: %s <db> <table>' % sys.argv[0])
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    # Input fields are separated by a double pipe.
    main('||', columns)