Spark is like Hadoop - uses Hadoop, in fact - for performing actions like outputting data to HDFS. You'll know what I mean the first time you try to save "all-the-data.csv" and are surprised to find a directory named all-the-data.csv/ containing a 0 byte _SUCCESS file and then several part-0000n files for each partition that took part in the job.

This has to do with data locality and HDFS (Hadoop Distributed File System) and how each reducer will try to write to its local slab of HDFS [first], and in the very least, each reducer writes only to the file it owns so no reducer tramples on another reducer's output.

Occasionally this can be inconvenient when the output artifact for your Spark job is, say, a top % aggregate count of your data as a CSV that a non-technical co-worker needs for a report. You can download all of the part-0000n files and merge them yourself easy enough, but is there a way to automate this step? (Of course there is, we're programmers after all. Automate all the things!)

Automate All the Things

An important fact to remember is that if the result of your job is a large number of part-nnnnn files that aggregate to GiBs in size neither of the below solutions are for you. Your data is still big, and you're at an intermediary step where that output is input for another map/reduce job. In other words I probably wouldn't do this for more than a few MiBs of results. The reason is that in both solutions all of the data ends up getting processed on a single worker for the final stage (writing the output file); all of the other stages are still in parallel across the cluster.

Inspired by this article here are a couple solutions.

1. Coalesce

First, the easiest in a spark-shell:

scala> val ac = SomeAggregateCountRDD(sc).cache

/* Spark log output elided */

scala> ac.coalesce(1).saveAsTextFile("s3n://analytics/counts")  

Using that solution you still end up with a "directory" in your S3 bucket named analytics/counts/ with a _SUCCESS and part-00000 file, but the part-00000 file contains all of the results.

Coalesce (scala, java, the Java doc is clearer) returns a new RDD that exists only on the number of partitions specified; in this case 1.

2. FileUtil.copyMerge

The second solution takes advantage of the HDFS code to merge many part-nnnnn files into a single resultant file. All of this merge work occurs on a single worker so its not a good idea if you're operating at a level of more than a few MiBs (as noted earlier).

I like this approach because it gets the data right to the presentation point I need it for my analyst co-workers with no additional steps from me. In fact, if they have an S3 client of some sort, its self-service!

I'm also including my CSV output code; it recursively flattens Product (and TupleN and case classes) into appropriate CSV rows. (Its not @tailrec so yes you could blow the stack if you really, really try.)

project/build.scala

/* ... */
libraryDependencies ++= Seq(  
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.commons" % "commons-lang3" % "3.3.2")

com/yourname/project/rdd/package.scala

package com.yourname.project

import java.net.URI

import org.apache.commons.io.FilenameUtils  
import org.apache.commons.lang.StringEscapeUtils  
import org.apache.hadoop.fs.{FileUtil, Path, FileSystem}  
import org.apache.spark.SparkContext  
import org.apache.spark.rdd.RDD

package object rdd {

  implicit class RddOp[P <: Product](val rdd: RDD[P]) {
    def saveAsMergedCsv(path: String, overwrite: Boolean = true): 
      Unit = {
      val filename = FilenameUtils.getName(path)
      val csv: RDD[String] = rdd.map(toCsv(_))
      val uri: URI = new URI(s"hdfs:///root/ephemeral-hdfs/$filename")
      if (overwrite) {
        val hdfs: FileSystem = FileSystem.get(uri, 
          rdd.context.hadoopConfiguration)
        hdfs.delete(new Path(uri), true)
      }

      csv.saveAsTextFile(uri.toString)
      merge(rdd.context, uri.toString, path)
    }
  }

  private def merge(sc: SparkContext, 
                    srcPath: String, 
                    dstPath: String): Unit = {
    val srcFileSystem = FileSystem.get(new URI(srcPath),
      sc.hadoopConfiguration)
    val dstFileSystem = FileSystem.get(new URI(dstPath),
      sc.hadoopConfiguration)
    dstFileSystem.delete(new Path(dstPath), true)
    FileUtil.copyMerge(srcFileSystem,  new Path(srcPath),
      dstFileSystem,  new Path(dstPath),
      true, sc.hadoopConfiguration, null)
  }

  private def toCsv(a: Any): String = {
    def toCsv(a: Any, accumulator: Vector[String]): 
      Vector[String] = {
      a match {
        // unfortunately null can happen for some results from Jdbc
        case null | None => 
          accumulator :+ "" 
        case s: String => 
          accumulator :+ StringEscapeUtils.escapeCsv(s)
        case p: Product => 
          p.productIterator.map(toCsv(_, accumulator))
            .toVector.flatten
        case arr: Array[_] => 
          arr.map(toCsv(_, accumulator))
            .toVector.flatten
        case _ => 
          accumulator :+ a.toString
      }
    }

    toCsv(a, Vector()).mkString(",")
}

To use this in spark-shell:

scala> import com.yourname.project.rdd._

scala> val ac = SomeAggregateCountRDD(sc).cache

/* Spark log output elided */

scala> ac.saveAsMergedCsv("counts.csv")  

To better understand what happens with toCsv assume the following case classes:

case class Count[A](item: [A], amount: Long)  
case class Track(title: String, artist: String, trackId: Option[Int])  

and pretend toCsv is public and in scope (you could make it so by using :paste mode in an sbt console session or your spark-shell):

scala> val c1 = Count(Track("All of Me", "John Legend", Some(679297849)), 42)

scala> toCsv(c1)  
res1: String = All of Me,John Legend,679297849,42

scala> val c2 = Count(Track("Supernova", "Ray LaMontagne"), 1337)

scala> toCsv(c2)  
res2: String = Supernova,Ray LaMontagne,,1337

scala> val c3 = Count(Track("Name with embedded , in it", "Artist with \"quotes\""), 1)

scala> toCsv(c3)  
res3: String = "Name with embedded , in it","Artist with ""quotes""",,1