Wednesday, April 15, 2015

Recursive Viterbi Path for Hidden Markov Models

Target audience: Advanced
Estimated reading time: 7'

Many machine learning algorithms utilize dynamic programming, and the Hidden Markov Model (HMM) is a prime example of this. Scala's tail recursion optimization offers a superb alternative to the conventional iterative approach for implementing the three fundamental forms of the HMM.


Table of contents
      Markov chain
      Viterbi path
Follow me on LinkedIn

What you will learn: How to write a fast implementation of Viterbi algorithm in Scala


Notes:

Theory

Markov chain

A Markov process is a stochastic process characterized by its dependence on time, where the future's state is independent of past states if the present state is known. 

The intricacies of Markov process theory vary significantly based on the nature of the time space, whether it is discrete or continuous, and the nature of the state space, which can be discrete (countable and with all subsets measurable) or a broader topological space.
The following diagram describes a simple 2-state Markov chain [ref 2]:
Fig. 1 State transition diagram for a 2-state Markov chain

The state transition probabilities matrix is defined as: \[\begin{bmatrix} 1-\alpha & \alpha \\ \beta & 1-\beta \end{bmatrix}\]

Hidden Markov model

Hidden Markov Models (HMMs) are versatile tools used across various domains to infer sequences of data that are not directly observable. Their applications include:
  • Neuroscience
  • Speech analysis and recognition
  • Gene sequencing prediction
  • Time series analysis in general
  • Logistics and supply chain optimization
  • Handwriting recognition
  • Computational finance and quantitative analysis

A HMM algorithm uses 3 key components [ref 3].
  • A set of observations
  • A sequence of hidden states
  • A model that maximizes the joint probability of the observations and hidden states, known as the Lambda model
HMM are usually visualized as lattice of states with transition and observations as illustrated in fig 2.

Fig. 2  Diagram of lattice of states, transition and observations


There are 3 use cases (or canonical forms) of the HMM [ref 4]:
  • Evaluation: Evaluate the probability of a given sequence of observations, given a model.
  • Training: Identify (or learn) a model given a set of observations.
  • Decoding Estimate the state sequence with the highest probability to generate a given as set of observations and a model.
The last use case, Decoding, is implemented numerically using the Viterbi algorithm.

Viterbi algorithm

The terms "Viterbi path" and "Viterbi algorithm" are now commonly used to describe the application of dynamic programming algorithms in solving maximization problems that deal with probabilities.

Given a sequence of states {qt} and sequence of observations {oj}, the probability δt(i) for any sequence to have the highest probability path for the first T observations is defined for the state Si

Definition of delta function (eq 1):\[\delta _{t}(i)= \max_{q_{i}:0,T-1}\ p(q_{0:T-1} =S_{i},\ O_{0:T-1} \ | \lambda )\]
Initialization of delta (eq 2): \[\delta _{0}(i)=\pi_{i}b_{i}(O_{0})\ \ \ \psi_{t}(i)=0 \  \ i:0,T-1\]
Recursive computation of delta (eq 3):\[\delta_{t}(i)=\max_{i}( \delta_{t-1}(i)a_{ij}b_{j}O_{t})\ \ \ \ \psi_{t}=arg \max_{i}(\delta_{t-1}(i).a_{ij})\]
Computation of the optimum sequence of states Q (eq 4):\[q^{*}_{t}=\psi_{t+1}(q^{*}_{t+1})\ \ \ \ q^{*}_{T}=arg \max_{i}\delta_{T}(i)\]

Scala implementation

State transition

  • delta: sequence to have the highest probability path for the first i observations is defined for a specific test δt(i)
  • psi Matrix that contains the indices used in the maximization of the probabilities of pre-defined states
  • Qstar: the optimum sequence q* of states Q0:T-1
The state of the Viterbi/HMM computation regarding the state transition and emission matrices is defined in the HMMState class.

final class HMMState(hmmModel: HMMModel, maxIters: Int) {
  
  // Matrix of elements (t, i) that defines the highest  probability of a single path
  // of t  observations reaching state S(i) - Initialized
  val delta = Array.fill(hmmModel.getT)(Array.fill(hmmModel.getN)(0.0))

 
  // Auxiliary matrix of indices that maximize the probability 
  //of a given sequence of states - Initialized
  val psi = Array.fill(hmmModel.getT)(Array.fill(hmmModel.getN)(0))

  // Singleton to compute the sequence Q* of states with 
  // the highest probability given a sequence of observations 
  object QStar {
     private val qStar = Array.fill(lambda.getT)(0)

    // Update Q* the optimum sequence of state using backtracking.. 
    def update(t: Int, index: Int): Unit
       ...
  }

  def getModel: HMMModel = hmmModel
}


The class HMMModel is defined by the 3 components of the Markov processes
  • State transition matrix A
  • Observation emission matrix B
  • Initial state probabilities, pi
and the number of observations, numObs.

case class HMMModel  (
   A: Array[Array[Double]],
   B: Array[Array[Double]],
   pi: Array[Double],
   numObs: Int
)  {

  def numStates: Int = A.nRows
  def numSymbols: Int = B.nCols
}

The number of states and symbols are directly extracted from the state-transition A, and observations B matrices.

Viterbi path

The algorithm is conveniently illustrated by the following diagram.
Fig. 3 Illustration of state-transition and observation matrices


First, let's create the key member and method for the Lambda model for HMM. The model is defined as a tuple of the transition probability matrix A, emission probability matrix B and the initial probability π

Then let's define the basic components for implementing the Viterbi algorithm. The class ViterbiPath encapsulates the member variables critical to the recursion.
The Viterbi algorithm is fully defined with

  • lambda: Lambda model as described in the previous section (line 2)
  • obsIdx: Index of observed states
class ViterbiPath(state: HMMState, obsIdx: Array[Int]) {
  
  val hmmModel = state.getModel()       // Access the model through its state
  
  def getPath: HMMPrediction = {
      val path = recurse(1)
      HMMPrediction(path, qStar())
} }

The predicted output from the Viterbi algorithm consists of
  • List of predicted states
  • Likelihood of the sequence of observed states
case class HMMPrediction(likelihood: Double, states: Array[Int])

Tail recursion

The recursive method, recurse that implements the formula or steps defined earlier. The method relies on the tail recursion. Tail recursion or tail elimination algorithm avoid the creation of a new stack frame for each invocation, improving the performance of the entire recursion [ref 5].

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@scala.annotation.tailrec
private def recurse(t: Int): Double = {
  
  // Initialization of the delta value for the first observation
  if( t == 0)
    initial 

   // then for the subsequent observations ...
  else { 
    // Update the maximum delta value and its state index  for observation t
    Range(0, hmmModel.getN).foreach( updateMaxDelta(t, _) )
   
     // If we reached the last observation... exit by backtracking the
     // computation from the last observation
    if( t ==  obs.size-1) {
      val idxMaxDelta = Range(0, hmmModel.getN)
             .map(i => (i, state.delta(t, i)))
             .maxBy(_._2)

      // Update the Q* value with the index that maximize /the delta.A
      state.QStar.update(t+1, idxMaxDelta._1)
      idxMaxDelta._2
    }
    else    
      recurse(t+1)
  }
}

Once initialized (line 5), the maximum value of delta, maxDelta, is computed through the method updateMaxDelta after the first iteration (line 10). Once the step t reaches the maximum number of observation labels, last index in the sequence of observations obs.size-1) (line 15), the optimum sequence of state q* / state.QStar is updated (line 21). The index of the column of the transition matrix A, idxMaxDelta corresponding to the maximum of delta is computed (lines 15-18). The last step is to update the matrix QStar (line 21).

The method updateMaxDelta computes the maximum value of delta of the states for this observation at time t and the state index j.

def updateMaxDelta(t: Int, j: Int): Unit = {

    val (psiValue, index) = (0 until hmmModel.numStates)
      .map(i => (i, delta(t - 1, i) * hmmModel.A(i, j)))
      .maxBy(_._2)

    psi(t)(j) = psiValue
    delta += (t, j, index)
}


References

  • [1Scala Programming Language
  • [2] Pattern Recognition and Machine Learning; Chap 13 Sequential Data/Hidden Markov Models - C. Bishop - Springer 2009
  • [3] Scala for Machine Learning; Chap 7 Sequential Data Models - P. Nicolas - Pack Publishing 2017
  • [4] Machine Learning: A Probabilistic Perspective; Chap 17 Markov and Hidden Markov models - K. Murphy - The MIT Press 2012
  • [5] Wikipedia: Tail Recursion



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Wednesday, March 4, 2015

F-bounded Type Polymorphism

Target audience: Intermediate
Estimated reading time: 4'


This post introduces and describes the concept of bounded polymorphism using self-referential type constraint.


Introduction

F-Bounded Type polymorphism or Bounded Quantification Polymorphism is parametric type polymorphism that constrains the subtypes to themselves using bounds.
Let's consider the following "classification" problem:


How can we guarantee that the SharpPencils bucket contains only sharp pencils, not small pencils or erasers?

Type polymorphism

The first attempt to solve the problem is to rely on parametric type polymorphism. To this purpose, we create a Pencils trait sub-classed by as a bucket for sharp pencils, SharpPencils and a bucket for small pencils, SmallPencils.
For the sake of simplification, we assume that Pencils defines only 2 methods: add (line 4) and pop (line 5) pencils.

1
2
3
4
5
6
7
8
9
trait Pencils[T] {
  private lazy val content = new mutable.ListBuffer[T]
 
  def add(t: T): Unit = content.append(t)
  def pop: List[T] = (content -= data.head).toList
}
 
class SharpPencils extends Pencils[SharpPencils]
class SmallPencils extends Pencils[SmallPencils]


This implementation does not guarantee that SharpPencils is the only bucket that contains the sharp pencils (line 8). Another bucket OtherPencils can be created with sharp pencils, too (line 9).

1
class OtherPencils extends Pencils[SharpPencils]


The solution is to specify constraints (or bounds) on the type of elements contained in each bucket.

Bounded polymorphism

The goal is to make sure that the bucket of specific type (or containing a specific type of pencils). The first step is to make sure that a bucket does not contain other items than a Pencils. This is accomplished by using the recursive (or F-Bound) type polymorphism
   trait A[T]<: ...="" a="" pre="">
The class Pencils is parameterized with one of its sub-type (line 1). None of the existing methods need to change.

1
2
3
4
5
6
trait Pencils[T <: Pencils[T]] {
  private lazy val content = new mutable.ListBuffer[T]
 
  def add(t: T): Unit = content.append(t)
  def pop: List[T] = (content -= data.head).toList
}

This implementation resolve the limitation raised in the previous paragraph. However there is nothing that prevent SharpPencils to contain an eraser or small pencils and SmallPencils to contain sharp pencils, as illustrated in the next code snippet.

// Won't compile!
class SharpPencils extends Pencils[Eraser]
 
 // The following code compiles!
class SharpPencils extends Pencils[SmallPencils]
class SmallPencils extends Pencils[SharpPencils]


Self-referential polymorphism

As a matter of fact, the most effective constraint on a inherited type is the self reference (line 2) that list the type allows for the method to execute.

1
2
3
4
5
6
7
8
9
trait Pencils[T <: Pencils[T]] {
  self: =>
    private lazy val content = new ListBuffer[T]
    def add(t: T): Unit = content.append(t)
    def pop: List[T] = (content -= data.head).toList
}
 
   // The following code fails to compile!
class SharpPencils extends Pencils[SmallPencils]


The declaration of SharpPencils as containing small pencils (line 9) fails to compile because it violates the self-referential type restriction.

References

Getting F-Bounded Polymorphism into Shape B. Greenman, F. Muehlboeck, R. Tate - Cornell University

Thursday, February 5, 2015

Back-Pressure Strategy Using Akka Mailboxes

Target audience: Advanced
Estimated reading time: 6'


This post explores the back-pressure control in Akka to manage data pipelines.




Overview

Akka is a very reliable and effective library to deploy tasks over multiple cores and over multiple hosts. Fault-tolerance is implemented through a hierarchy of supervising actors, not that different from the Zookeeper framework.
But what about controlling the message flows between actors or clusters of actors? How can we avoid messages backing up in mailboxes for slow, unavailable actors or lengthy computational tasks?
Typesafe has introduced Akka reactive streams and Flow materialization to control TCP back-pressure and manage data flows between actors and cluster of actors. TCP back pressure is a subject for a future post. In the meantime let's design the poor's man back-pressure handler using bounded mail boxes.


Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Workflow 

Let's look at a simple computational workflow with the following components:
  • Workers: These actors process and transform data sets. They start a computation task upon receiving a Compute message that contain the next data set to process. Each worker actor returns the results of the computation (transformed data set) back to the controller using the Completed message.
  • Controller is responsible for loading batch of data through a Load message and forward it to the workers in a Compute message. The Controller request the status of the bounded mail box for each worker by sending a GetStatus to the watcher.
  • Watcher monitor the state of the workers' mail box and return the current load (as percentage of the mail box is currently used) to the controller with a Status message.
The following diagram describe the minimum set of messages required to execute the workflow.


Workers

The worker actor is responsible for processing a slice of data using a function forwarded by the controller.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
final class Worker(id: Int) extends Actor {

  override def receive = {
    // Sent by master/controller to initiate 
    // the computation with a msg.fct invocation
    case msg: Compute => {
       val output = msg.fct(msg.xt)
       sender ! Completed(id, output, msg.id+1)
    }
      // last request
    case Cleanup => sender ! Done
  }
}

The worker receives the input to the computation (slice of time series msg.xt and the data transformation msg.fct through the Compute message sent by the controller. Note that the function fct cannot be defined as a closure as the context of the function is unknown to the worker.
The actor return the result output of the computation through the Completed message. Finally all the workers notify the controller that their tasks is completed by responding to the Cleanup message.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type DblSeries = List[Array[Double]]
 
sealed abstract class Message(val id: Int)

  // Sent by worker back to controller
case class Completed(
    i: Int, 
    xt: Array[Double], 
    nIter: Int) extends Message(i)

  // Sent by controller to workers
case class Compute(
    i: Int, 
    xt: DblSeries, 
   fct: DblSeries => DblSeries)  extends Message(i)

case class Cleanup() extends Message(-1)

Controller

The worker actor is responsible for processing a slice of data using a function forwarded by the controller. It takes three parameters.
  • numWorkers: Number of worker actors to create
  • watermark: Define the utilization of the worker mail box which trigger a throttle action. If the utilization of the mailbox is below the watermark, the controller throttles up ( increases the pressure) on the actor; If the utilization of the mail box exceeds 1.0 - watermark the controller decreases the pressure on the actor, otherwise the controller does not interfere with the data flow.
  • mailboxCapacity:Capacity of the mailboxes for the worker actors (maximum number of messages). It is assumed that all mailboxes have the same capacity.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Controller(
   numWorkers: Int, 
   watermark: Double, 
   capacity: Int
   )  extends Actor {
  
  var id: Int = 0
  var batchSize = capacity>>1

    // Create a set of worker actors
  val workers = List.tabulate(numWorkers)(n => 
    context.actorOf(Props(new Worker(n)), name = s"worker$n")
   )
    
  val pushTimeout = new FiniteDuration(10, MILLISECONDS)
  val msgQueues = workers.map(w => 
     (new BoundedMailbox(capacity, pushTimeout))
       .create(Some(w), Some(context.system))
   )
 
  val watcher = context.actorOf(Props(new Watcher(msgQueues)))
   ...
}

The set of workers are created using the tabulate higher order method. A message queue (mailbox) has to be created for each actor. The mailboxes are bounded in order to avoid buffer overflow. Finally a watch dog actor of type Watcher is created through the Akka context to monitor the mailboxes for the worker. The watcher actor is described in the next sub paragraph.
Let's look at the Controller message loop.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
override def receive = {
    // Loads chunk of stream or input data set
  case input: Load => load(input.strm)

    // processes results from workers
  case msg: Completed => 
    if(msg.id == -1) kill else process(msg)

    // Status on mail boxes utilization sent by the watchdog
  case status: Status => throttle(status.load)
}

Back-pressure control

The controller performs 3 distinct functions:
  • load: Load a variable number of data points and partition them for each worker
  • process: Aggregate the results of computation for all the workers
  • throttle: Increase or decrease the number of data points loaded at each iteration.
Let's look at these methods.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  // Load, partition input stream then 
  // distribute the partitions across the workers
def load(strm: InputStream): Unit =

  while( strm.hasNext ) {
    val nextBatch = strm.next(batchSize)
    partition(nextBatch)
       .zip(workers)
       .foreach(w => w._2 ! Compute(id, w._1, strm.fct) )
    id += 1
  }
 
  // Process, aggregate results from all the workers
def process(msg: Completed): Unit = {
   ..  // aggregation
   watcher ! GetStatus
}
  
  // Simple throttle function that increases or decreases the 
  // size of the next batch of data to be processed by 
  // workers according to the current load on mail boxes
def throttle(load: Double): Unit = {
   if(load < watermark) 
      batchSize += (batchSize*(load-watermark)).floor.toInt
   else if(load > 1.0 - watermark) 
      batchSize -= (batchSize*(load-1.0 + watermark)).floor.toInt
   
   if( batchSize > (mailboxCapacity>>1) )
      batchSize = (mailboxCapacity>>1)
}

  • load extracts the next batch of data, partitions it then send each partition to a worker actor along with the data transformation fct
  • process aggregates the result (Completed) from the transformation on each worker. Then the controller requests a status on the mail box to the watcher
  • throttle recompute the size of the next batch, batchSize using the load information provided by the watcher relative to the watermark

Watcher

The watcher has a simple task: compute the average load of the mailbox of all workers. The computation is done upon reception of the GetStatus message from the controller.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class Watcher(
   queue: Iterable[MessageQueue]
  ) extends Actor {

  def load = queue.map( _.numberOfMessages)
                  .sum.toDouble/queue.size

  override def receive = { 
    case GetStatus => sender ! Status(load) 
  }
}


Memory consumption profile

The bottom graph displays the action of the controller (throttling). The Y-axis display the intensity of the throttle from -6 for significant decrease in load (size of batches) to +6 for significant increase in load/pressure on the workers. A throttle index of 0 means that no action is taken by the controller.
The top graph displays the average size of the worker's mailbox, in response of action taken by the controller.

This implementation of a feedback controller loop is rather crude and mainly described as an introduction to the concept of back pressure control. Production quality implementation relies on:
  • TCP-back pressure using reactive streams
  • A more sophisticated throttle algorithm to avoid significant adjustment or resonance
  • Handling dead letters in case the throttle algorithm fails and the mailbox overflows

References

Saturday, November 29, 2014

Apache Spark/MLlib for K-means

Target audience: Intermediate
Estimated reading time: 5'


This post illustrates the Apache Spark MLlib library with the plain-vanilla K-means clustering (unsupervised) algorithm.


Table of contents


Overview

Apache Spark attempts to address the limitation of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In case of iterative algorithms, the time per iteration can be reduced by a ratio of 1:10 or more.
The core element of Spark is Resilient Distributed Datasets (RDD), which is a collection of elements partitioned across the nodes of a cluster and/or CPU cores of servers. An RDD can be created from local data structures such as list, array or hash tables, from the local file system or the Hadoop distributed file system (HDFS).

Note: The code presented in this post uses Apache Spark version 1.3.1. There is no guarantee that the implementation of the K-means in this post will be compatible with future version of Apache Spark.

Apache Spark RDDs

The operations on an RDD in Spark are very similar to the Scala higher order methods. These operations are performed concurrently over each partition. Operations on RDD can be classified as:
* Transformation: convert, manipulate and filter the elements of an RDD on each partition
* Action: aggregate, collect or reduce the elements of the RDD from all partitions

An RDD can persist, be serialized and cached for future computation. Spark provides a large array of pre-built transforms and actions which go well beyond the basic map-reduce paradigm. Those methods on RDDs are a natural extension of the Scala collections making code migration seamless for Scala developers.

Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the file systems. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes and replication schemes of HDFS.
Spark is initialized through its context. For instance, a local Spark deployment on 8 cores, with 2 Gbytes allocated for data processing (RDDs) in memory only storage level and 512 Mbytes for the master process is defined by creating a spark configuration instance of type SparkConf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
 
val sparkConf = new SparkConf()
            .setMaster("local[8]")
            .setAppName("SparkKMeans")
            .set("spark.executor.memory", "2048m")
            .set("spark.storageLevel", "MEMORY_ONLY")
            .set("spark.driver.memory", "512M")
            .set("spark.default.parallelism", "16")
 
implicit val sc = new SparkContext(sparkConf))



Apache Spark MLlib

MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress. The main components of the library are:
  • Classification algorithms, including logistic regression, Naïve Bayes and support vector machines
  • Clustering limited to K-means in version 1.0
  • L1 & L1 Regularization
  • Optimization techniques such as gradient descent, logistic gradient and stochastic gradient descent and L-BFGS
  • Linear algebra such as Singular Value Decomposition
  • Data generator for K-means, logistic regression and support vector machines.
The machine learning byte code is conveniently included in the spark assembly jar file built with the simple build tool, sbt.
Let's consider the K-means clustering components bundled with Apache Spark MLlib. The K-means configuration parameters are:
  • K Number of clusters (line 4)
  • maxNumIters Maximum number of iterations for the minimizing the reconstruction error< (line 5)/li>
  • numRuns Number of runs or episode used for training the clusters (line 6)
  • caching Specify whether the resulting RDD has to be cached in memory (line 7)
  • xt The array of data points (type Array[Double]) (line 8)
  • sc Implicit Spark context
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
 
class SparkKMeans(
    K: Int, 
    maxNumIters: Int, 
    numRuns: Int,
    caching: Boolean,
    xt: Array[Array[Double]]) (implicit sc: SparkContext) {
   
 
  def train: Try[KMeansModel] = {
    val kmeans = new KMeans
    kmeans.setK(K)
    kmeans.setMaxIterations(maxNumIters)
    kmeans.setRuns(numRuns)
   
    val rdd = sc.parallelize(xt.map(new DenseVector(_)))
    rdd.persist(StorageLevel.MEMORY_ONLY)
    if( caching )
       rdd.cache
    kmeans.run(rdd)
  }
}

The clustering model is created by the train method (line 11). Once the Spark/MLlib K-means is instantiated and initialized (lined 12 -15), the ipnt data set xt is converted into a DenseVector then converted into a RDD (line 17). Finally the input RDD is fed to the Kmeans (kmeans.run)