超酷小 发表于 2019-1-30 10:28:05

8.spark core之读写数据

    spark支持多种数据源,从总体来分分为两大部分:文件系统和数据库。

文件系统
    文件系统主要有本地文件系统、Amazon S3、HDFS等。
    文件系统中存储的文件有多种存储格式。spark支持的一些常见格式有:




格式名称
结构化
说明




文件文件

普通文件文件,每行一条记录


JSON
半结构化
常见的基于文本的半结构化数据


CSV

常见的基于文本的格式,在电子表格应用中使用


SequenceFiles

一种用于键值对数据的常见Hadoop文件格式



文本文件


[*]  读取

[*]  读取单个文件,参数为文件全路径,输入的每一行都会成为RDD的一个元素。

[*]python

input = sc.textFile("file://opt/module/spark/README.md")

[*]scala

val input = sc.textFile("file://opt/module/spark/README.md")

[*]java

JavaRDD input = sc.textFile("file://opt/module/spark/README.md")

[*]读取多个文件时,可以使用textFile将参数改为目录或以逗号文件的多个文件名即可。如果是小文件,也可以使用wholeTextFiles读取为一个Pair RDD(键是文件名,值是文件内容)。

val input = sc.wholeTextFiles("file://opt/module/spark/datas")
val result = input.mapValues{
y => {
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
}
}

[*]写入
    输出文本文件时,可使用saveAsTextFile()方法接收一个目录,将RDD中的内容输出到目录中的多个文件中。

```
result.saveAsTextFile(outputFile)
```
JSON


[*]  读取

[*]将数据作为文本文件读取,然后使用JSON解析器对数据进行解析。
[*]python使用内置库读取JSON

import json
...
input = sc.textFile("file.json")
data = input.map(lambda x: json.loads(x))

[*]scala使用Jackson读取JSON

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
...
case class Person(name: String, lovesPandas: Boolean)
...
val input = sc.textFile("file.json")
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val result = input.flatMap(record => {
try {
Some(mapper.readValue(record, classOf))
} catch {
case e: Exception => None
}
})

[*]java使用Jackson读取JSON

class ParseJson implements FlatMapFunction {
public Iterable call(Iterator lines) throws Exception {
ArrayList people = new ArrayList();
ObjectMapper mapper = new ObjectMapper();
while(lines.hasNext()) {
String line = lines.next();
try {
people.add(mapper.readValue(line, Person.class));   
} catch(Exception e) {
//跳过失败的数据
}
}
return people;
}
}
JavaRDD input = sc.textFile("file.json");
JavaRDD result = input.mapPartitions(new ParseJson());

[*]  写入

[*]使用JSON解析器将结构化的RDD转为字符串RDD,然后使用文本文件API输出。
[*]python

(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)

[*]scala

result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)

[*]java

class WriteJson implements FlatMapFunction {
public Iterable call(Iterator people) throws Exception {
ArrayList text = new ArrayList();
ObjectMapper mapper = new ObjectMapper();
while(people.hasNext()) {
Person person = people.next();
text.add(mapper.writeValueAsString(person));
}
return text;
}
}
JavaRDD result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
JavaRDD formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile);


CSV与TSV
    CSV与TSV文件每行都有固定的字段,字段之间使用分隔符(CSV使用逗号;tsv使用制表符)分隔。


[*]  读取

[*]  将csv或tsv文件当作普通文本文件读取,然后使用响应的解析器进行解析,同json处理方式。

[*]  python使用内置库读取csv

[*]文件中所有字段没有包含换行符

import csv
import StringIO
...
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name","favouriteAnimal"])
return reader.next()
"""读取每行记录"""   
input = sc.textFile(inputFile).map(loadRecord)

[*]文件中的字段包含换行符

def loadRecords(fileNameContents):
input = StringIO.StringIO(fileNameContents)
reader = csv.DictReader(input, fieldnames=["name","favoriteAnimal"])
return reader
"""读取整个文件"""
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

[*]  scala使用opencsv库读取csv

[*]文件中所有字段没有包含换行符

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{
line => {
val reader = new CSVReader(new StringReader(line))
reader.readNext()
}
}

[*]文件中的字段包含换行符

case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap(
case(_, txt) => {
val reader = new CSVReader(new StringReader(txt))
reader.readAll().map(x => Person(x(0), x(1)))
}

[*]  java使用opencsv库读取csv

[*]文件中所有字段没有包含换行符

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
public static class ParseLine implements Function {
public String[] call(String line) throws Exception {
CSVReader reader = new CSVReader(new StringReader(line));
return reader.readNext();
}
}
JavaPairRDD csvData = sc.textFile(inputFile).map(new ParseLine());

[*]文件中的字段包含换行符

public static class ParseLine implements FlatMapFunction {
public Iterable call(Tuple2 file) throws Exception {
CSVReader reader = new CSVReader(new StringReader(file._2);
return reader.readAll();
}
}
JavaRDD keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine());



[*]  写入

[*]csv或tsv文件输出时,将个字段转为指定顺序的数组,然后采用普通文本文件的方式进行输出。
[*]python

def writeRecords(records):
output = StringIO.StringIO()
writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
for record in records:
writer.writerow(record)
return
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

[*]scala

pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{
people => {
val stringWriter = new StringWriter()
val csvWriter = new CSVWriter(stringWriter)
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)   
}
}.saveAsTextFile(outFile)


SequenceFile
    SequenceFile是键值对形式的常用Hadoop数据格式。由于Hadoop使用一套自定义的序列化框架,因此SequenceFile的键值对类型需实现Hadoop的Writable接口。


[*]  读取

[*]python

data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

[*]scala

val data = sc.sequenceFile(inFile, classOf, classOf).map{case (x, y) => (x.toString, y.get())}   

[*]java

public static class ConvertToNativeTypes implements PairFunction {
public Tuple2 call(Tuple2 record) {
return new Tuple2(record._1.toString(), record._2.get());
}
}
JavaPairRDD result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes());

[*]  写入

[*]python

data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)])
data.saveAsSequeceFile(outputFile)

[*]scala

val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

[*]java(java中没有saveAsSequenceFile方法,用自定义hadoop格式的方式实现)

public static class ConvertToWritableTypes implements PairFunction {
public Tuple2 call(Tuple2 record) {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}
JavaPairRDD result = sc.parallelizePairs(input).mapToPair(new ConvertToNativeTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);


数据库
    数据库主要分为关系型数据库(MySQL、PostgreSQL等)和非关系型数据库(HBase、ElasticSearch等)。

JDBC数据库连接
    spark使用JDBC访问关系型数据库(MySQL、PostgreSQL等),只需要构建一个org.apache.spark.rdd.JdbcRDD即可。

def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc, createConnection,
"SELECT * FROM panda WHERE id >= ? AND id
页: [1]
查看完整版本: 8.spark core之读写数据