Spark is like Hadoop - it uses Hadoop, in fact - for performing actions like outputting data to HDFS. You'll know what I mean the first time you try to save "all-the-data.csv" and are surprised to find a directory named all-the-data.csv/ containing a 0-byte _SUCCESS file and several part-0000n files, one for each partition that took part in the job.
This has to do with data locality in HDFS (the Hadoop Distributed File System): each reducer tries to write to its local slab of HDFS first, and at the very least, each reducer writes only to the file it owns so no reducer tramples on another reducer's output.
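Here's what that looks like concretely in spark-shell (a quick sketch; the toy RDD and output path are placeholders):
scala> val data = sc.parallelize(1 to 1000, 4) // an RDD with 4 partitions
scala> data.saveAsTextFile("all-the-data.csv")
/* What lands on disk/HDFS is a directory, not a file:
   all-the-data.csv/_SUCCESS
   all-the-data.csv/part-00000 through part-00003, one per partition */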
Occasionally this is inconvenient, say when the output artifact of your Spark job is a top-% aggregate count of your data as a CSV that a non-technical co-worker needs for a report. You can download all of the part-0000n files and merge them yourself easily enough, but is there a way to automate this step? (Of course there is; we're programmers, after all. Automate all the things!)
An important fact to remember: if the result of your job is a large number of part-nnnnn files that aggregate to GiBs in size, neither of the solutions below is for you. Your data is still big, and you're at an intermediate step where that output is the input to another map/reduce job. In other words, I probably wouldn't do this for more than a few MiBs of results. The reason is that in both solutions all of the data ends up being processed on a single worker for the final stage (writing the output file); all of the other stages still run in parallel across the cluster.
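If you're not sure whether your result set is small enough, a rough sanity check in spark-shell before merging can help (a sketch; SomeAggregateCountRDD is the same placeholder used below):
scala> val ac = SomeAggregateCountRDD(sc).cache
scala> ac.count // number of result rows
scala> ac.map(_.toString.length.toLong).reduce(_ + _) // rough size of the output in characters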
Inspired by this article, here are a couple of solutions.
1. Coalesce
First, the easiest in a spark-shell:
scala> val ac = SomeAggregateCountRDD(sc).cache
/* Spark log output elided */
scala> ac.coalesce(1).saveAsTextFile("s3n://analytics/counts")
Using that solution you still end up with a "directory" in your S3 bucket named analytics/counts/ with a _SUCCESS and a part-00000 file, but the part-00000 file contains all of the results.
Coalesce (scala, java; the Java doc is clearer) returns a new RDD that lives on only the number of partitions specified, in this case 1.
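You can verify the effect on partition count directly in spark-shell (a quick sketch using the same ac as above):
scala> ac.partitions.length // one partition per part-0000n file that would be written
scala> ac.coalesce(1).partitions.length // 1, so saveAsTextFile writes a single part-00000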
2. FileUtil.copyMerge
The second solution takes advantage of the HDFS code to merge many part-nnnnn files into a single resultant file. All of this merge work occurs on a single worker, so it's not a good idea if you're operating at a level of more than a few MiBs (as noted earlier).
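For reference, the underlying Hadoop 2.x call looks like this on its own (a sketch with made-up paths; the packaged-up version follows below):
scala> import java.net.URI
scala> import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
scala> val conf = sc.hadoopConfiguration
scala> val src = "hdfs:///tmp/counts"         // a directory full of part-nnnnn files
scala> val dst = "s3n://analytics/counts.csv" // the single merged output file
scala> val srcFs = FileSystem.get(new URI(src), conf)
scala> val dstFs = FileSystem.get(new URI(dst), conf)
scala> FileUtil.copyMerge(srcFs, new Path(src), dstFs, new Path(dst), true, conf, null)
/* args: srcFS, srcDir, dstFS, dstFile, deleteSource, conf, addString */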
I like this approach because it delivers the data right to the point where my analyst co-workers need it, with no additional steps from me. In fact, if they have an S3 client of some sort, it's self-service!
I'm also including my CSV output code; it recursively flattens Product (and therefore TupleN and case classes) into appropriate CSV rows. (It's not @tailrec, so yes, you could blow the stack if you really, really try.)
project/build.scala
/* ... */
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.commons" % "commons-lang3" % "3.3.2",
  "commons-io" % "commons-io" % "2.4") // for FilenameUtils
com/yourname/project/rdd/package.scala
package com.yourname.project
import java.net.URI
import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.hadoop.fs.{FileUtil, Path, FileSystem}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
package object rdd {

  implicit class RddOp[P <: Product](val rdd: RDD[P]) {
    def saveAsMergedCsv(path: String, overwrite: Boolean = true): Unit = {
      val filename = FilenameUtils.getName(path)
      val csv: RDD[String] = rdd.map(toCsv(_))
      // write the part files to a scratch directory on the cluster's HDFS first
      val uri: URI = new URI(s"hdfs:///root/ephemeral-hdfs/$filename")
      if (overwrite) {
        val hdfs: FileSystem = FileSystem.get(uri, rdd.context.hadoopConfiguration)
        hdfs.delete(new Path(uri), true)
      }
      csv.saveAsTextFile(uri.toString)
      merge(rdd.context, uri.toString, path)
    }
  }
  private def merge(sc: SparkContext, srcPath: String, dstPath: String): Unit = {
    val srcFileSystem = FileSystem.get(new URI(srcPath), sc.hadoopConfiguration)
    val dstFileSystem = FileSystem.get(new URI(dstPath), sc.hadoopConfiguration)
    // remove any previous merged output so copyMerge can write the new file
    dstFileSystem.delete(new Path(dstPath), true)
    // concatenate every part-nnnnn file under srcPath into the single dstPath file;
    // the `true` flag deletes the source directory afterwards
    FileUtil.copyMerge(srcFileSystem, new Path(srcPath),
                       dstFileSystem, new Path(dstPath),
                       true, sc.hadoopConfiguration, null)
  }
  private def toCsv(a: Any): String = {
    def toCsv(a: Any, accumulator: Vector[String]): Vector[String] = {
      a match {
        // unfortunately null can happen for some results from Jdbc
        case null | None =>
          accumulator :+ ""
        case s: String =>
          accumulator :+ StringEscapeUtils.escapeCsv(s)
        case p: Product => // TupleN and case classes: flatten each field
          p.productIterator.map(toCsv(_, accumulator)).toVector.flatten
        case arr: Array[_] =>
          arr.map(toCsv(_, accumulator)).toVector.flatten
        case _ =>
          accumulator :+ a.toString
      }
    }
    toCsv(a, Vector()).mkString(",")
  }
} // package object rdd
To use this in spark-shell:
scala> import com.yourname.project.rdd._
scala> val ac = SomeAggregateCountRDD(sc).cache
/* Spark log output elided */
scala> ac.saveAsMergedCsv("counts.csv")
To better understand what happens with toCsv, assume the following case classes:
case class Count[A](item: A, amount: Long)
case class Track(title: String, artist: String, trackId: Option[Int] = None)
and pretend toCsv is public and in scope (you could make it so by using :paste mode in an sbt console session or your spark-shell):
scala> val c1 = Count(Track("All of Me", "John Legend", Some(679297849)), 42)
scala> toCsv(c1)
res1: String = All of Me,John Legend,679297849,42
scala> val c2 = Count(Track("Supernova", "Ray LaMontagne"), 1337)
scala> toCsv(c2)
res2: String = Supernova,Ray LaMontagne,,1337
scala> val c3 = Count(Track("Name with embedded , in it", "Artist with \"quotes\""), 1)
scala> toCsv(c3)
res3: String = "Name with embedded , in it","Artist with ""quotes""",,1