// Read every file under the directory as (filename, content) pairs and compute,
// per file, the average of its whitespace-separated numbers.
// BUG FIX: a local-filesystem URI needs three slashes ("file:///opt/...");
// in "file://opt/..." the "opt" segment is parsed as the authority/host.
val input = sc.wholeTextFiles("file:///opt/module/spark/datas")
val result = input.mapValues { content =>
  val nums = content.split(" ").map(_.toDouble)
  // NOTE(review): an empty file yields NaN (0.0 / 0) — confirm inputs are non-empty.
  nums.sum / nums.size.toDouble
}
// BUG FIX: the package is lowercase "java.io" — "Java.io" does not compile.
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
// Parse each input line as one CSV record.
// Only valid when no field contains an embedded newline.
val input = sc.textFile(inputFile)
val result = input.map { record =>
  new CSVReader(new StringReader(record)).readNext()
}
文件中的字段包含换行符
case class Person(name: String, favoriteAnimal: String)

// Fields may contain embedded newlines, so read each whole file at once and
// let the CSV parser split records; wholeTextFiles yields (path, content) pairs.
val input = sc.wholeTextFiles(inputFile)
// BUG FIX: a pattern-matching anonymous function ("case (_, txt) => ...") must
// be passed inside braces, not parentheses, and the original call was never closed.
val result = input.flatMap { case (_, txt) =>
  val reader = new CSVReader(new StringReader(txt))
  // NOTE(review): readAll() returns a java.util.List[Array[String]]; mapping it
  // directly assumes a Java-to-Scala collection conversion is in scope — confirm.
  reader.readAll().map(x => Person(x(0), x(1)))
}
java使用opencsv库读取csv
文件中所有字段没有包含换行符
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
/**
 * Parses one line of input as a single CSV record.
 * Suitable only when no field contains an embedded newline.
 */
public static class ParseLine implements Function<String, String[]> {
  public String[] call(String line) throws Exception {
    CSVReader reader = new CSVReader(new StringReader(line));
    return reader.readNext(); // first (and only) record on this line
  }
}
JavaPairRDD csvData = sc.textFile(inputFile).map(new ParseLine());
文件中的字段包含换行符
public static class ParseLine implements FlatMapFunction {
public Iterable call(Tuple2 file) throws Exception {
CSVReader reader = new CSVReader(new StringReader(file._2);
return reader.readAll();
}
}
JavaRDD keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine());
写入
csv或tsv文件输出时,将各字段转为指定顺序的数组,然后采用普通文本文件的方式进行输出。
python
def writeRecords(records):
    """Serialize an iterable of dicts into CSV text.

    Returns a single-element list containing the CSV text for the whole
    partition, ready for use with RDD.mapPartitions(...).saveAsTextFile(...).
    """
    import csv
    import io  # Python 3: io.StringIO replaces the legacy StringIO module
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    # BUG FIX: the StringIO accessor is getvalue(), not getValue().
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
scala
// Turn each Person into an ordered array of fields, then serialize every
// partition with one CSVWriter and emit the accumulated text as one element.
pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{
  rows => {
    val buffer = new StringWriter()
    val writer = new CSVWriter(buffer)
    writer.writeAll(rows.toList)
    Iterator(buffer.toString)
  }
}.saveAsTextFile(outFile)
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
scala
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}
java
/** Unwraps Hadoop Writable key/value pairs into native String/Integer pairs. */
public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
    return new Tuple2<String, Integer>(record._1.toString(), record._2.get());
  }
}
JavaPairRDD result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes());
写入
python
data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)])
# BUG FIX: the method name was misspelled "saveAsSequeceFile".
data.saveAsSequenceFile(outputFile)
scala
// Build the (String, Int) pairs and write them out as a SequenceFile.
// NOTE(review): relies on implicit Writable conversions being in scope — confirm.
val data = sc.parallelize(("Panda", 3) :: ("Kay", 6) :: ("Snail", 2) :: Nil)
data.saveAsSequenceFile(outputFile)
/** Wraps native String/Integer pairs into Hadoop Writable (Text, IntWritable) pairs. */
public static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
    return new Tuple2<Text, IntWritable>(new Text(record._1), new IntWritable(record._2));
  }
}
// BUG FIX: the original called mapToPair(new ConvertToNativeTypes()) — the wrong
// direction; writing a SequenceFile needs the Writable-wrapping converter.
JavaPairRDD<Text, IntWritable> result = sc.parallelizePairs(input).mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
/** Loads the MySQL JDBC driver and opens a connection to the local test database. */
def createConnection() = {
  val driverClass = "com.mysql.jdbc.Driver"
  Class.forName(driverClass).newInstance()
  DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}
/** Pulls column 1 (Int) and column 2 (String) out of the current result-set row. */
def extractValues(r: ResultSet) = {
  val first = r.getInt(1)
  val second = r.getString(2)
  (first, second)
}
val data = new JdbcRDD(sc, createConnection,
"SELECT * FROM panda WHERE id >= ? AND id