select
TRANSFORM(p.joint_attr_values, p.collect_product_id, p.released_id)
USING 'python split_product_attrs.py'
as (custom_attr , custom_attr_value, collect_product_id, released_product_id)
from
(select pc.joint_attr_values, pm.collect_product_id, pm.released_id from
products_compared_${hiveconf:wid_version} pc
left join products_merged_${hiveconf:wid_version} pm
on pc.collect_id = pm.collect_product_id
where pm.released_id is not null
and pc.joint_attr_values is not null and pc.joint_attr_values != '' and pc.joint_attr_values != 'null') as p ;
下面是python的脚本,用于将三列转换为四列,这里就比较简单了,主要用于测试,代码随便写了一下
#!/usr/bin/python
# #_*_ coding: utf-8 _*_
import sys
import datetime
# "规格:RN1-10/50;规格:RN1-10/50;规格:RN1-10/50"
# ["规格:RN1-10/51;规格:RN1-10/52;规格:RN1-10/53", '11', '22']
# ["规格", "RN1-10/51", '11', '22']
# ["规格", "RN1-10/52", '11', '22']
# ["规格", "RN1-10/53", '11', '22']
for line in sys.stdin:
values = line.split('\t')
values = [ i.strip() for i in values ]
tmp = values[0]
key_values = tmp.split(";")
for kv in key_values:
k = kv.split(":")[0]
v = kv.split(":")[1]
print '\t'.join([k,v,values[1],values[2]])
add file /www/FCCS_Data/ComETL/hive/sql_map/demo.py ;
from access_fccs select TRANSFORM (time) using 'python demo.py' where week=41 limit 10 ;
或者:
add file /www/FCCS_Data/ComETL/hive/sql_map/demo.py;
select TRANSFORM (time) using 'python demo.py' as (time) from (select * from access_fccs where week=41 limit 10) a ;
上面 SQL的意思是 将查询结果集中的 time 通过 demo.py 进行处理 返回相应的结果,至于都做什么处理了,请看下面的demo.py代码。
这里我们需要注意的是,一定要先 将map脚本添加到分布式缓存中,否则会报错 metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script. at 这种错误
特别说明 这里的 路径为本地路径 不是分布式HDFS 路径,新手比较容易搞混
然后看看我们的demo.py 是怎么写的
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
import datetime
import time
#把时间戳转成字符串形式
def timestamp_toString(stamp):
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stamp))
for line in sys.stdin:
print timestamp_toString(float(line))