设为首页 收藏本站
查看: 1310|回复: 0

[经验分享] 知识点

[复制链接]

尚未签到

发表于 2019-1-30 11:29:07 | 显示全部楼层 |阅读模式
  主页博客地址:Spark小节
https://blog.icocoro.me

  Spark处理字符串日期的max和min的方式
Spark处理数据存储到Hive的方式
Spark处理新增列的方式map和udf、functions
Spark处理行转列pivot的使用
Python 3.5.3
Spark1.6.2


Spark处理字符串日期的max和min的方式

  一般是字符串类型的日期在使用Spark的agg求max时,是不正确的,API显示只支持数值型的max、min
hive的SQL查询引擎是支持字符串日期的max和min的


字符串日期转为时间戳再聚合

  unix_timestamp


public static Column unix_timestamp(Column s)
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
Parameters:
s - (undocumented)
Returns:
(undocumented)
Since:
1.5.0
from pyspark.sql import functions as F
df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))
使用HiveSQL

select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id
Spark处理数据存储到Hive的方式

  通常Spark任务处理后的结果数据会存储到Hive表中,可以先保存至HDFS目录再load、最方便还是直接使用临时表和HiveContext插入数据


saveAsTextFile & load data

  repartition根据实际文件大小进行调整,数据比较小时,保存成一个文件


df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)
  先删除分区,如果已经存在的话
再覆盖原来的数据【方便重新重复跑或修复数据】
此处使用shell,也可使用HiveContext的sql


alter table app_table drop if exists partition(datestr='$day_01');
load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition(datestr='$day_01');
hivectx.sql & insert

app_table1_df.registerTempTable("app_table1_tmp")
app_table2_df.registerTempTable("app_table2_tmp")
hivectx.sql("set spark.sql.shuffle.partitions=1")
hivectx.sql("alter table app_table drop if exists partition(datestr='%s')" % daystr)
hivectx.sql("insert overwrite table app_table partition(datestr='%s') select * from app_table1_tmp" % daystr)
hivectx.sql("insert into app_table partition(datestr='%s') select * from app_table2_tmp" % daystr)
Spark处理新增列的方式map和udf、functions

  Spark在处理数据转换时,通常需要使用map、flatmap等操作,其中使用map会产生新的列或修改某列字段的值
Spark同样支持自定义函数UDF以及提供了类似Hive内置函数的各种各样的处理函数


map

  需要定义函数和StructType
忽略数值判断细节和精度等


from pyspark.sql.types import *
def a_func(_):
return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt1'])
a_schema = StructType([
StructField('id', StringType(), True),
StructField('cnt1', IntegerType(), True),
StructField('cnt2', IntegerType(), True),
StructField('cnt1_rate', IntegerType(), True)
])
a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)
udf

  需要定义函数和UDF
忽略数值判断细节和精度等


def a_func(cnt1, cnt2):
return cnt1 / (cnt1 + cnt2)
a_udf = F.udf(a_func, IntegerType())
a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2'])
functions

  处理类似日期字符串的格式转换、等等等
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html


Spark处理行转列pivot的使用

  在使用SQL查询数据时,很多情况下需要将行转为列,以有利于数据的展示和不同维度需求的利用
一般可采用子查询case when、连续join、字段补全union的形式
Spark的DataFrame中可以通过GroupedData的pivot函数来实现


df.groupBy(['course_name']).pivot('daystr').sum('score')
df.groupBy(['course_name']).pivot('daystr').count()
  转换前

daystr course_name score
2017-11-15 yuwen 1
2017-11-15 yuwen 1
2017-11-15 shuxue 1
2017-11-15 yingyu 2
2017-11-16 yuwen 1
2017-11-16 shuxue 1
2017-11-16 yingyu 2
  转换后

course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 2 2
course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 1 1



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669566-1-1.html 上篇帖子: spark 2.2.0 高可用搭建 下篇帖子: 【总结】Spark优化(1)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表