设为首页 收藏本站
查看: 662|回复: 0

[经验分享] beibei_sum_spark(python)

[复制链接]

尚未签到

发表于 2018-8-12 12:02:53 | 显示全部楼层 |阅读模式
  
# -*-coding:utf8-*-#

# beibei_sum_spark: PySpark job that builds per-hour and daily sales / GMV
# statistics for beibei items from raw item snapshot logs on HDFS and saves
# them as a single text record under target_stat_path.

__author__ = 'hash'
  
"""
  
create time:16/7/5 15:42
  
"""
  
import sys
from datetime import datetime, timedelta

# os.environ['SPARK_HOME'] = "/Users/play/software/spark"  # 绝对路径

# sys.path.append("/Users/play/software/spark/python")

# print os.environ['SPARK_HOME']

from pyspark import SparkContext, SparkConf
  
# Initialize the SparkContext (for local debugging: sc = SparkContext('local')).
conf = SparkConf().setAppName("The GMV Sum BeiBei")
conf.set('spark.logConf', False)
sc = SparkContext(conf=conf)

# Run date ("today"). Defaults to the historical run date so a plain invocation
# behaves exactly as before; pass YYYY-MM-DD as the first script argument to
# process another day.
today_str = sys.argv[1] if len(sys.argv) > 1 else '2016-07-05'
today = datetime.strptime(today_str, "%Y-%m-%d")
# Re-format so today_str is always the canonical zero-padded form.
today_str = today.strftime("%Y-%m-%d")

# The day being analysed is the day before the run date.
ytday = today - timedelta(days=1)
ytday_str = ytday.strftime("%Y-%m-%d")

base_path = "hdfs://master:9000/super_db/"
category = "beibei-com"

# Raw item snapshot files. Both paths are derived from the run date so they
# stay in sync with target_stat_path; they used to be hard-coded to
# .../2016/beibei_item_07_04.txt and .../2016/beibei_item_07_05.txt, which are
# exactly what the default date produces.
raw_item_dir = '/super_db/raw_db/beibei/beibei_item/'
# Yesterday's snapshots.
source_path_current = raw_item_dir + ytday.strftime('%Y/beibei_item_%m_%d.txt')
# Today's snapshots (the today 00:00 data).
source_path_next = raw_item_dir + today.strftime('%Y/beibei_item_%m_%d.txt')

# Output location, keyed by the analysed day (year directory from that day too).
target_stat_path = (base_path + "analytics_db/" + category + "/" + category + "_item/"
                    + ytday.strftime("%Y") + "/" + ytday_str + "_stat2")

print(target_stat_path)
  
def filter_log_item2(x):
    """Return True if *x* is a well-formed item log line.

    A line is kept when it splits on "#&#" into exactly 28 fields and its last
    field parses as a '%Y-%m-%d %H:%M:%S' timestamp; get_some_data2 reads
    that field (index 27) as the snapshot time.

    Args:
        x: one raw text line.

    Returns:
        True to keep the line, False to drop it.
    """
    fields = x.split("#&#")
    if len(fields) != 28:
        return False
    try:
        # Strip so a trailing '\r' (CRLF input) or stray space does not drop an
        # otherwise valid line; get_some_data2 only reads the text before the
        # first ':' of this field, so such whitespace is harmless there.
        datetime.strptime(fields[-1].strip(), '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # Only a malformed timestamp rejects the line; other errors propagate
        # instead of being swallowed by a bare except.
        return False
    return True
  
def get_some_data(x):
    """Parse a "#&#"-separated item log line laid out with fields 0-8.

    Fields read: 0 item id, 2 category, 3 brand, 4 price, 5 original price,
    6 sales, 8 timestamp 'YYYY-MM-DD HH:MM:SS' (truncated to the hour).
    A value of -1 in price, original price or sales is reported as 0
    (presumably an "unknown" sentinel). Id, category and brand are stripped.
    Appears unused in this script; get_some_data2 is the parser applied.

    Returns:
        ((item_id, hour), (hour, sales, price, cat, brand, orig_price)).
    """
    parts = x.split("#&#")
    item_id, category, brand_name = (parts[i].strip() for i in (0, 2, 3))
    cur_price, list_price = float(parts[4]), float(parts[5])
    units = int(parts[6])
    hour_text = parts[8].split(":")[0].strip()
    hour = datetime.strptime(hour_text, "%Y-%m-%d %H")

    def _zero_if_unknown(value):
        return 0 if value == -1 else value

    cur_price, list_price, units = [_zero_if_unknown(v) for v in (cur_price, list_price, units)]
    return ((item_id, hour), (hour, units, cur_price, category, brand_name, list_price))
  
# Missing-value marker in the raw item logs: the two characters backslash + N
# (presumably a Hive/MySQL text-export NULL). Spelled "\\N" because the former
# "\N" literal is a SyntaxError on Python 3 (it starts a \N{...} escape).
_NULL_MARKER = "\\N"


def _parse_or_default(raw, cast, default):
    """Return cast(raw), or *default* when *raw* is the missing-value marker."""
    if raw == _NULL_MARKER:
        return default
    return cast(raw)


def get_some_data2(x):
    """Parse a 28-field "#&#"-separated item log line (see filter_log_item2).

    Fields read: 3 item id, 6 price, 8 sales, 9 original price, 27 timestamp
    'YYYY-MM-DD HH:MM:SS' (truncated to the hour). A field equal to the
    missing-value marker becomes 0 (sales) or 0.0 (prices). This layout
    carries no category or brand, so both are returned as "".

    Returns:
        ((item_id, hour), (hour, sales, price, cat, brand, orig_price)),
        the same shape as get_some_data.
    """
    fields = x.split("#&#")
    item_id = fields[3]
    hour = datetime.strptime(fields[27].split(":")[0].strip(), "%Y-%m-%d %H")
    sales = _parse_or_default(fields[8], int, 0)
    price = _parse_or_default(fields[6], float, 0.0)
    orig_price = _parse_or_default(fields[9], float, 0.0)
    cat = ""
    brand = ""
    return ((item_id, hour), (hour, sales, price, cat, brand, orig_price))
  
# Yesterday's item snapshots: keep only lines accepted by filter_log_item2 and
# parse each with get_some_data2 into
# ((item_id, hour), (hour, sales, price, cat, brand, orig_price)).
yesterday_sales_rdd = sc.textFile(source_path_current).filter(filter_log_item2).map(get_some_data2)

# Today's snapshots (the today 00:00 data), parsed the same way; unioned with
# yesterday's records further down.
today_sales_rdd = sc.textFile(source_path_next).filter(filter_log_item2).map(get_some_data2)

# Yesterday's records grouped by (item_id, hour). Only the distinct keys of
# this RDD are used afterwards (to count items with a log record in each
# hour); the grouped values are never read.
# NOTE(review): yesterday_sales_rdd.keys().distinct() would yield the same keys
# without shuffling the values, but the later gbk.keys() call would then have
# to change as well.
gbk = yesterday_sales_rdd.groupByKey()
  
def couter(x, y):
    """Merge two count dicts by summing the values of shared keys.

    A key present in only one dict keeps its value. Used as an RDD.reduce
    function over {hour: count} dicts. (The name, a typo of "counter", is
    kept for existing callers.)

    Args:
        x, y: dicts mapping a key to a number.

    Returns:
        A new dict over the union of both key sets.
    """
    # set(x) | set(y) works on Python 2 and 3; the former x.keys() + y.keys()
    # raises TypeError on Python 3, where keys() returns a view.
    return {key: x.get(key, 0) + y.get(key, 0) for key in set(x) | set(y)}
  
# Number of distinct items with at least one log record in each hour of
# yesterday, as {hour_of_day: item_count}. gbk's keys are unique
# (item_id, hour) pairs, so each contributes exactly one count.
hour_sale_num = gbk.keys().map(lambda dd: {dd[1].hour: 1}).reduce(couter)

print(hour_sale_num)

# Number of distinct items on sale yesterday. The former
# yesterday_sales_rdd.count() counted log records (one per item per snapshot),
# not items, inflating the figure by the number of snapshots per item.
spu_sum = yesterday_sales_rdd.keys().map(lambda key: key[0]).distinct().count()

# Sales roll-up over yesterday's and today's snapshots.
all_data = yesterday_sales_rdd.union(today_sales_rdd)

# Re-key by item id only: (item_id, (hour, sales, price, cat, brand, orig_price)).
sales_rdd = all_data.map(lambda x: (x[0][0], x[1]))

print(sales_rdd.first())

# All snapshot records of each item: (item_id, [record, ...]).
grouped_sales_rdd = sales_rdd.groupByKey().mapValues(list)

print(grouped_sales_rdd.first())
  
def date_dict(lst):
    """Index an item's snapshot records by timestamp.

    Each record is (start_date, sales, price, cat, brand, orig_price); the
    result maps start_date -> (sales, price, cat, brand, orig_price). When
    several records share a timestamp, the last one wins.
    """
    return {rec[0]: rec[1:] for rec in lst}


def sale_count24(dd, day=None):
    """Compute an item's (units sold, GMV) for each of the 24 hours of *day*.

    ``sales`` in the records is treated as a cumulative counter. Hour h is
    computed from the snapshot at ``day + h`` and the closest earlier snapshot
    ("prev"):
        units = sales(h) - sales(prev), clamped at 0
        gmv   = sales(h)*price(h) - sales(prev)*price(prev), clamped at 0
    Hours with no snapshot, or whose snapshot is the item's earliest, get
    (0, 0.0). The earliest snapshot used to be diffed against ordered[-1],
    i.e. it wrapped around to the item's *latest* snapshot.

    Args:
        dd: the item's records, (start_date, sales, price, cat, brand,
            orig_price) tuples with start_date truncated to the hour.
        day: midnight of the day to report. Defaults to the module-level
            ``ytday``, as before.

    Returns:
        A list of 24 (units, gmv) tuples indexed by hour of day.
    """
    if day is None:
        day = ytday
    by_time = date_dict(dd)
    ordered = sorted(by_time)
    rank = {ts: i for i, ts in enumerate(ordered)}

    # NOTE(review): snapshots after *day* (e.g. today's 00:00 file) are never a
    # "current" hour nor an earlier neighbour here, so they cannot affect the
    # result; confirm the intended hour windowing.
    sales = []
    for hour in range(24):
        current = day + timedelta(hours=hour)
        i = rank.get(current)
        if i is None or i == 0:
            # No snapshot this hour, or nothing earlier to diff against.
            sales.append((0, 0.0))
            continue
        previous = ordered[i - 1]
        s1, p1 = int(by_time[current][0]), float(by_time[current][1])
        s2, p2 = int(by_time[previous][0]), float(by_time[previous][1])
        sale = s1 - s2
        # NOTE(review): this GMV also moves with price changes (s*p1 - s*p2 is
        # nonzero even with no sales); sale * p1 may be what is intended.
        money = s1 * p1 - s2 * p2
        sales.append((sale if sale > 0 else 0, money if money > 0 else 0))
    return sales
  
# Per-item hourly figures for yesterday: (item_id, [24 x (units, gmv)]).
# mapValues replaces map(lambda (a, b): (a, sale_count24(b))): same result,
# but the tuple-parameter lambda is Python-2-only syntax (removed by PEP 3113),
# and mapValues also keeps the RDD's partitioner.
hour_sales_rdd = grouped_sales_rdd.mapValues(sale_count24)
  
def sum2(a, b):
    """Add two hourly (units, gmv) lists element-wise.

    Either argument may also be an (item_id, hour_list) pair as produced by
    hour_sales_rdd; a pair is unwrapped to its list first, so this works as an
    RDD.reduce function over those pairs as well as over bare lists.

    Returns:
        A list of (units, gmv) tuples, truncated to the shorter input (zip).
    """
    # Unwrap by type: the old len() == 2 test also fired on a bare two-hour
    # list and replaced it with its second (units, gmv) tuple.
    if isinstance(a, tuple):
        a = a[1]
    if isinstance(b, tuple):
        b = b[1]
    return [(x[0] + y[0], x[1] + y[1]) for x, y in zip(a, b)]
  
# Element-wise sum of every item's 24-hour (units, gmv) list. Reducing the bare
# value lists keeps the result a list even when the RDD holds a single item;
# reducing the (item_id, list) pairs would then return that raw pair.
hour_sales = hour_sales_rdd.values().reduce(sum2)

# Per-hour units and GMV, and their daily totals.
num = [x[0] for x in hour_sales]
money = [x[1] for x in hour_sales]
sale_num_sum = sum(num)
sale_money_sum = sum(money)

# Per hour, the number of items that sold at least one unit. Iterates each
# item's 24-hour list; the former version iterated the (item_id, list) pair
# itself and produced a meaningless two-element result.
spu_sale = hour_sales_rdd.values().map(
    lambda hours: [1 if x[0] > 0 else 0 for x in hours]).reduce(
    lambda a, b: [x + y for x, y in zip(a, b)])

print('spu_sale %s %s' % (spu_sale, len(spu_sale)))

# Average current and original price over yesterday's records. RDD.mean() is
# the arithmetic mean; the former reduce((a + b) / 2.0) was an order-dependent
# running midpoint that over-weighted the last records reduced.
# NOTE(review): missing prices are parsed as 0.0 upstream and still count
# toward these means.
now_price_rdd = yesterday_sales_rdd.map(lambda x: x[1][2])
org_price_rdd = yesterday_sales_rdd.map(lambda x: x[1][-1])
avg_price = now_price_rdd.mean()
avg_org_price = org_price_rdd.mean()

# Discount ratio (current / original price); 0.0 instead of ZeroDivisionError
# when the average original price is 0.
discount = avg_price / avg_org_price if avg_org_price else 0.0
  
# SPU: per hour, which items had sales > 0.
def sale_item_to_hour(x):
    """Map an (item_id, hourly list) pair to {hour: [item_id]} for its selling hours.

    Hours 0-23 of x[1] are checked; an hour is included when its units
    (element [0]) are greater than 0. Hours without sales are omitted.
    """
    item_id, hourly = x[0], x[1]
    return {hour: [item_id] for hour in range(24) if hourly[hour][0] > 0}
  
def reduce_hour_dict_sum(x, y):
    """Merge two {hour: [item_id, ...]} dicts hour by hour.

    The result has every hour 0-23 as a key, each mapped to a new list: x's
    ids for that hour followed by y's (empty when neither has the hour).
    Keys outside 0-23 are not carried over.
    """
    return {hour: x.get(hour, []) + y.get(hour, []) for hour in range(24)}
  
# Per hour, the ids of items that sold at least one unit: {hour: [item_id, ...]}.
spu_sale = hour_sales_rdd.map(sale_item_to_hour).reduce(reduce_hour_dict_sum)

# Number of items that sold in each hour.
spu_sale_num = {h: len(ids) for h, ids in spu_sale.items()}

# Distinct items that sold in any hour. set().union(*...) also handles an
# empty dict, which is possible when the RDD holds a single item with no
# sales, because reduce() then returns that item's dict unmerged. In that case
# the former reduce(set.union, []) raised TypeError. It also avoids the builtin
# reduce, which Python 3 no longer has.
set1 = set().union(*spu_sale.values())
spu_sale_sum = len(set1)  # distinct item count

# Assemble the report. Entries 0-23 hold that hour's units ('sales'), GMV
# ('gmv'), items with a log record ('spu') and items that sold ('spu_saled');
# 'sum' holds the daily totals and price averages.
js = dict()
for hour in range(24):
    sale_num = hour_sales[hour][0]
    sale_money = hour_sales[hour][1]
    spu = hour_sale_num.get(hour, 0)
    spu_sale_hour = spu_sale_num.get(hour, 0)
    js[hour] = {'sales': sale_num, 'gmv': sale_money, 'spu': spu, 'spu_saled': spu_sale_hour}

js['sum'] = {'sales_sum': sale_num_sum, 'gmv_sum': sale_money_sum,
             'spu_sum': spu_sum, 'spu_saled_sum': spu_sale_sum,
             'avg_price': avg_price, 'avg_org_price': avg_org_price, 'discount': discount,
             }

print('----------------------------')
print(js)

# Save as a single text file.
# NOTE(review): str(js) writes a Python dict repr (int hour keys, single
# quotes), not JSON. Confirm what downstream readers parse before switching to
# json.dumps, which would also turn the int keys into strings.
stat_rdd = sc.parallelize([str(js)])
stat_rdd.repartition(1).saveAsTextFile(target_stat_path)

print('Saved: %s' % target_stat_path)

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-550600-1-1.html 上篇帖子: Python---生成器 下篇帖子: 005-Python 变量类型
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表