Skip to content

Instantly share code, notes, and snippets.

@kenmsj
Forked from jkbradley/LDA_SparkDocs
Created October 1, 2015 22:04
Show Gist options
  • Select an option

  • Save kenmsj/d02cbb53e178d233d93f to your computer and use it in GitHub Desktop.

Select an option

Save kenmsj/d02cbb53e178d233d93f to your computer and use it in GitHub Desktop.

Revisions

  1. @jkbradley jkbradley created this gist Mar 24, 2015.
    61 changes: 61 additions & 0 deletions LDA_SparkDocs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    /*
    This example uses Scala. Please see the MLlib documentation for a Java example.

    Try running this code in the Spark shell. It may produce different topics each time (since LDA includes some randomization), but it should give topics similar to those listed above.

    This example is paired with a blog post on LDA in Spark: http://databricks.com/blog
    Spark: http://spark.apache.org/
    */

    import scala.collection.mutable
    import org.apache.spark.mllib.clustering.LDA
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD

    // Load documents from text files, 1 document per file
    val corpus: RDD[String] = sc.wholeTextFiles("docs/*.md").map(_._2)

    // Split each document into a sequence of terms (words)
    val tokenized: RDD[Seq[String]] =
    corpus.map(_.toLowerCase.split("\\s")).map(_.filter(_.length > 3).filter(_.forall(java.lang.Character.isLetter)))

    // Choose the vocabulary.
    // termCounts: Sorted list of (term, termCount) pairs
    val termCounts: Array[(String, Long)] =
    tokenized.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)
    // vocabArray: Chosen vocab (removing common terms)
    val numStopwords = 20
    val vocabArray: Array[String] =
    termCounts.takeRight(termCounts.size - numStopwords).map(_._1)
    // vocab: Map term -> term index
    val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap

    // Convert documents into term count vectors
    val documents: RDD[(Long, Vector)] =
    tokenized.zipWithIndex.map { case (tokens, id) =>
    val counts = new mutable.HashMap[Int, Double]()
    tokens.foreach { term =>
    if (vocab.contains(term)) {
    val idx = vocab(term)
    counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
    }
    }
    (id, Vectors.sparse(vocab.size, counts.toSeq))
    }

    // Set LDA parameters
    val numTopics = 10
    val lda = new LDA().setK(numTopics).setMaxIterations(10)

    val ldaModel = lda.run(documents)
    val avgLogLikelihood = ldaModel.logLikelihood / documents.count()

    // Print topics, showing top-weighted 10 terms for each topic.
    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
    topicIndices.foreach { case (terms, termWeights) =>
    println("TOPIC:")
    terms.zip(termWeights).foreach { case (term, weight) =>
    println(s"${vocabArray(term.toInt)}\t$weight")
    }
    println()
    }