Thursday, September 10, 2015

Histogram-Based Approximation in Scala

Target audience: Beginner
Estimated reading time: 4'


Introduction to a simple function approximation algorithm using a dynamically resizable histogram in Scala.


Overview

A typical function approximation consists of finding a model that fits a given data set. Let's consider the following data set {x, y} for which a simple model f: y = f(x) has to be approximated.


The black dots represent the data set and the red line the model y = f(x)
There are multiple approaches to approximating a model or a function that fits a given data set. The list includes:

  • Splines
  • Least square regression
  • Levenberg-Marquardt
  • K-nearest neighbors
  • Histogram
  • Polynomial interpolation
  • Logistic regression
  • Neural Networks
  • Tensor products
  • ... and many more
Interpolation using dynamically resizable histograms is a very simple and effective method to approximate a data set.

Histogram class

A histogram consists of an array or sequence of bins. A bin is defined by three parameters:
  • _x Center of the bin
  • _y Average value for the bin
  • count Number of data points in the bin
The distribution of histogram bins is illustrated in the following chart

Let's look at an implementation of the Bin class. The constructor takes two values:
  • _x midpoint of the bin (or bucket)
  • _y current frequency for this bin
class Bin(
   var _x: Double, 
   var _y: Double = 0.0) extends Serializable {

  // Number of data points collected in this bin
  var count: Int = 0

  // Add a new value y to this bin and recompute the running average _y
  def += (y: Double): Unit = {
    val newCount = count + 1
    _y = if(count == 0) y else (_y*count + y)/newCount
    count = newCount
  }
 
  // Merge the content of another bin, next, into this bin
  // using a count-weighted average
  def + (next: Bin): this.type = {
    val newCount = count + next.count
    if(newCount > 0) {
      _y = (_y*count + next._y*next.count)/newCount
      count = newCount
    }
    this
  }

  // Merge an array of bins into this bin
  def + (next: Array[Bin]): this.type = {
    val newCount = next.aggregate(count)((s, nxt) => s + nxt.count, _ + _)
    if(newCount > 0) {
      _y = next./:(_y*count)((s, nxt) => s + nxt._y*nxt.count)/newCount
      count = newCount
    }
    this
  }
}

The method += (y: Double): Unit adds a new value y to this bin and recomputes the running average _y. The method + (next: Bin): this.type merges the content of another bin, next, into this bin. Finally, the method + (next: Array[Bin]): this.type merges an array of bins into this bin.
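As a quick sanity check, here is a minimal sketch of the Bin operations; the values are illustrative only:

// Accumulate two values into a bin centered at x = 0.5
val bin = new Bin(0.5)
bin += 2.0            // count = 1, _y = 2.0
bin += 4.0            // count = 2, _y = (2.0 + 4.0)/2 = 3.0

val other = new Bin(0.7)
other += 6.0          // count = 1, _y = 6.0

// Count-weighted merge: _y = (3.0*2 + 6.0*1)/3 = 4.0
val merged = bin + other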
Next, let's create a class, Histogram, to manage the array of bins. The constructor for the histogram has four parameters:
  • maxNumBins maximum number of bins
  • min expected minimum value in the data set
  • max expected maximum value in the data set
  • weights optional smoothing weights

import scala.collection.mutable.ArrayBuffer

final class Histogram(
    maxNumBins: Long, 
    var min: Double = -1.0, 
    var max: Double = 1.0, 
    weights: Array[Float] = Array.empty[Float]) {
 
  // Shift factor defining the initial number of bins (the value is illustrative)
  private val RADIX = 2
  val initNumBins: Int = (maxNumBins >> RADIX).toInt
  private var step = (max - min)/initNumBins
  
  // Initialize the bins with evenly spaced centers between min and max
  private[this] var values = Range(0, initNumBins-1)
        .scanLeft(min + 0.5*step)((s, n) => s + step)
        ./:(new ArrayBuffer[Bin])((vals, x) => vals += new Bin(x))
 
  // Update the model with a new data point (x, y): expand the boundaries
  // if needed, then consolidate if the maximum number of bins is reached
  def train(x: Double, y: Double): Unit = {
    <<(x)
    values(index(x)) += y
    if(values.size >= maxNumBins) >>
  }
 
  // Predict the value at x: NaN outside the boundaries, a weighted moving
  // average if smoothing weights are defined, the bin average otherwise
  final def predict(x: Double): Double = 
    if(x < min || x > max) Double.NaN 
    else if(weights.length > 0) smoothPredict(x)
    else values(index(x))._y

  // ... ancillary methods
}

Implementation

The two main methods, illustrated in the sketch following this list, are:
  • train, which updates the model (histogram) with each new data point from the training set. The histogram expands whenever a new data point exceeds the current boundaries (min, max) of the histogram.
  • predict, which predicts the value y of a new observation x. The prediction relies on an interpolation (weighted moving average) in the case the user specifies an array of weights in the histogram constructor.
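A short usage sketch; the target function and the random sampling below are illustrative only:

// Approximate y = sin(x) over [0, 3.14] with at most 128 bins
val histogram = new Histogram(128, 0.0, 3.14)

// Train the model with 10,000 random data points
(0 until 10000).foreach(_ => {
  val x = 3.14*scala.util.Random.nextDouble
  histogram.train(x, Math.sin(x))
})

// Predict the value of a new observation within the boundaries
val y = histogram.predict(1.57)    // expected to be close to sin(1.57) ~ 1.0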
The method Histogram.index extracts the index of the bin containing a data point (x, y).

// Compute the index of the bin containing x, clamped to the
// boundaries of the array of bins
private def index(x: Double): Int = {
  val idx = ((x - min)/step).floor.toInt
  if(idx < 1) 0 
  else if(idx >= values.size) values.size - 1 
  else idx
}

In this implementation of the dynamically resizable histogram, the array of bins expands by adding new bins whenever a new data point from the training set has an x value that is either greater than the current maximum value or lower than the current minimum value. The width of the bins, step, does not change; only the current number of bins does. The number of bins grows until it reaches the maximum, maxNumBins. The method Histogram.<< implements the expansion of the histogram for a constant bin width.

// Expand the histogram with new bins of constant width, step, whenever
// x falls outside the current boundaries [min, max]
private def <<(x: Double): Unit = 
  if(x > max) {
    values.appendAll(generate(max, x))
    max = values.last._x + step*0.5
  }
  else if(x < min) {
    // Reverse so the prepended bins remain in ascending order of _x
    values.prependAll(generate(min, x).reverse)
    min = values.head._x - step*0.5
  }
 
// Generate the bins required to cover the gap between the boundary, limit, and x
final def generate(limit: Double, x: Double): Array[Bin] = 
  if(x > limit) {
    val nBins = ((x - limit)/step).ceil.toInt
    var z = limit - step*0.5
    Array.tabulate(nBins)(_ => { z += step; new Bin(z) })
  }
  else {
    val nBins = ((limit - x)/step).ceil.toInt
    var z = limit + step*0.5
    Array.tabulate(nBins)(_ => { z -= step; new Bin(z) })
  }

Once the maximum number of bins maxNumBins is reached, the histogram is consolidated by merging pairs of adjacent bins, halving the number of bins and doubling their width, step. The consolidation of the histogram bins is implemented by the method Histogram.>>

// Consolidate the histogram by merging each pair of adjacent bins,
// halving the number of bins and doubling their width
private def >> : Unit = {
  values = (0 until values.size-1 by 2)./:(new ArrayBuffer[Bin])((ab, n) => 
    ab += (values(n) + values(n+1))
  )
  step *= 2
}

Testing

Finally, the predict method is used to compute the accuracy of the model through the validate method. The Histogram class is tested by loading a data set from a file.

import scala.io.Source
import scala.util.Try

// Compute the accuracy of the model against a file of DELIM-separated (x, y)
// pairs: a prediction is deemed correct if within eps of the expected value
def validate(
    hist: Histogram, 
    fileName: String, 
    eps: Double): Try[Double] = Try {
  val src = Source.fromFile(fileName)
  val fields = src.getLines.map(_.split(DELIM))
  
  // Accumulate (number of correct predictions, total number of data points)
  val counts = fields./:((0L, 0L))((s, xy) => 
    if(Math.abs(hist.predict(xy(0).trim.toFloat) - xy(1).toFloat) < eps) 
      (s._1 + 1L, s._2 + 1L) 
    else 
      (s._1, s._2 + 1L)
  )
  
  val accuracy = counts._1.toDouble/counts._2
  src.close
  accuracy
}
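A hypothetical invocation; the file name and tolerance are illustrative, and DELIM is assumed to be the field delimiter defined alongside validate:

import scala.util.{Success, Failure}

val hist = new Histogram(256, -1.0, 1.0)
// ... train hist on a training set ...

validate(hist, "data.csv", eps = 0.05) match {
  case Success(accuracy) => println(s"Accuracy: $accuracy")
  case Failure(e) => println(s"Validation failed: ${e.getMessage}")
}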


References

  • Introduction to Machine Learning; Chap 8 Nonparametric Methods/Nonparametric Density Estimation - E. Alpaydin - MIT Press 2004
  • Scala Cookbook - A. Alexander - O'Reilly 2013
  • Histogram-based approximation using Apache Spark
  • github.com/patnicolas

Friday, June 19, 2015

Scala View Bound vs. Context Bound

Target audience: Intermediate
Estimated reading time: 15'


This post illustrates the conversion of classes using view-bound parameter types into classes using context-bound types.

Overview

There has been some discussion about deprecating view bounds in Scala 2.12. Binding a parameterized type T to a predefined, known type using a view (i.e. an implicit conversion function) is quite convenient. I have used (or even abused) this construct in writing machine learning-based applications.
There are a few options for replacing view-bound class parameter types with other ad-hoc polymorphism techniques. I chose to convert my legacy code by binding the class parameter type to a context instead of a view.

Converting class parameter type binding

First, let's look at binding the parameterized type T of a class Classifier to the primitive type Double. The binding is actually implemented by defining an implicit conversion function f: T => Double, as described in the following code snippet.

def sqr(x: Double): Double = x*x

class Classifier[T <: AnyVal](
   xt: Vector[Array[T]], 
   yt: Vector[Double])
  (implicit f: T => Double) {
 
  // Sum of squared errors between the labels and the dot product
  // of the features and the weights
  def loss(weights: Array[T]): Double = 
    xt.zip(yt).map{ case(x, y) => 
      y - x.zip(weights).map{ case(_x, w) => _x*w }.sum 
    }.map(sqr(_)).sum
}

In this example, the observed features and the model parameters weights have the same type: an array of elements of type T. The input time series xt is a vector of observations, and the labels (or expected outcomes) yt form a vector of single-variable observations of type Double. The implicit conversion applies to both the features and the weights. Scala provides developers with the shorter alternative notation T <% U, as follows:

class Classifier[T <% Double](
   xt: Vector[Array[T]], 
   yt: Vector[Double]) { }
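For instance, a classifier parameterized with Float features compiles out of the box, because the standard library already provides the implicit widening conversion Float => Double. A minimal sketch, assuming the loss method from the first snippet:

// The standard widening conversion Float => Double satisfies the view bound
val xt = Vector(Array(1.0F, 0.5F), Array(0.8F, 0.2F))
val yt = Vector(1.0, 0.0)

val classifier = new Classifier[Float](xt, yt)
val cost = classifier.loss(Array(0.3F, 0.7F))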

Replacing the binding of the class parameters for many classes would be tedious and error-prone. One solution is to create a template or pattern that can be applied to several sections of the code base.
The process of binding the type T of a class parameter to another known parameter type consists of defining a context, the trait Feature, for the type T. The context bound is resolved through an implicit value of type Feature[T].

trait Feature[T] { def apply(x: T): Array[Double] }
 
class Classifier[T: Feature](
   xt: Vector[T],
   yt: Vector[Double]) {
  
  def loss(weights: T): Double = 
    xt.zip(yt).map{ case(x, y) => 
      // Summon the Feature[T] context to convert T into Array[Double]
      val mapper: Feature[T] = implicitly[Feature[T]]
      y - mapper(x).zip(mapper(weights)).map{ case(u, w) => u*w }.sum
    }.map(sqr(_)).sum
}

The Feature trait defines the context for the conversion, which is actually implemented by the apply method. The implicit view function T => Double defined in the first implementation of the Classifier class is replaced by the mapping (or conversion) method Feature.apply: T => Array[Double].
The mapping function mapper is summoned using implicitly.

The notation T : Feature specifies the implicit value feature. It is equivalent to the more descriptive declaration of the Classifier class:

class Classifier[T](
   xt: Vector[T],
   yt: Vector[Double])(implicit feature: Feature[T])
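Here is a minimal sketch of how client code provides the context; the Feature instance for Array[Double] features below is hypothetical:

// Hypothetical context: the features are already arrays of Double,
// so the conversion is the identity
implicit val arrayFeature: Feature[Array[Double]] = 
  new Feature[Array[Double]] {
    def apply(x: Array[Double]): Array[Double] = x
  }

val xt = Vector(Array(1.0, 0.5), Array(0.8, 0.2))
val yt = Vector(1.0, 0.0)

// The compiler resolves the Feature[Array[Double]] context implicitly
val classifier = new Classifier[Array[Double]](xt, yt)
val cost = classifier.loss(Array(0.3, 0.7))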

Important note
A class parameter type T cannot be bound to a primitive type P using a context: the developer has to define either an implicit view T => P or a context (or wrapper) W for the type, T => W[P]. A sketch of the wrapper approach follows.
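As an illustration of the wrapper approach, a hypothetical context AsDouble encapsulates the conversion to the primitive:

// Hypothetical wrapper context for the primitive type Double
trait AsDouble[T] { def apply(x: T): Double }

implicit val floatAsDouble: AsDouble[Float] = 
  new AsDouble[Float] { def apply(x: Float): Double = x.toDouble }

// The context bound T : AsDouble replaces the view bound T <% Double
class Accumulator[T: AsDouble](xs: Vector[T]) {
  def sum: Double = xs.map(implicitly[AsDouble[T]].apply(_)).sum
}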


Wednesday, April 15, 2015

Recursive Viterbi Path for Hidden Markov Models

Target audience: Advanced
Estimated reading time: 7'

Many machine learning algorithms utilize dynamic programming, and the Hidden Markov Model (HMM) is a prime example of this. Scala's tail recursion optimization offers a superb alternative to the conventional iterative approach for implementing the three fundamental forms of the HMM.



What you will learn: How to write a fast implementation of the Viterbi algorithm in Scala



Theory

Markov chain

A Markov process is a stochastic process characterized by its dependence on time, where the future's state is independent of past states if the present state is known. 

The intricacies of Markov process theory vary significantly based on the nature of the time space, whether it is discrete or continuous, and the nature of the state space, which can be discrete (countable and with all subsets measurable) or a broader topological space.
The following diagram describes a simple 2-state Markov chain [ref 2]:
Fig. 1 State transition diagram for a 2-state Markov chain

The state transition probabilities matrix is defined as: \[\begin{bmatrix} 1-\alpha & \alpha \\ \beta & 1-\beta \end{bmatrix}\]
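To make the transition matrix concrete, here is a minimal sketch that simulates this 2-state chain; the values of α and β are illustrative:

import scala.util.Random

// Simulate the 2-state Markov chain: alpha is the probability of the
// transition 0 -> 1 and beta the probability of the transition 1 -> 0
def simulate(alpha: Double, beta: Double, numSteps: Int): Seq[Int] = {
  val rand = new Random
  (0 until numSteps).scanLeft(0)((state, _) =>
    if(state == 0) { if(rand.nextDouble < alpha) 1 else 0 }
    else { if(rand.nextDouble < beta) 0 else 1 }
  )
}

val states = simulate(alpha = 0.2, beta = 0.4, numSteps = 100)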

Hidden Markov model

Hidden Markov Models (HMMs) are versatile tools used across various domains to infer sequences of data that are not directly observable. Their applications include:
  • Neuroscience
  • Speech analysis and recognition
  • Gene sequencing prediction
  • Time series analysis in general
  • Logistics and supply chain optimization
  • Handwriting recognition
  • Computational finance and quantitative analysis

An HMM algorithm uses three key components [ref 3]:
  • A set of observations
  • A sequence of hidden states
  • A model that maximizes the joint probability of the observations and hidden states, known as the Lambda model
HMMs are usually visualized as a lattice of states with transitions and observations, as illustrated in Fig. 2.

Fig. 2  Diagram of lattice of states, transition and observations


There are 3 use cases (or canonical forms) of the HMM [ref 4]:
  • Evaluation: Evaluate the probability of a given sequence of observations, given a model.
  • Training: Identify (or learn) a model given a set of observations.
  • Decoding: Estimate the state sequence with the highest probability of generating a given set of observations, given a model.
The last use case, Decoding, is implemented numerically using the Viterbi algorithm.

Viterbi algorithm

The terms "Viterbi path" and "Viterbi algorithm" are now commonly used to describe the application of dynamic programming algorithms in solving maximization problems that deal with probabilities.

Given a sequence of states {qt} and a sequence of observations {Ot}, the probability δt(i) of the highest-probability path that accounts for the first t observations and ends in state Si is defined as follows.

Definition of the delta function (eq 1): \[\delta _{t}(i)= \max_{q_{0:t-1}}\ p(q_{0:t-1},\ q_{t}=S_{i},\ O_{0:t}\ |\ \lambda )\]
Initialization of delta (eq 2): \[\delta _{0}(i)=\pi_{i}b_{i}(O_{0})\ \ \ \psi_{0}(i)=0 \ \ \ i:0,N-1\]
Recursive computation of delta (eq 3): \[\delta_{t}(j)=\max_{i}(\delta_{t-1}(i)\,a_{ij})\,b_{j}(O_{t})\ \ \ \ \psi_{t}(j)=\arg\max_{i}(\delta_{t-1}(i)\,a_{ij})\]
Computation of the optimum sequence of states Q (eq 4): \[q^{*}_{t}=\psi_{t+1}(q^{*}_{t+1})\ \ \ \ q^{*}_{T-1}=\arg\max_{i}\,\delta_{T-1}(i)\]

Scala implementation

State transition

  • delta: the highest probability of a single path that accounts for the first t observations and ends in state Si, δt(i)
  • psi: matrix of the state indices used in the maximization of the probabilities of predefined states
  • QStar: the optimum sequence q* of states Q0:T-1
The state of the Viterbi/HMM computation regarding the state transition and emission matrices is defined in the HMMState class.

final class HMMState(hmmModel: HMMModel, maxIters: Int) {
  
  // Matrix of elements (t, i) that defines the highest probability of a
  // single path of t observations ending in state S(i)
  val delta = Array.fill(hmmModel.numObs)(Array.fill(hmmModel.numStates)(0.0))
 
  // Auxiliary matrix of indices that maximize the probability 
  // of a given sequence of states
  val psi = Array.fill(hmmModel.numObs)(Array.fill(hmmModel.numStates)(0))

  // Singleton to compute the sequence Q* of states with 
  // the highest probability given a sequence of observations 
  object QStar {
    private val qStar = Array.fill(hmmModel.numObs)(0)

    // Update Q*, the optimum sequence of states, using backtracking
    def update(t: Int, index: Int): Unit = {
      // ... implementation omitted
    }
  }

  def getModel: HMMModel = hmmModel
}


The class HMMModel is defined by the three components of the Markov process:
  • State transition matrix A
  • Observation emission matrix B
  • Initial state probabilities, pi
and the number of observations, numObs.

case class HMMModel(
   A: Array[Array[Double]],    // State transition probabilities matrix
   B: Array[Array[Double]],    // Observation emission probabilities matrix
   pi: Array[Double],          // Initial state probabilities
   numObs: Int                 // Number of observations
) {
  def numStates: Int = A.length
  def numSymbols: Int = B.head.length
}

The numbers of states and symbols are extracted directly from the state transition matrix A and the emission matrix B.

Viterbi path

The algorithm is conveniently illustrated by the following diagram.
Fig. 3 Illustration of state-transition and observation matrices


First, let's review the lambda model for the HMM. The model is defined as a tuple of the transition probability matrix A, the emission probability matrix B, and the initial probabilities π.

Then let's define the basic components for implementing the Viterbi algorithm. The class ViterbiPath encapsulates the member variables critical to the recursion. The Viterbi algorithm is fully defined with:

  • state: the HMMState computation state, which wraps the lambda model described in the previous section
  • obsIdx: the indices of the observed symbols
class ViterbiPath(state: HMMState, obsIdx: Array[Int]) {
  
  // Access the lambda model through the computation state
  val hmmModel = state.getModel
  
  def getPath: HMMPrediction = {
    // Start the recursion at the first observation (t = 0)
    val likelihood = recurse(0)
    // qStar() retrieves the optimum sequence of states from state.QStar
    HMMPrediction(likelihood, qStar())
  }
}

The predicted output from the Viterbi algorithm consists of:
  • The likelihood of the sequence of observed states
  • The list of predicted states

case class HMMPrediction(likelihood: Double, states: Array[Int])

Tail recursion

The recursive method, recurse that implements the formula or steps defined earlier. The method relies on the tail recursion. Tail recursion or tail elimination algorithm avoid the creation of a new stack frame for each invocation, improving the performance of the entire recursion [ref 5].

@scala.annotation.tailrec
private def recurse(t: Int): Double = 
  // Initialization of the delta value for the first observation (eq 2);
  // initial seeds delta and psi, then the recursion proceeds to t = 1
  if( t == 0) {
    initial
    recurse(t+1)
  }
   // then for the subsequent observations ...
  else { 
    // Update the maximum delta value and its state index for observation t
    Range(0, hmmModel.numStates).foreach( updateMaxDelta(t, _) )
   
    // If we reached the last observation, exit by backtracking the
    // computation from the last observation
    if( t == obsIdx.size-1) {
      val idxMaxDelta = Range(0, hmmModel.numStates)
             .map(i => (i, state.delta(t)(i)))
             .maxBy(_._2)

      // Update Q* with the index that maximizes delta (eq 4)
      state.QStar.update(t+1, idxMaxDelta._1)
      idxMaxDelta._2
    }
    else    
      recurse(t+1)
  }

Once the delta values are initialized for the first observation, the maximum value of delta and its state index are computed through the method updateMaxDelta for each subsequent observation. Once the step t reaches the last index in the sequence of observations, obsIdx.size-1, the index of the column of the transition matrix A, idxMaxDelta, corresponding to the maximum of delta is computed. The last step is to update the optimum sequence of states q*, state.QStar, with that index.

The method updateMaxDelta computes the maximum value of delta over the predecessor states for the observation at time t and the state index j.

def updateMaxDelta(t: Int, j: Int): Unit = {
  // Select the predecessor state index that maximizes delta(t-1, i)*A(i, j) (eq 3)
  val (index, maxValue) = (0 until hmmModel.numStates)
      .map(i => (i, state.delta(t-1)(i) * hmmModel.A(i)(j)))
      .maxBy(_._2)

  state.psi(t)(j) = index
  // Scale by the emission probability of the symbol observed at t (eq 3)
  state.delta(t)(j) = maxValue * hmmModel.B(j)(obsIdx(t))
}
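Putting the pieces together, a hypothetical invocation with a 2-state, 2-symbol lambda model; the probabilities and observation indices below are illustrative only:

// Illustrative 2-state, 2-symbol lambda model
val A = Array(Array(0.7, 0.3), Array(0.4, 0.6))    // State transition probabilities
val B = Array(Array(0.9, 0.1), Array(0.2, 0.8))    // Emission probabilities
val pi = Array(0.6, 0.4)                           // Initial state probabilities

val obsIdx = Array(0, 1, 1, 0)                     // Indices of the observed symbols
val hmmModel = HMMModel(A, B, pi, numObs = obsIdx.length)
val state = new HMMState(hmmModel, maxIters = 100)

val prediction = new ViterbiPath(state, obsIdx).getPath
println(s"Likelihood: ${prediction.likelihood}")
println(s"States: ${prediction.states.mkString(", ")}")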


References

  • [1] Scala Programming Language
  • [2] Pattern Recognition and Machine Learning; Chap 13 Sequential Data/Hidden Markov Models - C. Bishop - Springer 2009
  • [3] Scala for Machine Learning; Chap 7 Sequential Data Models - P. Nicolas - Packt Publishing 2017
  • [4] Machine Learning: A Probabilistic Perspective; Chap 17 Markov and Hidden Markov models - K. Murphy - The MIT Press 2012
  • [5] Wikipedia: Tail Recursion



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3