Revisions

  1. @mlehman revised this gist Jul 29, 2014: added a line to MultipleOutputsExample.scala attributing the approach to saveAsNewAPIHadoopFile in org.apache.spark.rdd.PairRDDFunctions.
  2. @mlehman created this gist Jul 29, 2014: MultipleOutputsExample.scala (149 lines).

MultipleOutputsExample.scala
/* Example using MultipleOutputs to write a Spark RDD to multiple files.
   Based on saveAsNewAPIHadoopFile implemented in org.apache.spark.rdd.PairRDDFunctions, org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil.

   val values = sc.parallelize(List(
     ("fruit/items", "apple"),
     ("vegetable/items", "broccoli"),
     ("fruit/items", "pear"),
     ("fruit/items", "peach"),
     ("vegetable/items", "celery"),
     ("vegetable/items", "spinach")
   ))
   values.saveAsMultiTextFiles("tmp/food")

   OUTPUTS:

   tmp/food/fruit/items-r-00000
     apple
     pear
     peach

   tmp/food/vegetable/items-r-00000
     broccoli
     celery
     spinach
*/

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.io.{DataInputBuffer, NullWritable, Text}
import org.apache.hadoop.mapred.RawKeyValueIterator
import org.apache.hadoop.mapreduce.counters.GenericCounter
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat, LazyOutputFormat, MultipleOutputs}
import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
import org.apache.hadoop.util.Progress
import org.apache.spark._
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.mapreduce._
import scala.reflect.ClassTag

object MultipleOutputsExample extends App with Logging {

  import MultiOutputRDD._

  val sc = new SparkContext("local", "MultiOutput Example")
  val values = sc.parallelize(List(
    ("fruit/items", "apple"),
    ("vegetable/items", "broccoli"),
    ("fruit/items", "pear"),
    ("fruit/items", "peach"),
    ("vegetable/items", "celery"),
    ("vegetable/items", "spinach")
  ))
  values.saveAsMultiTextFiles("tmp/food")
  sc.stop()
}

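/**
 * Wraps an RDD of (outputPath, (key, value)) pairs so each record can be written to the file
 * named by its outputPath, via Hadoop's MultipleOutputs on the new (mapreduce) output API.
 */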
class MultiOutputRDD[K, V](self: RDD[(String, (K, V))])
                          (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {

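  // Convenience wrapper: converts each value to (NullWritable, Text) and writes it with
  // TextOutputFormat, so plain strings land one per line in the file named by the path key.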
  def saveAsMultiTextFiles(path: String) {
    new MultiOutputRDD(self.map(x => (x._1, (NullWritable.get, new Text(x._2._2.toString)))))
      .saveAsNewHadoopMultiOutputs[TextOutputFormat[NullWritable, Text]](path)
  }

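  // Builds a Hadoop Job for the requested OutputFormat. LazyOutputFormat defers creating a file
  // until the first record for it is written, and "mapred.output.dir" is the base directory
  // under which the per-key paths appear.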
  def saveAsNewHadoopMultiOutputs[F <: OutputFormat[K, V]](path: String, conf: Configuration = self.context.hadoopConfiguration)(implicit fm: ClassTag[F]) {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new Job(hadoopConf)
    job.setOutputKeyClass(kt.runtimeClass)
    job.setOutputValueClass(vt.runtimeClass)
    LazyOutputFormat.setOutputFormatClass(job, fm.runtimeClass.asInstanceOf[Class[F]])
    job.getConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDatasetMultiOutputs(job.getConfiguration)
  }

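  // Mirrors PairRDDFunctions.saveAsNewAPIHadoopDataset, except that every record is routed
  // through MultipleOutputs using the String half of the pair as the output name.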
  def saveAsNewAPIHadoopDatasetMultiOutputs(conf: Configuration) {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new Job(hadoopConf)
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    val wrappedConf = new SerializableWritable(job.getConfiguration)
    val outfmt = job.getOutputFormatClass
    val jobFormat = outfmt.newInstance

    if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
      // FileOutputFormat ignores the filesystem parameter
      jobFormat.checkOutputSpecs(job)
    }

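    // Runs once per partition on the executors: builds a reduce-task attempt context, wraps it
    // in MultipleOutputs, and writes each (key, value) to the file named by the record's String.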
    val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V))]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId, attemptNumber)
      val hadoopContext = new TaskAttemptContextImpl(wrappedConf.value, attemptId)
      val format = outfmt.newInstance
      format match {
        case c: Configurable => c.setConf(wrappedConf.value)
        case _ => ()
      }
      val committer = format.getOutputCommitter(hadoopContext)

      committer.setupTask(hadoopContext)
      val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]

      val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new DummyIterator(itr), new GenericCounter, new GenericCounter,
        recordWriter, committer, new DummyReporter, null, kt.runtimeClass, vt.runtimeClass)
      val writer = new MultipleOutputs(taskInputOutputContext)

      try {
        while (itr.hasNext) {
          val pair = itr.next()
          writer.write(pair._2._1, pair._2._2, pair._1)
        }
      } finally {
        writer.close()
      }
      committer.commitTask(hadoopContext)
      1
    }: Int

    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    jobCommitter.setupJob(jobTaskContext)
    self.context.runJob(self, writeShard)
    jobCommitter.commitJob(jobTaskContext)
  }

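  // ReduceContextImpl's constructor demands a RawKeyValueIterator; MultipleOutputs never reads
  // from it here, so a stub that answers null is sufficient.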
  class DummyIterator(itr: Iterator[_]) extends RawKeyValueIterator {
    def getKey: DataInputBuffer = null
    def getValue: DataInputBuffer = null
    def getProgress: Progress = null
    def next = itr.hasNext
    def close() { }
  }
}

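/**
 * Implicit conversion so the methods above can be called directly on any RDD[(String, V)],
 * e.g. values.saveAsMultiTextFiles("tmp/food") in the example object.
 */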
object MultiOutputRDD {
  implicit def rddToMultiOutputRDD[V](rdd: RDD[(String, V)])(implicit vt: ClassTag[V]) = {
    new MultiOutputRDD(rdd.map(x => (x._1, (null, x._2))))
  }
    }
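
A minimal sketch of calling the generic saveAsNewHadoopMultiOutputs directly, bypassing the text
convenience method. The SequenceFileOutputFormat choice, the sample records, and the tmp/events
path are illustrative assumptions, not part of the original gist:

object MultiOutputSequenceFileSketch extends App {
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
  import org.apache.spark.SparkContext

  val sc = new SparkContext("local", "MultiOutput SequenceFile Sketch")

  // Keep plain Scala data on the driver and convert to Writables inside the transformation,
  // as the example above does, so nothing unserializable is shipped from the driver.
  val events = sc.parallelize(Seq(
    ("2014/07/28", (1L, "first")),
    ("2014/07/29", (2L, "second"))
  )).map { case (bucket, (offset, line)) =>
    (bucket, (new LongWritable(offset), new Text(line)))
  }

  // Each bucket becomes its own sequence file, e.g. tmp/events/2014/07/28-r-00000.
  new MultiOutputRDD(events)
    .saveAsNewHadoopMultiOutputs[SequenceFileOutputFormat[LongWritable, Text]]("tmp/events")

  sc.stop()
}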