设为首页 收藏本站
查看: 1566|回复: 0

[经验分享] 8.spark core之读写数据

[复制链接]

尚未签到

发表于 2019-1-30 10:28:05 | 显示全部楼层 |阅读模式
    spark支持多种数据源,从总体来分分为两大部分:文件系统和数据库。

文件系统
    文件系统主要有本地文件系统、Amazon S3、HDFS等。
    文件系统中存储的文件有多种存储格式。spark支持的一些常见格式有:


格式名称
结构化
说明




文件文件

普通文件文件,每行一条记录


JSON
半结构化
常见的基于文本的半结构化数据


CSV

常见的基于文本的格式,在电子表格应用中使用


SequenceFiles

一种用于键值对数据的常见Hadoop文件格式

文本文件


  •   读取

    •   读取单个文件,参数为文件全路径,输入的每一行都会成为RDD的一个元素。

      • python

      input = sc.textFile("file://opt/module/spark/README.md")

      • scala

      val input = sc.textFile("file://opt/module/spark/README.md")

      • java

      JavaRDD input = sc.textFile("file://opt/module/spark/README.md")

    • 读取多个文件时,可以使用textFile将参数改为目录或以逗号文件的多个文件名即可。如果是小文件,也可以使用wholeTextFiles读取为一个Pair RDD(键是文件名,值是文件内容)。

    val input = sc.wholeTextFiles("file://opt/module/spark/datas")
    val result = input.mapValues{
    y => {
    val nums = y.split(" ").map(x => x.toDouble)
    nums.sum / nums.size.toDouble
    }
    }

  • 写入
    输出文本文件时,可使用saveAsTextFile()方法接收一个目录,将RDD中的内容输出到目录中的多个文件中。

```
result.saveAsTextFile(outputFile)
```
JSON


  •   读取

    • 将数据作为文本文件读取,然后使用JSON解析器对数据进行解析。
    • python使用内置库读取JSON

    import json
    ...
    input = sc.textFile("file.json")
    data = input.map(lambda x: json.loads(x))

    • scala使用Jackson读取JSON

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    ...
    case class Person(name: String, lovesPandas: Boolean)
    ...
    val input = sc.textFile("file.json")
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val result = input.flatMap(record => {
    try {
    Some(mapper.readValue(record, classOf[Person]))
    } catch {
    case e: Exception => None
    }
    })

    • java使用Jackson读取JSON

    class ParseJson implements FlatMapFunction {
    public Iterable call(Iterator lines) throws Exception {
    ArrayList people = new ArrayList();
    ObjectMapper mapper = new ObjectMapper();
    while(lines.hasNext()) {
    String line = lines.next();
    try {
    people.add(mapper.readValue(line, Person.class));   
    } catch(Exception e) {
    //跳过失败的数据
    }
    }
    return people;
    }
    }
    JavaRDD input = sc.textFile("file.json");
    JavaRDD result = input.mapPartitions(new ParseJson());

  •   写入

    • 使用JSON解析器将结构化的RDD转为字符串RDD,然后使用文本文件API输出。
    • python

    (data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)

    • scala

    result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)

    • java

    class WriteJson implements FlatMapFunction {
    public Iterable call(Iterator people) throws Exception {
    ArrayList text = new ArrayList();
    ObjectMapper mapper = new ObjectMapper();
    while(people.hasNext()) {
    Person person = people.next();
    text.add(mapper.writeValueAsString(person));
    }
    return text;
    }
    }
    JavaRDD result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
    JavaRDD formatted = result.mapPartitions(new WriteJson());
    formatted.saveAsTextFile(outfile);


CSV与TSV
    CSV与TSV文件每行都有固定的字段,字段之间使用分隔符(CSV使用逗号;tsv使用制表符)分隔。


  •   读取

    •   将csv或tsv文件当作普通文本文件读取,然后使用响应的解析器进行解析,同json处理方式。

    •   python使用内置库读取csv

      • 文件中所有字段没有包含换行符

      import csv
      import StringIO
      ...
      def loadRecord(line):
      input = StringIO.StringIO(line)
      reader = csv.DictReader(input, fieldnames=["name","favouriteAnimal"])
      return reader.next()
      """读取每行记录"""   
      input = sc.textFile(inputFile).map(loadRecord)

      • 文件中的字段包含换行符

      def loadRecords(fileNameContents):
      input = StringIO.StringIO(fileNameContents[1])
      reader = csv.DictReader(input, fieldnames=["name","favoriteAnimal"])
      return reader
      """读取整个文件"""
      fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

    •   scala使用opencsv库读取csv

      • 文件中所有字段没有包含换行符

      import Java.io.StringReader
      import au.com.bytecode.opencsv.CSVReader
      ...
      val input = sc.textFile(inputFile)
      val result = input.map{
      line => {
      val reader = new CSVReader(new StringReader(line))
      reader.readNext()
      }
      }

      • 文件中的字段包含换行符

      case class Person(name: String, favoriteAnimal: String)
      val input = sc.wholeTextFiles(inputFile)
      val result = input.flatMap(
      case(_, txt) => {
      val reader = new CSVReader(new StringReader(txt))
      reader.readAll().map(x => Person(x(0), x(1)))
      }

    •   java使用opencsv库读取csv

      • 文件中所有字段没有包含换行符

      import Java.io.StringReader
      import au.com.bytecode.opencsv.CSVReader
      ...
      public static class ParseLine implements Function {
      public String[] call(String line) throws Exception {
      CSVReader reader = new CSVReader(new StringReader(line));
      return reader.readNext();
      }
      }
      JavaPairRDD csvData = sc.textFile(inputFile).map(new ParseLine());

      • 文件中的字段包含换行符

      public static class ParseLine implements FlatMapFunction {
      public Iterable call(Tuple2 file) throws Exception {
      CSVReader reader = new CSVReader(new StringReader(file._2);
      return reader.readAll();
      }
      }
      JavaRDD keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine());



  •   写入

    • csv或tsv文件输出时,将个字段转为指定顺序的数组,然后采用普通文本文件的方式进行输出。
    • python

    def writeRecords(records):
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
    writer.writerow(record)
    return [output.getValue()]
    pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

    • scala

    pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{
    people => {
    val stringWriter = new StringWriter()
    val csvWriter = new CSVWriter(stringWriter)
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)   
    }
    }.saveAsTextFile(outFile)


SequenceFile
    SequenceFile是键值对形式的常用Hadoop数据格式。由于Hadoop使用一套自定义的序列化框架,因此SequenceFile的键值对类型需实现Hadoop的Writable接口。


  •   读取

    • python

    data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

    • scala

    val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}   

    • java

    public static class ConvertToNativeTypes implements PairFunction {
    public Tuple2 call(Tuple2 record) {
    return new Tuple2(record._1.toString(), record._2.get());
    }
    }
    JavaPairRDD result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes());

  •   写入

    • python

    data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)])
    data.saveAsSequeceFile(outputFile)

    • scala

    val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
    data.saveAsSequenceFile(outputFile)

    • java(java中没有saveAsSequenceFile方法,用自定义hadoop格式的方式实现)

    public static class ConvertToWritableTypes implements PairFunction {
    public Tuple2 call(Tuple2 record) {
    return new Tuple2(new Text(record._1), new IntWritable(record._2));
    }
    }
    JavaPairRDD result = sc.parallelizePairs(input).mapToPair(new ConvertToNativeTypes());
    result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);


数据库
    数据库主要分为关系型数据库(MySQL、PostgreSQL等)和非关系型数据库(HBase、ElasticSearch等)。

JDBC数据库连接
    spark使用JDBC访问关系型数据库(MySQL、PostgreSQL等),只需要构建一个org.apache.spark.rdd.JdbcRDD即可。

def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc, createConnection,
"SELECT * FROM panda WHERE id >= ? AND id

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669500-1-1.html 上篇帖子: Spark笔记整理(二):RDD与spark核心概念名词 下篇帖子: spark 初体验
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表