Here are 2 code snippets:
(1) Compute one-hot encoded data for Spark, using the data generated by
    https://github.com/szilard/benchm-ml/blob/master/0-init/2-gendata.txt
(2) Run MLlib, computing soft predictions by hand.

I ran these with Spark 1.4, and they should work for 1.5 as well.

Note: There is no real need to switch to DataFrames yet for benchmarking; both the RDD and DataFrame APIs use the same underlying implementation. (I hope to improve on that in Spark 1.6 if there is time.)

Ran on an EC2 cluster with 4 workers (9.6GB memory each), with 8 partitions for the training RDD. For the 1M dataset, training the forest took 2080.814977193 sec and achieved AUC 0.7129779357732448 on the test set.

(1) Code for one-hot encoding

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.linalg.Vector

// Paths
val origDataDir = "/mnt/mllib/regression/flightTimes/prepped"
val origTrainPath = origDataDir + "/train-10m.csv"
val origTestPath = origDataDir + "/test.csv"
val newDataDir = "/mnt/mllib/regression/flightTimes/spark"
val newTrainPath = newDataDir + "/spark-train-10m.FIXED.parquet"
val newTestPath = newDataDir + "/spark-test.FIXED.parquet"

// Read CSV as Spark DataFrames
val trainDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTrainPath)
val testDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTestPath)

// Combine train and test temporarily, tagging rows so they can be split apart again after featurization.
val fullDF = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))
display(fullDF)  // display() is a Databricks notebook helper

// Feature types
val vars_categ = Array("Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "Origin", "Dest")
val vars_num = Array("DepTime", "Distance")
val vars_num_double = vars_num.map(_ + "_double")
val var_y = "dep_delayed_15min"

// Cast numeric columns (read as strings from CSV) to Double
val fullDF2 = fullDF
  .withColumn("DepTime_double", col("DepTime").cast(DoubleType))
  .withColumn("Distance_double", col("Distance").cast(DoubleType))
display(fullDF2)

// Assemble Pipeline for featurization.
// We need StringIndexer before OneHotEncoder since OneHotEncoder does not yet support String input (but it will).
val stringIndexers = vars_categ.map(colName =>
  new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed"))
val oneHotEncoders = vars_categ.map(colName =>
  new OneHotEncoder().setInputCol(colName + "_indexed").setOutputCol(colName + "_ohe").setDropLast(false))
val catAssembler = new VectorAssembler().setInputCols(vars_categ.map(_ + "_ohe")).setOutputCol("catFeatures")
val featureAssembler = new VectorAssembler().setInputCols(vars_num_double :+ "catFeatures").setOutputCol("features")
val labelIndexer = new StringIndexer().setInputCol(var_y).setOutputCol("label")
val pipeline = new Pipeline().setStages(stringIndexers ++ oneHotEncoders ++ Array(catAssembler, featureAssembler, labelIndexer))
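// Aside (not part of the original benchmark): a minimal sketch of what the
// StringIndexer -> OneHotEncoder stages above produce, run on a hypothetical
// toy column. The toy DataFrame, values, and column names are illustrative only.
val toyDF = sqlContext.createDataFrame(Seq(
  (0, "SFO"), (1, "JFK"), (2, "SFO"), (3, "ORD")
)).toDF("id", "Origin")
// StringIndexer assigns each distinct string a Double index, ordered by
// descending frequency (here "SFO" -> 0.0 since it appears twice).
val toyIndexed = new StringIndexer()
  .setInputCol("Origin").setOutputCol("Origin_indexed")
  .fit(toyDF).transform(toyDF)
// OneHotEncoder expands each index into a sparse 0/1 indicator vector; with
// setDropLast(false), every category keeps its own slot, so the 3 distinct
// values become 3-dimensional vectors.
new OneHotEncoder()
  .setInputCol("Origin_indexed").setOutputCol("Origin_ohe")
  .setDropLast(false)
  .transform(toyIndexed)
  .show()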
// Compute features.
val pipelineModel = pipeline.fit(fullDF2)
val transformedDF = pipelineModel.transform(fullDF2)
display(transformedDF)

// Split back into train and test using the isTrain tag
val finalTrainDF = transformedDF.where(col("isTrain"))
val finalTestDF = transformedDF.where(!col("isTrain"))

// Save Spark DataFrames as Parquet
finalTrainDF.write.mode("overwrite").parquet(newTrainPath)
finalTestDF.write.mode("overwrite").parquet(newTestPath)

(2) AUC/accuracy

import org.apache.spark.rdd.RDD  // needed for the RDD[LabeledPoint] return type below; missing in the original
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{DataFrame, Row}

// Paths
val dataDir = "/mnt/mllib/regression/flightTimes/spark"
val trainDataPath = dataDir + "/spark-train-0.1m.FIXED.parquet"
val testDataPath = dataDir + "/spark-test.FIXED.parquet"

// Load each DataFrame and convert it to an RDD of LabeledPoints with 8 partitions
def toLP(df: DataFrame): RDD[LabeledPoint] = {
  df.select("label", "features").map {
    case Row(label: Double, features: Vector) => LabeledPoint(label, features)
  }.repartition(8)
}
val train = toLP(sqlContext.read.parquet(trainDataPath)).cache()
val test = toLP(sqlContext.read.parquet(testDataPath)).cache()
(train.count(), test.count())

// Train model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()  // empty: all features are continuous after one-hot encoding
val numTrees = 500
val featureSubsetStrategy = "sqrt"
val impurity = "gini"
val maxDepth = 20
val maxBins = 50
val now = System.nanoTime
val model = RandomForest.trainClassifier(train, numClasses, categoricalFeaturesInfo, numTrees,
  featureSubsetStrategy, impurity, maxDepth, maxBins)
val elapsed = (System.nanoTime - now) / 1e9
elapsed

// Compute soft predictions by walking each tree to its leaf and reading off the
// probability of class 1. For spark.mllib trees, this works for binary classification.
// Spark 1.5 will include it for multiclass under the spark.ml API.
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}

def softPredict(node: Node, features: Vector): Double = {
  if (node.isLeaf) {
    if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob
  } else {
    if (node.split.get.featureType == Continuous) {
      if (features(node.split.get.feature) <= node.split.get.threshold) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    } else {
      if (node.split.get.categories.contains(features(node.split.get.feature))) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    }
  }
}

def softPredict(dt: DecisionTreeModel, features: Vector): Double = {
  softPredict(dt.topNode, features)
}

// Compute AUC: average the per-tree soft predictions to get a forest-level score
val scoreAndLabels = test.map { point =>
  // val score = model.trees.map(_.predict(point.features)).filter(_ > 0).size.toDouble / model.numTrees
  val score = model.trees.map(tree => softPredict(tree, point.features)).sum / model.numTrees
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.areaUnderROC()
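The header above says "AUC/accuracy", but the code only computes AUC. As a hedged sketch of the accuracy half (not from the original gist): reuse scoreAndLabels and threshold the soft score, where the 0.5 cutoff is my assumption rather than anything the benchmark specifies.

// Sketch: accuracy from the soft scores. The 0.5 threshold is an assumption.
val accuracy = scoreAndLabels.map { case (score, label) =>
  if ((if (score >= 0.5) 1.0 else 0.0) == label) 1.0 else 0.0
}.mean()
accuracy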