Latent Dirichlet Allocation in Scala Part II - The Code

The theory behind Latent Dirichlet Allocation was outlined in the previous blog post. Now the goal is to translate that theory into a fully-fledged Scala application. There are two main entities in the LDA algorithm:

  1. The Corpus: This is the collection of documents. Functionality is needed to accept documents from the user, create a vocabulary, perform text preprocessing, maintain document-level and corpus-level word counts, and return the desired output to the user.
  2. Inference: The core aspect of this algorithm is the collapsed Gibbs sampling inference step. This must be implemented efficiently and correctly.

The Corpus

Getting a Vocabulary

The first task that the corpus class must undertake is building a vocabulary from the given documents. Stop words must be removed, and we also want to remove low-frequency words (these words would most likely not show up in any topics anyway, so it is best to remove them for memory-management reasons). The low-frequency cut-off threshold is supplied by the user, and determining the low-frequency words requires a corpus-wide word count. It is also good practice to remove words that are either too short or too long. The following CountVocab class performs the required word count, then removes words that are too infrequent, appear in a stop word list, or fall outside the allowable length bounds.

//imports needed by this snippet; HashMap is assumed to be scala.collection.mutable.HashMap
import java.io.File
import scala.collection.mutable.HashMap
import scala.io.Source

class CountVocab(filePath: String, minCount: Int) extends Vocabulary {

  //stop word list bundled as a resource
  val stopWords = Source.fromURL(getClass.getResource("/english_stops_words.txt")).mkString.split("\n").toSet

  def getVocabulary: HashMap[String, Int] = {

    var vocabulary: HashMap[String, Int] = HashMap.empty
    var wordCounter: HashMap[String, Int] = HashMap.empty

    //count every acceptable token in a single document
    def countWords(docFile: File) {

      val tokens = StanfordTokenizer.tokenizeFile(docFile)

      for (token <- tokens) {

        if (wordCounter.contains(token)) {
          wordCounter += (token -> (wordCounter(token) + 1))
        }
        else if (!stopWords.contains(token) && !token.matches("\\p{Punct}") && token.length > 2 && token.length < 15 && token != "-lrb-" && token != "-rrb-") {
          wordCounter += (token -> 1)
        }
      }
    }

    //run the word count over every file in the document directory
    new File(filePath).listFiles.filter(_.isFile).foreach(countWords)

    //keep only the sufficiently frequent words, each mapped to a unique integer ID
    for ((w, freq) <- wordCounter) {
      if (freq >= minCount) {
        vocabulary += (w -> vocabulary.size)
      }
    }

    vocabulary
  }
}

The class above takes a file path as an input parameter that points to the directory containing the documents (in text file format) on which you are going to perform LDA. It also takes a frequency threshold parameter, below which a word is deemed too infrequent to be useful. A nested function performs a word count on the important words (tokenized with the Stanford CoreNLP tokenizer), and then we iterate through the counted words and keep the ones at or above the frequency threshold. The result is a HashMap in which every retained word is mapped to a unique integer ID, which will be used later.
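
For context, here is a minimal usage sketch. The directory path and frequency threshold below are hypothetical, and the snippet assumes that the CountVocab class and its Vocabulary trait are available on the classpath.

import scala.collection.mutable.HashMap

object VocabularyExample extends App {

  //hypothetical inputs - replace with a real document directory and minimum frequency
  val docDirectory = "/path/to/documents"
  val minCount = 5

  //build the vocabulary: each retained word is mapped to a unique integer ID
  val vocabulary: HashMap[String, Int] = new CountVocab(docDirectory, minCount).getVocabulary

  println("Vocabulary size: " + vocabulary.size)
}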

Keeping Track of the Topic Assignment Counts

From part I, we learned that the probabilities we are interested in depend on the topics that are assigned to each word in the corpus. During the Gibbs sampling procedure, the topic assignments are constantly being updated, and the conditional distribution being sampled from must also be updated to reflect the new topic assignments. Therefore, we need a way of keeping track of these topic assignment counts. This will be done with two matrices (using the Breeze linear algebra library) - a topic/word matrix that counts how many times each word is assigned to each topic, and a document/topic matrix that counts how many words each topic is assigned to in each document. Furthermore, we need to initialize the Gibbs sampling procedure by randomly assigning each word to a topic. This is all done in the following CollapsedLDACorpus class.

//imports needed by this snippet; HashMap is again assumed to be scala.collection.mutable.HashMap
import java.io.File
import scala.collection.mutable.HashMap
import scala.io.Source.fromFile
import scala.util.Random
import breeze.linalg.DenseMatrix

class CollapsedLDACorpus(vocab: HashMap[String, Int], numTopics: Int, docDirectory: String) extends Corpus {

  val numDocs = DocUtils.numDocs(docDirectory)
  val vocabulary = vocab
  var docTopicMatrix = DenseMatrix.zeros[Double](numDocs, numTopics)
  var topicWordMatrix = DenseMatrix.zeros[Double](numTopics, vocabulary.size)
  var words: Vector[Word] = Vector.empty

  val randomTopicGenerator = new Random
  var docIndex = 0

  def processDoc(contents: String) = {

    val tokens = StanfordTokenizer.tokenizeString(contents)

    for (token <- tokens) {

      val randTopic = randomTopicGenerator.nextInt(numTopics)

      if (vocabulary.contains(token)) {

        //Assign the word to a random topic and increment the corresponding counts
        words :+= Word(token, docIndex, randTopic)
        docTopicMatrix(docIndex, randTopic) += 1.0
        topicWordMatrix(randTopic, vocabulary(token)) += 1.0
      }
    }
    docIndex += 1
  }

  def initialize = {
    new File(docDirectory).listFiles.filter(_.getName.endsWith(".txt")).foreach(docFile => processDoc(fromFile(docFile).getLines().mkString))
  }

  //map integer word IDs back to their string values
  def reverseVocab: HashMap[Int, String] = {
    vocabulary.map(_.swap)
  }
}

The processDoc function assigns each word in the document (provided it is in the vocabulary) to a random topic and increments the corresponding entries of docTopicMatrix and topicWordMatrix. The topic assignments are stored within objects of a case class called Word that maintains the state of each word.

case class Word(token: String, doc: Int, var topic: Int)

The state of a word is its assigned topic, the document that it appears in, and the actual string value of the word itself.

Inference

The collapsed Gibbs sampling inference algorithm was described in detail in part I of this blog post. In short, topic assignments are repeatedly sampled from a conditional distribution and, after enough samples have been drawn, it is assumed that the samples come from the posterior distribution over topic assignments. Then the \(\theta\) and \(\phi\) probabilities can be computed from these inferred topic assignments. The following CollapsedGibbs class performs these tasks.

//imports needed by this snippet
import breeze.linalg._
import breeze.stats.distributions.Multinomial

class CollapsedGibbs(corpus: CollapsedLDACorpus, docDirectory: String, vocabThreshold: Int, K: Int, alpha: Double, beta: Double) extends TopicModel {

  /**
   * For a given word, calculate the conditional distribution over topic assignments to be sampled from.
   * @param word word whose topic will be inferred by the Gibbs sampler.
   * @return distribution over topics for the word input.
   */
  private[this] def gibbsDistribution(word: Word): Multinomial[DenseVector[Double], Int] = {

    val docTopicRow: DenseVector[Double] = corpus.docTopicMatrix(word.doc, ::).t
    val topicWordCol: DenseVector[Double] = corpus.topicWordMatrix(::, corpus.vocabulary(word.token))
    val topicSums: DenseVector[Double] = sum(corpus.topicWordMatrix, Axis._1)
    val params = (docTopicRow + alpha) :* (topicWordCol + beta) / (topicSums + corpus.vocabulary.size * beta)

    //normalize parameters
    val normalizingConstant = sum(params)
    val normalizedParams = params :/ normalizingConstant

    Multinomial(normalizedParams)
  }

  /**
   * Gibbs sampler for LDA
   * @param numIter number of iterations that Gibbs sampler will be run
   */
  private[this] def gibbsSample(numIter: Int = 200) {

    for (iter <- 0 until numIter) {

      //log the current iteration
      println(iter)

      for (word <- corpus.words) {

        val multinomialDist = gibbsDistribution(word)

        val oldTopic = word.topic

        //reassign word to topic determined by sample
        word.topic = multinomialDist.draw()

        //If topic assignment has changed, must also change count matrices
        if (oldTopic != word.topic) {

          //increment counts due to reassignment to the new topic
          corpus.topicWordMatrix(word.topic, corpus.vocabulary(word.token)) += 1.0
          corpus.docTopicMatrix(word.doc, word.topic) += 1.0

          //decrement counts of old topic assignment that has been changed
          corpus.topicWordMatrix(oldTopic, corpus.vocabulary(word.token)) -= 1.0
          corpus.docTopicMatrix(word.doc, oldTopic) -= 1.0

        }
      }
    }
  }

  /**
   * Calculate theta matrix directly from doc/topic counts.  Overwrite counts matrix to save memory.
   */
  private[this] def getTheta {

    //we turn the counts matrix into a probability matrix
    for (doc <- 0 to corpus.numDocs - 1) {

      val countToProb: DenseVector[Double] = ((corpus.docTopicMatrix(doc, ::) + alpha) / (sum(corpus.docTopicMatrix(doc, ::).t) + K * alpha)).t
      corpus.docTopicMatrix(doc, ::) := countToProb.t
    }
  }

  /**
   * Calculate phi matrix directly from topic/word counts.  Overwrite counts matrix to save memory.
   */
  private[this] def getPhi {

    //we turn the counts matrix into a probability matrix
    for (topic <- 0 to K - 1) {

      val countToProb: DenseVector[Double] = ((corpus.topicWordMatrix(topic, ::) + beta) / (sum(corpus.topicWordMatrix(topic, ::).t) + corpus.vocabulary.size * beta)).t
      corpus.topicWordMatrix(topic, ::) := countToProb.t
    }
  }

  /**
   * Perform all inference steps - gibbs sampling, calculating theta matrix, calculating phi matrix.
   */
  def inference {
    gibbsSample()
    getTheta
    getPhi
  }

  /**
   * Print topics found by LDA algorithm.
   * @param numWords Determines how many words to display for each topic.
   */
  def printTopics(numWords: Int) {

    //want to actually show the words, so we need to extract strings from ids
    val revV = corpus.reverseVocab

    for (topic <- 0 to K - 1) {

      //tie probability to column index, then sort by probability, take the top numWords, map column index to corresponding word
      println("Topic #" + topic + ":  " + corpus.topicWordMatrix(topic, ::).t.toArray.zipWithIndex.sortBy(-_._1).take(numWords).toList.map(x => revV(x._2)))

    }
  }

  /**
   * For a given document, display most likely topics occurring in it.
   * @param docIndex index of document to be analyzed.
   * @param probCutoff Determines how likely a topic has to be for it to be displayed.
   */
  def printTopicProps(docIndex: Int, probCutoff: Double) {

    //tie probability to column index, filter probabilities by probCutoff
    println(corpus.docTopicMatrix(docIndex, ::).t.toArray.zipWithIndex.filter(x => x._1 > probCutoff).toList)

  }
}

The full inference procedure is performed in the inference method. First the Gibbs sampling procedure is run: we iterate over each word in the corpus and, for each word, calculate the conditional probability distribution over all topics with the following equation from part I,

$$P(z_i=j|z_{-i},w) \propto \frac{n_{-i,j}^{w_i}+\beta}{n_{-i,j}+W\beta}\frac{n_{-i,j}^{d_i}+\alpha}{n_{-i}^{d_i}+K \alpha}$$

This is done in the method called gibbsDistribution. The distribution is returned as a Multinomial distribution (see the Scaladocs for the Breeze Multinomial distribution). Then a new topic is sampled from this multinomial distribution and assigned to the word object. Finally, the doc/topic and topic/word counts are updated to reflect this new assignment (where the word ID from the vocabulary is used to associate words with columns in the topic/word matrix). By default the sampler runs for 200 iterations; the numIter parameter can be increased if more iterations are needed for MCMC convergence.
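
As a small standalone illustration of the Breeze distribution used here, the sketch below constructs a Multinomial over a made-up, already-normalized vector of topic probabilities and draws a sample from it (an older Breeze version with a default RandBasis is assumed, matching the class above).

import breeze.linalg.DenseVector
import breeze.stats.distributions.Multinomial

object MultinomialSketch extends App {

  //hypothetical normalized distribution over three topics
  val topicProbs = DenseVector(0.2, 0.5, 0.3)

  //draw() returns the index of the sampled topic
  val sampledTopic: Int = Multinomial(topicProbs).draw()

  println("Sampled topic: " + sampledTopic)
}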

The next step in inference is to calculate the \(\theta\) and \(\phi\) probability matrices. This is done using the count matrices and the following equations from part I,

\(\theta_{i,k}=\frac{n_i^k+\alpha_k}{\sum_{k=1}^K(n_i^k+\alpha_k)}\) and \(\phi_{k,w}=\frac{n_w^k+\beta_w}{\sum_{w=1}^W(n_w^k+\beta_w)}\)

The count matrices are transformed into probability matrices in place to save memory. The printTopics method prints the topics found in terms of the most likely words for each topic, taken from the \(\phi\) matrix. The printTopicProps method prints the most likely topics for a particular document within the corpus, taken from the \(\theta\) matrix.
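
Putting the pieces together, here is a hedged end-to-end sketch of how the classes above might be wired up. The directory path, vocabulary threshold, topic count, and hyperparameter values are purely illustrative, and the snippet assumes the CountVocab, CollapsedLDACorpus, and CollapsedGibbs classes shown in this post are available.

object LDAExample extends App {

  //hypothetical inputs
  val docDirectory = "/path/to/documents"
  val vocabThreshold = 5
  val numTopics = 20
  val alpha = 0.1
  val beta = 0.1

  //build the vocabulary and the corpus with random initial topic assignments
  val vocabulary = new CountVocab(docDirectory, vocabThreshold).getVocabulary
  val corpus = new CollapsedLDACorpus(vocabulary, numTopics, docDirectory)
  corpus.initialize

  //run collapsed Gibbs sampling and convert the count matrices to probabilities
  val lda = new CollapsedGibbs(corpus, docDirectory, vocabThreshold, numTopics, alpha, beta)
  lda.inference

  //inspect the results
  lda.printTopics(10)
  lda.printTopicProps(0, 0.1)
}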

Hopefully this series of blog posts has shed some light on some of the mysteries of Latent Dirichlet Allocation. The above code snippets were taken from my Scala implementation, which you can find in my ScalaTopicModels GitHub repo.