
@jkbradley
Created September 8, 2015 23:58
Here are two code snippets:
(1) Compute one-hot encoded data for Spark, using the data generated by https://github.com/szilard/benchm-ml/blob/master/0-init/2-gendata.txt
(2) Run MLlib, computing soft predictions by hand.

    I ran these with Spark 1.4, and they should work for 1.5 as well.

    Note: There's no real need to switch to DataFrames yet for benchmarking. Both the RDD and DataFrame APIs use the same underlying implementation. (I hope to improve on that in Spark 1.6 if there is time.)
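
For reference, the DataFrame-based spark.ml call would look roughly like this (a minimal sketch, not part of the benchmark; it assumes the featurized DataFrame finalTrainDF with "label" and "features" columns produced by snippet (1)):

import org.apache.spark.ml.classification.RandomForestClassifier

// Same hyperparameters as the RDD-based run in snippet (2).
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(500)
  .setFeatureSubsetStrategy("sqrt")
  .setImpurity("gini")
  .setMaxDepth(20)
  .setMaxBins(50)
val rfModel = rf.fit(finalTrainDF)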

Ran on an EC2 cluster with 4 workers (9.6GB memory each), using 8 partitions for the training RDD.
For the 1M-row dataset, training the forest took 2080.814977193 sec and achieved AUC 0.7129779357732448 on the test set.

    (1) Code for one-hot encoding

    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.DoubleType
    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.mllib.linalg.Vector

    // Paths
    val origDataDir = "/mnt/mllib/regression/flightTimes/prepped"
    val origTrainPath = origDataDir + "/train-10m.csv"
    val origTestPath = origDataDir + "/test.csv"
    val newDataDir = "/mnt/mllib/regression/flightTimes/spark"
    val newTrainPath = newDataDir + "/spark-train-10m.FIXED.parquet"
    val newTestPath = newDataDir + "/spark-test.FIXED.parquet"

    // Read CSV as Spark DataFrames
    val trainDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTrainPath)
    val testDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTestPath)

// Combine train and test temporarily, tagging each row with its origin.
val fullDF = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))
display(fullDF)  // display() is a Databricks notebook utility; use fullDF.show() elsewhere

    // Feature types
    val vars_categ = Array("Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest")
    val vars_num = Array("DepTime","Distance")
    val vars_num_double = vars_num.map(_ + "_double")
    val var_y = "dep_delayed_15min"

    // Cast column types as needed
    val fullDF2 = fullDF.withColumn("DepTime_double", col("DepTime").cast(DoubleType)).withColumn("Distance_double", col("Distance").cast(DoubleType))
    display(fullDF2)

// Assemble Pipeline for featurization.
// OneHotEncoder does not yet support String input (it requires numeric indices), so apply StringIndexer first.
    val stringIndexers = vars_categ.map(colName => new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed"))
    val oneHotEncoders = vars_categ.map(colName => new OneHotEncoder().setInputCol(colName + "_indexed").setOutputCol(colName + "_ohe").setDropLast(false))
    val catAssembler = new VectorAssembler().setInputCols(vars_categ.map(_ + "_ohe")).setOutputCol("catFeatures")
    val featureAssembler = new VectorAssembler().setInputCols(vars_num_double :+ "catFeatures").setOutputCol("features")
    val labelIndexer = new StringIndexer().setInputCol(var_y).setOutputCol("label")
    val pipeline = new Pipeline().setStages(stringIndexers ++ oneHotEncoders ++ Array(catAssembler, featureAssembler, labelIndexer))

    // Compute features.
    val pipelineModel = pipeline.fit(fullDF2)
    val transformedDF = pipelineModel.transform(fullDF2)
    display(transformedDF)

    // Split back into train, test
    val finalTrainDF = transformedDF.where(col("isTrain"))
    val finalTestDF = transformedDF.where(!col("isTrain"))

    // Save Spark DataFrames as Parquet
    finalTrainDF.write.mode("overwrite").parquet(newTrainPath)
    finalTestDF.write.mode("overwrite").parquet(newTestPath)
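
Optional sanity check (my addition, not part of the original gist): read the Parquet back and confirm the feature vector dimension.

// Dimension should be the number of numeric columns plus the summed cardinalities of the categorical columns.
val checkDF = sqlContext.read.parquet(newTrainPath)
checkDF.select("features").first.getAs[Vector](0).size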

(2) Code for training and computing AUC/accuracy

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

    // Paths
    val dataDir = "/mnt/mllib/regression/flightTimes/spark"
    val trainDataPath = dataDir + "/spark-train-0.1m.FIXED.parquet"
    val testDataPath = dataDir + "/spark-test.FIXED.parquet"

// Load DataFrames, and convert to RDDs of LabeledPoints.
def toLP(df: DataFrame): RDD[LabeledPoint] = {
  df.select("label", "features")
    .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
    .repartition(8)
}
    val train = toLP(sqlContext.read.parquet(trainDataPath)).cache()
    val test = toLP(sqlContext.read.parquet(testDataPath)).cache()
(train.count(), test.count())  // materialize the cached RDDs

    // Train model
    val numClasses = 2
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 500
    val featureSubsetStrategy = "sqrt"
    val impurity = "gini"
    val maxDepth = 20
    val maxBins = 50

val now = System.nanoTime
val model = RandomForest.trainClassifier(train, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val elapsed = (System.nanoTime - now) / 1e9
elapsed

    // Compute soft predictions. For spark.mllib trees, this works for binary classification.
    // Spark 1.5 will include it for multiclass under the spark.ml API.
    import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
    import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
def softPredict(node: Node, features: Vector): Double = {
  if (node.isLeaf) {
    // Leaf: return the probability of the positive class (label 1.0).
    if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob
  } else if (node.split.get.featureType == Continuous) {
    // Continuous split: go left if the feature value is at or below the threshold.
    if (features(node.split.get.feature) <= node.split.get.threshold) {
      softPredict(node.leftNode.get, features)
    } else {
      softPredict(node.rightNode.get, features)
    }
  } else {
    // Categorical split: go left if the feature value is in the split's category set.
    if (node.split.get.categories.contains(features(node.split.get.feature))) {
      softPredict(node.leftNode.get, features)
    } else {
      softPredict(node.rightNode.get, features)
    }
  }
}
def softPredict(dt: DecisionTreeModel, features: Vector): Double = {
  softPredict(dt.topNode, features)
}

// Compute AUC, averaging soft predictions across the trees.
val scoreAndLabels = test.map { point =>
  // Hard-voting alternative: fraction of trees voting for the positive class.
  //val score = model.trees.map(_.predict(point.features)).filter(_ > 0).size.toDouble / model.numTrees
  val score = model.trees.map(tree => softPredict(tree, point.features)).sum / model.numTrees
  (score, point.label)
}
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    metrics.areaUnderROC()
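
The header above also mentions accuracy; a minimal sketch for that side (my addition; it assumes a 0.5 cutoff on the averaged soft score, which the original gist does not specify):

// Accuracy at an assumed 0.5 threshold on the soft scores.
val accuracy = scoreAndLabels.map { case (score, label) =>
  if ((score > 0.5) == (label == 1.0)) 1.0 else 0.0
}.mean()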