Revisions
mlehman revised this gist
Jul 29, 2014. 1 changed file with 1 addition and 0 deletions.
@@ -1,4 +1,5 @@
 /* Example using MultipleOutputs to write a Spark RDD to multiple files.
+   Based on saveAsNewAPIHadoopFile implemented in org.apache.spark.rdd.PairRDDFunctions, org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil.

 val values = sc.parallelize(List(
   ("fruit/items", "apple"),
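For context only (not part of the gist): the stock PairRDDFunctions.saveAsNewAPIHadoopFile call referenced in this revision writes every record under a single output directory. The minimal sketch below uses an illustrative local SparkContext and output path; the per-key directory layout is exactly what the MultipleOutputs approach in this gist adds on top of it.

// Baseline for comparison, a minimal sketch (Spark 1.x era API, illustrative names and paths).
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // pair-RDD implicits in Spark 1.x

val sc = new SparkContext("local", "SingleOutputBaseline")
val pairs = sc.parallelize(List(("fruit", "apple"), ("vegetable", "broccoli")))
  .map { case (k, v) => (new Text(k), new Text(v)) }

// Everything lands in tmp/food-single/part-r-*; there is no per-key directory structure.
pairs.saveAsNewAPIHadoopFile[TextOutputFormat[Text, Text]]("tmp/food-single")
sc.stop()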
mlehman created this gist
Jul 29, 2014.
/* Example using MultipleOutputs to write a Spark RDD to multiple files.

  val values = sc.parallelize(List(
    ("fruit/items", "apple"),
    ("vegetable/items", "broccoli"),
    ("fruit/items", "pear"),
    ("fruit/items", "peach"),
    ("vegetable/items", "celery"),
    ("vegetable/items", "spinach")
  ))

  values.saveAsMultiTextFiles("tmp/food")

  OUTPUTS:

  tmp/food/fruit/items-r-00000
    apple
    pear
    peach

  tmp/food/vegetable/items-r-00000
    broccoli
    celery
    spinach
*/

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.io.{DataInputBuffer, NullWritable, Text}
import org.apache.hadoop.mapred.RawKeyValueIterator
import org.apache.hadoop.mapreduce.counters.GenericCounter
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat, LazyOutputFormat, MultipleOutputs}
import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
import org.apache.hadoop.util.Progress
import org.apache.spark._
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.mapreduce._

import scala.reflect.ClassTag

object MultipleOutputsExample extends App with Logging {

  import MultiOutputRDD._

  val sc = new SparkContext("local", "MultiOutput Example")

  val values = sc.parallelize(List(
    ("fruit/items", "apple"),
    ("vegetable/items", "broccoli"),
    ("fruit/items", "pear"),
    ("fruit/items", "peach"),
    ("vegetable/items", "celery"),
    ("vegetable/items", "spinach")
  ))

  values.saveAsMultiTextFiles("tmp/food")

  sc.stop()
}

class MultiOutputRDD[K, V](self: RDD[(String, (K, V))])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {

  def saveAsMultiTextFiles(path: String) {
    new MultiOutputRDD(self.map(x => (x._1, (NullWritable.get, new Text(x._2._2.toString)))))
      .saveAsNewHadoopMultiOutputs[TextOutputFormat[NullWritable, Text]](path)
  }

  def saveAsNewHadoopMultiOutputs[F <: OutputFormat[K, V]](
      path: String,
      conf: Configuration = self.context.hadoopConfiguration)(implicit fm: ClassTag[F]) {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new Job(hadoopConf)
    job.setOutputKeyClass(kt.runtimeClass)
    job.setOutputValueClass(vt.runtimeClass)
    LazyOutputFormat.setOutputFormatClass(job, fm.runtimeClass.asInstanceOf[Class[F]])
    job.getConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDatasetMultiOutputs(job.getConfiguration)
  }

  def saveAsNewAPIHadoopDatasetMultiOutputs(conf: Configuration) {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new Job(hadoopConf)
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    val wrappedConf = new SerializableWritable(job.getConfiguration)
    val outfmt = job.getOutputFormatClass
    val jobFormat = outfmt.newInstance

    if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
      // FileOutputFormat ignores the filesystem parameter
      jobFormat.checkOutputSpecs(job)
    }

    val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V))]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId, attemptNumber)
      val hadoopContext = new TaskAttemptContextImpl(wrappedConf.value, attemptId)
      val format = outfmt.newInstance
      format match {
        case c: Configurable => c.setConf(wrappedConf.value)
        case _ => ()
      }
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)

      val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]

      // MultipleOutputs expects a TaskInputOutputContext, so wrap the RecordWriter in a reduce context.
      val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId,
        new DummyIterator(itr), new GenericCounter, new GenericCounter,
        recordWriter, committer, new DummyReporter, null,
        kt.runtimeClass, vt.runtimeClass)

      val writer = new MultipleOutputs(taskInputOutputContext)

      try {
        while (itr.hasNext) {
          val pair = itr.next()
          // pair._1 is the relative output path, pair._2 the (key, value) record.
          writer.write(pair._2._1, pair._2._2, pair._1)
        }
      } finally {
        writer.close()
      }
      committer.commitTask(hadoopContext)
      1
    }: Int

    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    jobCommitter.setupJob(jobTaskContext)
    self.context.runJob(self, writeShard)
    jobCommitter.commitJob(jobTaskContext)
  }

  class DummyIterator(itr: Iterator[_]) extends RawKeyValueIterator {
    def getKey: DataInputBuffer = null
    def getValue: DataInputBuffer = null
    def getProgress: Progress = null
    def next = itr.hasNext
    def close() { }
  }
}

object MultiOutputRDD {
  implicit def rddToMultiOutputRDD[V](rdd: RDD[(String, V)])(implicit vt: ClassTag[V]) = {
    new MultiOutputRDD(rdd.map(x => (x._1, (null, x._2))))
  }
}
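A possible extension, sketched under the assumption that the classes above are on the classpath and that the values RDD from the example is in scope: because saveAsNewHadoopMultiOutputs is generic in the OutputFormat, the same per-directory layout could be written as SequenceFiles instead of text. This variant is untested and not part of the gist; the tmp/food-seq path is illustrative.

// Untested sketch: per-directory SequenceFile output via the generic entry point.
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

val keyed = values.map { case (dir, v) => (dir, (new Text(dir), new Text(v))) }

new MultiOutputRDD(keyed)
  .saveAsNewHadoopMultiOutputs[SequenceFileOutputFormat[Text, Text]]("tmp/food-seq")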