设为首页 收藏本站
查看: 3776|回复: 0

[经验分享] Spark机器学习

[复制链接]

尚未签到

发表于 2019-1-30 11:07:17 | 显示全部楼层 |阅读模式
Spark机器学习
Pipelines中的主要概念
MLlib 提供的API可以通过Pipelines将多个复杂的机器学习算法结合成单个pipeline或者单个工作流。这个概念和scikit-learn里的概念类似,根据官方的说法是,此抽象概念的设计灵感来自于scikit-learn
·        DataFrame:通过Spark SQL 组件里的DataFrame作为机器学习的数据集。支持多种数据类型.比如 DataFrame 可以将文本,数据库等外部数据源划分为不同的列,包含特征向量, 特征值等。
·        Transformer: 一个 Transformer可以将一个DataFrame 转换成另一个DataFrame. 比如, 一个机器学习模型可以将带有特征值的DataFrame转换为一个带有模型预测结果数据的DataFrame.
·        Estimator:通过 DataFrame数据集进行训练 产生一个机器学习模型的算法。
·        Pipeline:联合多个 Transformer Estimator构成一个机器学习工作流
·        Parameter: 所有Transformer Estimator指定参数的共享API

DataFrame
DataFrame里广泛运用的数据结构,可以包含向量,文本,图片,以及结构化数据。DataFrame通过Spark SQL支持多种数据源。

工作流程如图所示:

DSC0000.jpg



机器学习中Pipleline流程图

正如图中所示,Pipeline有三个阶段,每个阶段要么是Transformer ,要么就是Estimator,这些阶段按照一定的顺序执行,执行的过程中,通过圆柱体代表的DataFrame类型的Raw text产生一个新的Words(DataFrame类型),最后建立了一个LogisticRegressionModel。图中的Tokenizer,HashingTF都是Transformer,而LogisticRegressionModel是Estimator 。
在Transformer 阶段,主要调用transform()方法进行计算。
Estimator阶段,主要调用fit()方法进行计算。

DAG Pipelines:多个阶段形成一个pipeline,同理,DAG Pipelines就是多个pipeline组成的一个有向无环图。
运行时检查:数据结构DataFrame中可以有各种各样的数据,但是在编译的时候不会检查数据的数据类型,而是在运行的时候才根据DataFrame的Schema来检查数据类型。
唯一ID标识:Pipeline的每一个阶段(stage)都通过id来进行唯一的标识,同一个相同的实列,比如HashingTF不会插入到同一个Pipeline俩次,因为每一个stage都有自身的唯一的ID来进行标识

保存和读取pipeline
代码案例:
Estimator, Transformer, 以及 Param综合案例

importorg.apache.spark.ml.classification.LogisticRegression
importorg.apache.spark.ml.linalg.{Vector,Vectors}
importorg.apache.spark.ml.param.ParamMap
importorg.apache.spark.sql.Row

// Prepare training data from a list of (label, features)tuples.
valtraining=spark.createDataFrame(Seq(
  (1.0,Vectors.dense(0.0,1.1,0.1)),
  (0.0,Vectors.dense(2.0,1.0,-1.0)),
  (0.0,Vectors.dense(2.0,1.3,1.0)),
  (1.0,Vectors.dense(0.0,1.2,-0.5))
)).toDF("label","features")

// Create a LogisticRegression instance. This instance is anEstimator.
vallr=newLogisticRegression()
// Print out the parameters, documentation, and any defaultvalues.
println("LogisticRegressionparameters:\n"+lr.explainParams()+"\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parametersstored in lr.
valmodel1=lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced byan Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where namesare unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit usingparameters: "+model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
valparamMap=ParamMap(lr.maxIter->20)
  .put(lr.maxIter,30) // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam->0.1,lr.threshold->0.55) // Specify multiple Params.

// One can also combine ParamMaps.
valparamMap2=ParamMap(lr.probabilityCol->"myProbability") // Change output column name.
valparamMapCombined=paramMap++paramMap2

// Now learn a new model using the paramMapCombinedparameters.
// paramMapCombined overrides all parameters set earlier vialr.set* methods.
valmodel2=lr.fit(training,paramMapCombined)
println("Model 2 was fit usingparameters: "+model2.parent.extractParamMap)

// Prepare test data.
valtest=spark.createDataFrame(Seq(
  (1.0,Vectors.dense(-1.0,1.5,1.3)),
  (0.0,Vectors.dense(3.0,2.0,-0.1)),
  (1.0,Vectors.dense(0.0,2.2,-1.5))
)).toDF("label","features")

// Make predictions on test data using theTransformer.transform() method.
// LogisticRegression.transform will only use the 'features'column.
// Note that model2.transform() outputs a 'myProbability'column instead of the usual
// 'probability' column since we renamed thelr.probabilityCol parameter previously.
model2.transform(test)
  .select("features","label","myProbability","prediction")
  .collect()
  .foreach{caseRow(features:Vector,label:Double,prob:Vector,prediction:Double)=>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
Pipeline单独的案例代码

importorg.apache.spark.ml.{Pipeline,PipelineModel}importorg.apache.spark.ml.classification.LogisticRegressionimportorg.apache.spark.ml.feature.{HashingTF,Tokenizer}importorg.apache.spark.ml.linalg.Vectorimportorg.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.val training = spark.createDataFrame(Seq(  (0L,"a b c d e spark",1.0),  (1L,"b d",0.0),  (2L,"spark f g h",1.0),  (3L,"hadoop mapreduce",0.0))).toDF("id","text","label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.val tokenizer =newTokenizer()  .setInputCol("text")  .setOutputCol("words")val hashingTF =newHashingTF()  .setNumFeatures(1000)  .setInputCol(tokenizer.getOutputCol)  .setOutputCol("features")val lr =newLogisticRegression()  .setMaxIter(10)  .setRegParam(0.001)val pipeline =newPipeline()  .setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to diskmodel.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to diskpipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during productionval sameModel =PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.val test = spark.createDataFrame(Seq(  (4L,"spark i j k"),  (5L,"l m n"),  (6L,"spark hadoop spark"),  (7L,"apache hadoop"))).toDF("id","text")
// Make predictions on test documents.model.transform(test)  .select("id","text","probability","prediction")  .collect()  .foreach{caseRow(id:Long, text:String, prob:Vector, prediction:Double)=>    println(s"($id, $text) --> prob=$prob, prediction=$prediction")  }  

  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669539-1-1.html 上篇帖子: Spark Shuffle 下篇帖子: 大数据Spark视频教程下载
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表