Thursday, January 30, 2014

The Fast and The Ugly

Overview
It is not unusual for Scala developers to struggle to reconcile an elegant functional programming style with efficient, fast execution. Higher-order collection methods are conducive to very expressive constructs, at the cost of poor performance. The zip method is no exception.
def zip[B](that: GenIterable[B]): CC[(A, B)]   // method defined on GenIterable[A]
Fortunately, the authors of the Scala library have been diligent enough to provide us with alternatives whenever possible. The Tuple2Zipped class in the scala.runtime package offers a fast implementation of zip.

Performance Benchmark
Let's consider the case of two large arrays (the same test applies to lists, vectors, ...) and a function f that iterates over both arrays. The content of the arrays has no impact on the performance test, so we initialize them with random values. The first implementation uses the Scala zip method:
val rGen = new scala.util.Random
val len = 15000000
val x = Array.tabulate(len)(x => rGen.nextDouble)
val y = Array.tabulate(len)(y => rGen.nextDouble)
def zipper(x: Array[Double], y: Array[Double], f: (Double, Double) => Double): Array[Double] =
  x.zip(y).map(x => f(x._1, x._2))
We compare the performance of the zip method with three "non-elegant" alternatives: a while loop, a foreach iterator and a map method, as follows:
def whileLoop(x: Array[Double], y: Array[Double], f: (Double, Double) => Double): Array[Double] = {
  val z = new Array[Double](x.size)
  var k = 0
  while (k < z.size) {
    z(k) = f(x(k), y(k))
    k += 1
  }
  z
}

def forEach(x:Array[Double], y:Array[Double], f:(Double,Double)=> Double):Array[Double] = {
  val z = new Array[Double](x.size)
  var j = -1
  x.foreach( x => { j += 1; z(j) = f(x, y(j)) })
  z
}

def mapIter(x:Array[Double], y:Array[Double], f:(Double,Double)=> Double): Array[Double] = {
  var i = -1
  x.map( x => { i += 1; f(x, y(i)) })
}

Test
We perform the test with increasing values of len from 1 to 20 million floating point values as illustrated in the following driver program.
whileLoop(x,y,(x:Double, y:Double)=>x*y)
forEach(x,y,(x:Double, y:Double)=>x*y)
mapIter(x,y,(x:Double, y:Double)=>x*y)
zipper(x,y,(x:Double, y:Double)=>x*y)

The test is performed on an 8-core i7 machine with 32 GB of RAM, running Ubuntu, with the JVM heap set to -Xmx4G, although I would expect similar relative results on less powerful computers. The times recorded on the Y-axis are in milliseconds and the X-axis represents the number of iterations in millions.

Looking at the bar chart, it is clear that the "ugly" while loop has by far the most efficient execution. The test also confirms that the foreach and map iterators are notoriously slow, as discovered and explained in some of my previous posts. It seems that the more elegant the implementation, the poorer the performance. However, for small arrays the overhead of the zip method is negligible.

Tuple2Zipped to the rescue
The class scala.runtime.Tuple2Zipped[El1, Repr1, El2, Repr2] is evaluated against the original zip method on the test arrays. Let's reuse the arrays x and y and compare Array.zip with Tuple2.zipped for the multiplication operator, as follows:
x.zip(y).map( t => t._1 * t._2)
(x, y).zipped map ( _ * _)
And the winner is...
The Tuple2.zipped method is 2.5 times faster than Array.zip. However, the non-elegant while loop is still the most efficient option.
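The exact driver behind the charts is not reproduced here; the following is a minimal sketch of how such timings can be collected. The helper name timeMs and the array size are illustrative assumptions, not the original benchmark code.

```scala
import scala.util.Random

object ZipBench {
  // Crude wall-clock timer in milliseconds
  def timeMs(block: => Unit): Long = {
    val start = System.nanoTime
    block
    (System.nanoTime - start) / 1000000
  }

  def run(len: Int): Unit = {
    val rGen = new Random
    val x = Array.tabulate(len)(_ => rGen.nextDouble)
    val y = Array.tabulate(len)(_ => rGen.nextDouble)

    val tZip = timeMs { x.zip(y).map(t => t._1 * t._2) }
    val tZipped = timeMs { (x, y).zipped map (_ * _) }
    val tWhile = timeMs {
      val z = new Array[Double](x.length)
      var k = 0
      while (k < z.length) { z(k) = x(k) * y(k); k += 1 }
    }
    println(s"len=$len zip=${tZip}ms zipped=${tZipped}ms while=${tWhile}ms")
  }
}
```

Invoking ZipBench.run with increasing lengths reproduces the shape of the comparison; absolute numbers will of course vary with the JVM warm-up and the host.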

Monday, January 13, 2014

Performance Scala Parallel Collections

Overview
The Scala standard library includes parallel collections, whose purpose is to shield developers from the intricacies of concurrent thread execution and race conditions. Parallel collections are a very convenient way to encapsulate concurrency in a high-level abstraction, similar to the traditional data workflows scientists are familiar with.
Parallel computing is supported for some collections through the par method, as listed below.
List[T].par: ParSeq[T]
Array[T].par: ParArray[T]
Map[K,V].par: ParMap[K,V]
HashMap[K,V].par: ParHashMap[K,V]
Set[T].par: ParSet[T]
as well as ParVector, ParRange and ParIterable.
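As a minimal illustration (not from the original benchmark), converting between a sequential and a parallel collection is a one-liner in each direction:

```scala
// Note: on Scala 2.13+ the parallel collections live in the separate
// scala-parallel-collections module; on 2.10-2.12 they are built in.
val seq = (1 to 8).toList
val par = seq.par            // ParSeq[Int]: operations now run concurrently
val doubled = par.map(_ * 2) // executed on multiple threads
val back = doubled.seq       // back to a sequential collection
println(back)
```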

Performance Benchmark
The main purpose of parallel collections is to improve execution performance through concurrency. Let's consider map and reduce functions applied to an array of arbitrary length:
final val sz = 100000
val data = Array.tabulate(sz)(_ << 1)
data.par.map(x => f(x))        // f: Int => Int, some user-defined function
data.par.reduceLeft(_ + _)
The next step is to create a benchmark test class, TestContainer, that automates the performance evaluation of an array t of arbitrary type, run on a specific number of CPU cores, nCores:
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

class TestContainer[U <: Double](val t: Array[U], nCores: Int) {

  def map(f: U => U): Unit = {
    // Single-threaded processing
    t map f

    // Parallel processing on nCores cores
    val tpar = t.par
    tpar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nCores))
    tpar map f
  }

  def reduceLeft(f: (U, U) => U): Unit = {
    // Single-threaded processing
    t reduceLeft f

    // Parallel processing on nCores cores
    val tpar = t.par
    tpar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nCores))
    tpar reduceLeft f
  }
}
The method calls t.map and t.reduceLeft refer to the single-threaded, default implementations of the mapper and reducer applied to the array t. The calls tpar map and tpar reduceLeft execute the mapper and reducer concurrently. The Scala classes ForkJoinTaskSupport and ForkJoinPool are wrappers around the Java classes ForkJoinTask and ForkJoinPool. The ForkJoinPool class provides developers with a very efficient way to manage thread pools, because it may execute tasks that are potentially created by other tasks.
The tasks are implemented using Java threads, managed by an executor service, familiar to most Java developers.

Performance Results
The objective of the performance test is to evaluate the efficiency of the Scala parallel collection according to
  • The number of available CPU cores
  • The complexity of the computation
  • The size of the collection
In our scenario, TestContainer, we select an array of floating point values as the collection, a computationally intensive Fourier series as the mapper and a simple summation as the reducer.
val data = Array.tabulate(sz)( _ * 0.1)
val test = new TestContainer[Double](data, 4)
test.map(x => 2.5*Math.sin(0.01*x) + 2.0*Math.cos(0.01*x))
test.reduceLeft((s, x) => s + x)
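A hedged sketch of the kind of driver used to collect these timings follows; the elapsedMs helper, pool size and array size are assumptions, not the exact code behind the charts.

```scala
import scala.collection.parallel.ForkJoinTaskSupport

object ParMapBench {
  def elapsedMs(block: => Unit): Long = {
    val start = System.nanoTime
    block
    (System.nanoTime - start) / 1000000
  }

  def run(sz: Int, nCores: Int): Unit = {
    val data = Array.tabulate(sz)(_ * 0.1)
    val f = (x: Double) => 2.5 * Math.sin(0.01 * x) + 2.0 * Math.cos(0.01 * x)

    val tSeq = elapsedMs { data.map(f) }

    val par = data.par
    // On Scala 2.10/2.11 use scala.concurrent.forkjoin.ForkJoinPool instead
    par.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(nCores))
    val tPar = elapsedMs { par.map(f) }

    println(s"sz=$sz sequential=${tSeq}ms parallel($nCores cores)=${tPar}ms")
  }
}
```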
Let’s look at the relative performance of the map task on a single threaded Array and a parallel array ParArray.


The parallelization of the array improves the performance of the map task by up to 800%! The computation of the Fourier series takes advantage of the multi-core architecture of the host. These results are consistent with the performance improvement observed in Java when a multi-threaded application uses the executor service, although that improvement (around 500%) is not as dramatic. The same test is performed with the reducer.


The parallelization of the reduceLeft method on the array does not result in a significant performance improvement. As a matter of fact, the single-threaded version is faster than the fork-join task using 4 cores. This counter-intuitive result can be easily explained by the fact that parallelization adds task synchronization overhead that is not amortized in the case of a simple computation. Scala parallel collections make sense only for computationally intensive tasks. What about the impact of the number of cores on the parallelization? Let's compare the performance of the map task on 8 cores vs. 4 cores.



Summary
The performance improvement by executing the Fourier series in the map task on 8 cores instead of 4 cores is quite significant. However, it is obvious that increasing the number of cores on a single host won’t address the need for processing large datasets.

Tuesday, December 10, 2013

Temporal Difference Reinforcement Learning

Overview
There are two different approaches to implementing reinforcement learning:
1. Searching in the value function space using the temporal difference method
2. Searching in the policy space using genetic algorithms or gradient descent methods

This post focuses on the first approach.
All known reinforcement learning methods share the same objective of solving sequential decision tasks. In a sequential decision task, an agent interacts with a dynamic system by selecting actions that affect the transitions between states, in order to optimize a given reward function.

At any given step i, the agent selects an action a(i) on the current state s(i). The dynamic system responds by rewarding the agent for its optimal selection of the next state:\[s_{i+1}=V(s_{i})\]
The learning agent infers the policy that maps the set of states {s} to the set of available actions {a}, using a value function \[V(s_{i})\] The policy is defined as \[\pi :\,\{s_{i}\} \mapsto \{a_{i}\} \left \{ s_{i}|s_{i+1}=V(s_{i}) \right \}\]

The most common approach to learning a value function V is the Temporal Difference method (TD). The method uses observations of prediction differences from consecutive states, s(i) and s(i+1). If we denote by r the reward for selecting the action that moves from state s(i) to s(i+1), and by η the learning rate, then the value V is updated as \[V(s_{i})\leftarrow V(s_{i})+\eta .(V(s_{i+1}) -V(s_{i}) + r_{i})\]

Therefore the goal of the temporal difference method is to learn the value function for the optimal policy. The 'action-value' function represents the expected value of action a on state s and is defined as \[Q(s_{i},a_{i}) = r(s_{i}) + V(s_{i})\] where r is the reward value for the state.

The Temporal Difference method relies on an estimate of the final reward computed for each state. There are two variants of the Temporal Difference algorithm: On-Policy and Off-Policy.
  - The On-Policy method learns the value of the policy used to make the decisions. The value function is derived from the execution of actions using the same policy, but based on history.
  - The Off-Policy method learns potentially different policies. Therefore the estimate is computed using actions that have not been executed yet.
The most common formulation of the temporal difference approach is the Q-learning formula. It introduces the concept of a discount rate to reduce the impact of the first few states on the optimization of the policy, and it does not need a model of its environment. The exploitation of the action-value approach consists of selecting the next state by computing the action with the maximum reward. Conversely, the exploration approach focuses on the total anticipated reward. The update equation for Q-learning is \[Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .max_{a_{i+1}}Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))\] \[Q(s_{i},a_{i}): \mathrm{expected\,value\,of\,action\,a\,on\,state\,s}\,\,\eta : \mathrm{learning\,rate}\,\,\alpha : \mathrm{discount\,rate}\]
One of the most commonly used On-Policy methods is SARSA, which does not necessarily select the action that offers the most value. Its update equation is defined as \[Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))\]
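The two update rules above can be transcribed almost verbatim into code. Here is a minimal sketch with illustrative names (eta is the learning rate η, alpha the discount rate α), not part of the implementation developed below:

```scala
// Q-learning update: uses the best action available from the next state
def qLearningUpdate(q: Double, reward: Double, maxNextQ: Double,
                    eta: Double, alpha: Double): Double =
  q + eta * (reward + alpha * maxNextQ - q)

// SARSA update: uses the action actually taken from the next state
def sarsaUpdate(q: Double, reward: Double, nextQ: Double,
                eta: Double, alpha: Double): Double =
  q + eta * (reward + alpha * nextQ - q)
```

The only difference between the two rules is whether the next-state term is the maximum over actions (Off-Policy) or the Q value of the action the policy actually selects (On-Policy).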

Implementation
Functional languages are particularly suitable for iterative computation. We use Scala for the implementation of the temporal difference algorithm. We allow the user to specify any variant of the learning formula, using local functions or closures.
First, we have to define a state class, State, that contains the list of actions that can be executed from this state. The main purpose of this class is to select the most suitable action (the action with the highest reward r) and the associated maximum Q value.
final class State[T <: Double](val label: String) {
   private lazy val actions = new ListBuffer[Action[T]]
   var maxQValue: Double = Double.MinValue

     // Add an action to the existing list of actions for this state
   def addAction(action: Action[T]): Unit = {
      require(action != null, "Cannot add undefined action")
      actions.append(action)
   }

     // Test if the state has any action associated with it
   def hasAction: Boolean = actions.nonEmpty

     // Extract the most suitable action from this state and
     // compute the maximum Q value function for this state
   def getBestAction: Action[T] = {
       maxQValue = Double.MinValue
       var bestAction: Action[T] = null

       if (actions.length > 1) {
          for (action <- actions) {
             if (action.qValue > maxQValue) {
                maxQValue = action.qValue
                bestAction = action
             }
          }
       }
       else {
          bestAction = actions.head
          maxQValue = bestAction.qValue
       }
       bestAction
   }
}
As described in the introduction, an action of class Action has a reward and a destination state, toState (the state that is reached by following the action). A state, except the goal state, has multiple actions, but an action has only one destination or resulting state.
case class Action[T <: Double](toState: State[T],
                               reward: Double,
                               var qValue: Double = 0.0)
The states and actions can be loaded, generated and managed by a directed graph. The graph contains a hash map of actions and a list of states. It also contains the goal or final state. The relation between states and actions is defined as:
-  from state to actions (1:many)
-  from action to state (1:1)

A goal state, goalState, is the final state of the search and therefore has no actions associated with it. Any action that has the goal as its destination state, toState, has a reward of 1.0. The value 1.0 can be interpreted as follows: any action that leads to the goal state has probability 1.0 of reaching the goal.
class StateGraph[T <: Double](val goalState: State[T]) {
   require(goalState != null && !goalState.hasAction, "State graph requires a goal")

   lazy val actions = new HashMap[Action[T], State[T]]
   lazy val states = new ListBuffer[State[T]]

         // Add a state and its associated actions
   def addState(state: State[T]): Unit = {
      require(state != null, "Cannot add undefined state in the state graph")
      states.append(state)
   }

   def getState(action: Action[T]): Option[State[T]] = actions.get(action)

   def isEmpty: Boolean = states.isEmpty || actions.isEmpty

      // Initialize the search by selecting a state randomly
   def initialState: State[T] = {
      val rgen = new Random(System.currentTimeMillis)
      states(rgen.nextInt(states.length))
   }
}
Finally, the class AdaptiveSearch implements the basic temporal difference algorithm. The simplest implementation consists of two loops:
- the inner loop traverses the state-action graph
- the outer loop iterates to reach the best search path

The main parameters of the AdaptiveSearch class are:
- Learning rate
- Discount rate
- Q-learning formula that computes the new, best possible Q value
- Maximum number of iterations

In this implementation, the initial state can be either provided by the user or selected randomly from the original search/state space.
class AdaptiveSearch[T <: Double](val learning: Double,
                                  val discount: Double,
                                  qFormula: (Action[T], State[T]) => Double,
                                  val maxIterations: Int = 100) {
   require(learning > 0.3 && learning < 0.95, "Incorrect learning rate")
   require(discount > 0.6 && discount < 1.0, "Incorrect discount rate")
   require(qFormula != null, "Undefined Q-value formula")

      // Find the optimal route in a search space (or graph).
   def search(graph: StateGraph[T], initial: State[T] = null): Int = {
      require(graph != null && !graph.isEmpty, "Undefined graph")

      var iters: Int = 0
      val goalState = graph.goalState
      var curState = if (initial == null) graph.initialState else initial

      for (i <- 0 until maxIterations) {

         // Navigate the search space until the goal is reached.
         while (goalState != curState) {
            val bestAction = curState.getBestAction
            val nextBestState = bestAction.toState
            nextBestState.getBestAction   // refreshes nextBestState.maxQValue, used by qFormula

            // Update the current state with the next state with the most reward
            curState = nextBestState
            bestAction.qValue = qFormula(bestAction, nextBestState)
         }
         iters = i
      }
      iters
   }
}
In the following test program, the state-action directed graph is built manually with a predefined matrix of Q values {S}x{A}, along with the reward associated with each action. However, the input to the adaptive search could also be extracted from three functions: s(i) -> a(i), a(i) -> s(i+1) and r(i).
object LearningTest extends App {
   val learning: Double = 0.74
   val discount: Double = 0.95
        
     // closure that implements a variant of the Bellman/Q-learning formula
   val QLearning =(action: Action[Double], state: State[Double]) => {
        action.qValue + learning*(action.reward + discount*state.maxQValue-action.qValue)
   }
     
   final val goal = new State[Double]("Goal")
   val stateGraph = new StateGraph[Double](goal)
             // Build the graph of state
              .....
       
   val learningStrategy = new AdaptiveSearch[Double](learning, discount, QLearning)
   learningStrategy.search(stateGraph)
}


References
Online Value Function Determination for Reinforcement Learning - J. Laird, N. Debersky, N. Tinkerhess


https://github.com/prnicolas

Monday, November 18, 2013

Dependencies Injection in Scala

Overview
Dependency injection is a design pattern that has been widely used in Java, through frameworks such as Spring. The objective of the pattern is to replace hard-coded dependencies with run-time association, or injection, of new types.

Java defines modules or components through the semantics and convention of packages. The functionality of a module is defined through one or more interfaces and implemented through the composition and inheritance of concrete classes. Polymorphism is used to "dynamically wire" those classes, assembled through composition and inheritance into patterns which address a specific design problem (publish-subscribe, strategy, factory..).
However, those capabilities have proven limited for creating very dynamic and complex applications. The Scala programming language provides developers with a dependency injection mechanism based on self-type annotations that does not rely on a 3rd-party framework.

Reuse through Inheritance
The simplest and most commonly used form of reuse in any object-oriented programming language is inheritance. Let's consider an interface House, which is implemented by an abstract or concrete class 'House with Furniture & Appliances', which in turn is sub-classed by a well-defined House.

It is well documented that inheritance is a poor mechanism for code reuse, because data is not properly encapsulated: a sub-class may access the internals of the base class. Moreover, any future changes in the base class or interface (framework) will propagate through the sub-classes (dependencies).

Reuse through Composition
It is a well documented and researched fact that composition provides more robust encapsulation than inheritance, as the main class delegates, or routes, method invocations to the appropriate internal components. Contrary to inheritance, for which changes in the base class may have unintended consequences on the subclasses, changes in components or inner classes can be made independently of the main class or outer component.
Clearly, in the example above, composition is more appropriate. After all, a House with Furniture and Appliances can be defined as a House that contains Furniture and Appliances:
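A minimal sketch of this composition approach follows; the class and member names are illustrative, not part of the Cake pattern example developed below.

```scala
class Furniture(val name: String)
class Appliance(val name: String)

// The house owns and delegates to its components instead of inheriting from them
class House(furniture: List[Furniture], appliances: List[Appliance]) {
  def inventory: List[String] =
    furniture.map(_.name) ::: appliances.map(_.name)
}
```

The internals of Furniture and Appliance can evolve freely; House only depends on the small surface it delegates to.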



Inversion of Control
Frameworks such as Spring introduced the concept of Inversion of Control (IoC) containers; dependency injection is a form of IoC. In the case of inversion of control, a framework defines interfaces which are extended or implemented by the application or client code. Instead of having the application use the framework API, the framework relies on the application to implement a specific function.
Let's take the example of a generic service that access a database.

public interface Service {
    JSonObject query(String mySQLQuery);
}

public interface DbAccess { }

public class ServiceImpl implements Service {
    private DbAccess dbAccess = null;
    public void setDbAccess(DbAccess dbAccess) { this.dbAccess = dbAccess; }

    @Override
    public JSonObject query(String mySQLQuery) { .. }
}

In the example above, a concrete implementation of the DbAccess interface, such as MySQLAccess or MongoDBAccess, can be injected, or passed, to the implementation of the service. Scala provides the developer with a similar and powerful mechanism to inject dependencies into a concrete class, known as the Cake pattern.
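For comparison, the same injection can be expressed in Scala with plain constructor injection, before introducing the Cake pattern. This is a sketch with hypothetical names; the return types are simplified to strings for illustration.

```scala
trait DbAccess { def execute(query: String): String }

// A hypothetical concrete access layer
class MySQLAccess extends DbAccess {
  def execute(query: String): String = s"mysql:$query"
}

// The dependency is injected through the constructor instead of a setter
class ServiceImpl(dbAccess: DbAccess) {
  def query(sql: String): String = dbAccess.execute(sql)
}
```

Constructor injection keeps the dependency immutable and explicit; the Cake pattern below goes further by wiring dependencies at the type level.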

Dependency Injection
At its core, dependency injection relies on 3 components:
- the consumer, or list of dependent components
- the provider, which injects the dependencies
- the dependencies themselves

Let's consider the house example above. A House requires not only Furniture and Appliances but also a Layout plan with step-by-step instructions.


Each piece of furniture is defined by its name, category and price. Specialized furniture such as PatioFurniture and BathroomFurniture can also be created.

class Furniture(val name: String, val category: String, val price: Double) 

class PatioFurniture(val _name: String, val _price: Double, val season: String) 
                                        extends Furniture(_name, "Patio", _price)

class BathroomFurniture(val _name: String, val _price: Double, val floor: Int=1) 
                                  extends Furniture(_name, "Bathroom", _price)

A house also contains appliances and requires a layout plan to be set up. An appliance has a name, a price and a warranty if needed and available. A layout object is defined by its name and an option.
class Appliance(val name: String, val warranty: Boolean, val price: Double)
class Layout(val name: String, val option: Int)

The goal is to dynamically furnish a house with a combination of appliances and pieces of furniture, following a layout plan, and ultimately to compute the total cost.
To this purpose, we create a module, implemented as a trait, that encapsulates each type of class. For instance, to manage the furniture, we can create a FurnitureModule:

trait FurnitureModule {
  val furnitures: List[Furniture]
  
   class Furniture(val id: String, val category: String, val price: Double) 
   class PatioFurniture(val _id: String, val _price: Double, val season: String) 
                                                  extends Furniture(_id, "Patio", _price)
}


trait BathroomFurnitureModule extends FurnitureModule {
   class BathroomFurniture(val _id: String, val _price: Double, val floor: Int=1) 
                                            extends Furniture(_id, "Bathroom", _price)
}

The first trait, FurnitureModule, defines a generic Furniture and a specialized furniture type, PatioFurniture. Dynamic binding is managed by the module that encapsulates the hierarchy of furniture types. Alternatively, the client code can manage dynamic binding, or dependency injection, by creating a sub-module, BathroomFurnitureModule, to manage another type of furniture, BathroomFurniture. These two approaches to injecting dependencies can easily be combined.
The same strategy is applied to the Appliance and Layout.
trait ApplianceModule {
  val appliances: List[Appliance]
  
  class Appliance(val name: String, val warranty: Boolean, val price: Double) {
     def this(name: String, price: Double) = this(name, true ,price)
     def cost: Double = if( warranty ) price * 1.15 else price
  }
}


trait LayoutModule {
  val layout: Layout
    
  class Layout(val name: String, val option: Int) {
     val movingInstructions: List[String] = List.empty
  }
}

The factory class, RoomFurnishing, relies on a self-reference to one of the components, LayoutModule, and on the other components, FurnitureModule and ApplianceModule, as mixins. The factory class defines all the methods that are required to manage any combination of the components, in our case the cost.

class RoomFurnishing {
   self: LayoutModule with FurnitureModule with ApplianceModule => 
 
      def cost: String = {
         val totalCost = furnitures.foldLeft[Double](0.0)((cost,p) => cost + p.price) +
                   appliances.foldLeft[Double](0.0)((cost,p) => cost + p.price)
         new StringBuilder(layout.name).append(" cost ").append(totalCost.toString).toString
      }
      def movingDate: String = "October, 2013"
}

Here is an example of how the client code can dynamically assemble all the components and compute the total cost.

if( country ) {
   val houseFurnishing = new RoomFurnishing 
                                  with FurnitureModule 
                                          with ApplianceModule 
                                                 with LayoutModule {

      val layout = new Layout("Country Home", 2)
      val furnitures = List[Furniture](new Furniture("Modern Chair", "Chair", 136.0), 
                                         new PatioFurniture("Bench", 350.0, "Summer"))
      val appliances = List[Appliance](new Appliance("Microwave Oven", false, 185.0), 
new Appliance("Dishwasher", true, 560.0))
    }
    println(houseFurnishing.cost)
}

else {
    val houseFurnishing = new RoomFurnishing 
                                with BathroomFurnitureModule 
                                         with ApplianceModule 
                                                    with LayoutModule {

       val layout = new Layout("Apartment", 4)
       val furnitures = List[BathroomFurniture](new BathroomFurniture("Stool", 89.5), 
                                                 new BathroomFurniture("Cabinet", 255.6, 2))
       val appliances = List[Appliance](new Appliance("Microwave Oven", false, 185.0), 
new Appliance("Dishwasher", true, 560.0))
    }
    println(houseFurnishing.cost)
}

This technique combines class composition, inheritance, self-reference and abstract variables to provide a simple and flexible framework. The post also introduces the concept of a layout, or assembler, to hide some complexity and become the primary mixin. I may elaborate on this concept in a future post. I strongly recommend the article written by Jonas Bonér on the Cake pattern (listed in the references) to get an in-depth understanding of both the motivation for and the variants of this pattern.


References
Jonas Bonér's article on the Cake Pattern
Cake Solutions Blog
Programming in Scala - M. Odersky, L. Spoon, B. Venners - Artima, 2007

Friday, November 8, 2013

Discrete Kalman Optimal Estimator

Objective
This post is an introduction to the Kalman optimal filter, using the Scala programming language for the implementation. The Kalman filter is widely used in signal processing and statistical analysis to quantify or estimate the noise created by a process and the noise generated by measurement devices.

Overview 
A Kalman filter is an optimal estimator that derives parameters from indirect and inaccurate observations. The objective of the algorithm is to minimize the mean square error of the model parameters. The algorithm is recursive and therefore can be used for real-time signal analysis. The Kalman filter has one main limitation: it requires the process to be linear, y = a.f(x) + b.g(x) + ... The state is affected by Gaussian noise in both the process and the measurement.



Covariance
The Kalman filter represents the estimated state and error state as covariance matrices. The covariance of a random vector x = { ..  x  .. } is an n by n positive definite, symmetric matrix with the variance of each variable as diagonal elements \[cov(\mathbf{x})= E[(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}] = \int_{-\infty }^{\infty} .. \int_{-\infty }^{\infty}(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}\,p(\mathbf{x})\,dx_{1} .. dx_{n}\] Such a matrix can be diagonalized by computing its eigenvectors (or basis vectors) in order to decouple the contributions of the noise or errors.
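As a concrete illustration (not part of the filter implementation below), the sample covariance matrix of a set of observations can be computed directly from the definition above:

```scala
// Sample covariance matrix of n observations of a d-dimensional vector,
// using the unbiased (n - 1) normalization
def covariance(xs: Array[Array[Double]]): Array[Array[Double]] = {
  val n = xs.length
  val d = xs.head.length
  val mean = Array.tabulate(d)(j => xs.map(_(j)).sum / n)
  Array.tabulate(d, d)((i, j) =>
    xs.map(x => (x(i) - mean(i)) * (x(j) - mean(j))).sum / (n - 1))
}
```

The diagonal entries are the variances of each variable; the off-diagonal entries measure how pairs of variables vary together.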

State Equation Model
The state of a deterministic discrete-time linear dynamic system is the smallest vector that summarizes the past of the system in full and allows a theoretical prediction of its future behavior, in the absence of noise. Let x(k) be the n-dimensional state vector at step k, u(k) the input vector, w(k) the unknown process noise vector normalized for a zero mean, v(k) the measurement noise, and z(k) the actual measurement of the state at step k. Then \[\mathbf{x}_{k+1} = A_{k}\mathbf{x}_{k} + B_{k}\mathbf{u}_{k} + \mathbf{w}_{k}\,\,\,(1)\,\,with\,\,Q_{k} = E[\mathbf{w}_{k}.\mathbf{w}_{k}^T]\\\mathbf{z}_{k} = H_{k}.\mathbf{x}_{k} + \mathbf{v}_{k}\,\,\,(2)\,\,\,\,with\,\,\,R_{k} = E[\mathbf{v}_{k}.\mathbf{v}_{k}^T]\] where A(k) is the state transition matrix, H(k) the measurement matrix, Q(k) the covariance matrix of the process noise and R(k) the covariance matrix of the measurement noise at step k. We assume that both the process noise covariance Q and the measurement noise covariance R follow a Gaussian probability distribution.

Implementation
We leverage the support for functional constructs provided in Scala. Validation of method arguments, exceptions and non essential supporting methods are omitted for the sake of readability of the code snippets.
First we need to define the noise w generated by the linear process and the noise generated by the measurement device. Those functions are defined as members of the KalmanNoise class and should generate Gaussian random distributions for the noise on the process and the measurement. The validation that the noise follows a normal distribution is omitted. Lazy values are convenient in the case where one of the noise distributions is null or unknown.

import scala.util.Random
import org.apache.commons.math3.linear.ArrayRealVector

final class KalmanNoise(private val size: Int,
                        val processNoiseGen: () => Double = Random.nextGaussian,
                        val measureNoiseGen: () => Double = Random.nextGaussian) {

    lazy val processNoise = generate(processNoiseGen)
    lazy val measurementNoise = generate(measureNoiseGen)

    private[this] def generate(f: () => Double): ArrayRealVector = {
         val noise = new ArrayRealVector(size)
         (0 until size) foreach (n => noise.setEntry(n, f()))
         noise
    }
}

The KalmanModel class has two objectives:
  - encapsulate the matrices and vectors used in the generation of the state and error equations
  - generate the process model and measurement model on demand (factory)
The processModel instance depends on the initial state of the process, x0, which is specified independently of the Kalman model parameters. The model can be extended through inheritance by overriding the processModel and measurementModel methods with custom algorithms.

import org.apache.commons.math3.linear.{ArrayRealVector, RealMatrix}
import org.apache.commons.math3.filter.{DefaultMeasurementModel, DefaultProcessModel, MeasurementModel, ProcessModel}

case class KalmanModel(A: RealMatrix,
                       B: RealMatrix,
                       H: RealMatrix,
                       Q: RealMatrix,
                       R: RealMatrix,
                       P0: RealMatrix) {

  def processModel(x0: Array[Double]): ProcessModel =
    new DefaultProcessModel(A, B, Q, new ArrayRealVector(x0), P0)

  def measurementModel: MeasurementModel = new DefaultMeasurementModel(H, R)
}

The method below implements the basic sequence of one iteration of the update of the state of the process:
  1. predict the state of the process at the next step (x, A)
  2. extract or generate noise for the process and the measurement devices (w, v)
  3. update the state of the process (x)
  4. compute the error on the measurement (z)
  5. adjust the covariance matrices
Note that the control input u and the initial state x0 are defined as arguments of the main method because they are independent of the model.

def compute(u: Array[Double], x0: Array[Double], maxNumIters: Int = 50): Unit = {
   lazy val processModel = model.processModel(x0)
   lazy val measurementModel = model.measurementModel

   val filter = new KalmanFilter(processModel, measurementModel)
   val uVector = new ArrayRealVector(u)
   var xVector = processModel.getInitialStateEstimate

     // Iterate the sequence: prediction, noise & state estimation, correction
   (0 until maxNumIters).foreach( _ => {
       filter.predict(uVector)
       xVector = newState(uVector, xVector, processModel)
       filter.correct(newMeasurement(xVector))
   })
}

This method implements the iterative state equation of the model, x <- A.x + B.u + w, described in the introduction.
private def newState(uVector: RealVector, 
                     xVector: RealVector, 
                     processModel: ProcessModel): RealVector = {
    val A = processModel.getStateTransitionMatrix
    val B = processModel.getControlMatrix
    val w = noise.processNoise
    A.operate(xVector).add(B.operate(uVector)).add(w)
}
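To make the state equation concrete, the sketch below performs a single update x <- A.x + B.u + w with plain arrays, independent of Commons Math; the values (dt = 0.05, initial height 100, zero speed) are taken from the gravity example later in the post, and u = -g is used as the control input:

```scala
// One step of x <- A.x + B.u + w with plain arrays.
val dt = 0.05
val A = Array(Array(1.0, dt), Array(0.0, 1.0))   // state transition
val B = Array(Array(0.5 * dt * dt), Array(dt))   // control matrix
val x = Array(100.0, 0.0)                        // height, vertical speed
val u = Array(-9.81)                             // gravity as control input
val w = Array(0.0, 0.0)                          // no process noise

// Naive matrix-vector product, sufficient for a 2x2 example.
def matVec(m: Array[Array[Double]], v: Array[Double]): Array[Double] =
  m.map(row => row.zip(v).map { case (a, b) => a * b }.sum)

val next = matVec(A, x).zip(matVec(B, u)).zip(w).map {
  case ((ax, bu), n) => ax + bu + n
}
// next(0) ≈ 99.98774 (height), next(1) = -0.4905 (speed)
```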

Lastly, the method defined below implements the computation of the error on the observed state using the equation z = H.x + v, where x is the current state of the process and v is the measurement noise.
private def newMeasurement(xVector: RealVector): RealVector = {
   val H = model.measurementModel.getMeasurementMatrix
   val v = noise.measurementNoise
   H.operate(xVector).add(v)
}
Example
We will use a simple example based on Newton's law of gravity. If y is the height of a falling object, the differential equation can be integrated with a time step dt as follows \[\frac{\mathrm{d}^2 y_{t}}{\mathrm{d} t^{2}} + g = 0\,\,\Rightarrow\\ y_{t+dt} = y_{t}+ \dot{y}_{t}\,dt - \frac{g}{2}\,dt^{2}\,\,\,;\,\,\dot{y}_{t+dt} = \dot{y}_{t} - g\,dt\,\,\,(\mathbf{3})\] The state vector x(k) of the object at time k is defined by \[\mathbf{x}_{k} = [y_{k},\frac{\mathrm{d} y_{k}}{\mathrm{d} t}]^{T}\] and the state equation is \[\mathbf{x}_{k+1} = A_{k}.\mathbf{x}_{k} + B_{k}.\mathbf{u}_{k}\,\,\,with\,\,\,A_{k} =\begin{vmatrix} 1 & dt\\ 0 & 1 \end{vmatrix}\,\,,\,B_{k}=\begin{vmatrix}\frac{dt^{2}}{2} \\ dt \end{vmatrix}\] We use the Apache Commons Math library version 3.0 (see the Apache Commons Math User Guide) to filter and predict the motion of a body subjected to gravity.
val dt = 0.05 
val initialHeight = 100.0
val initialSpeed = 0.0
val processNoise = 0.0
val measureNoise = 0.2
val gravity = 9.81
 
val A = Matrix[Double](1.0, dt, 0.0, 1.0)
val B = Matrix[Double](0.5*dt*dt, dt)
val H = Matrix[Double](1.0, 0.0)
val Q = Matrix[Double](1e-4, 1e-3, 1e-3, 1e-4)
val R = Matrix[Double](measureNoise, measureNoise)
 
  // Initialize the drop at 100 feet with no speed
val x0 = Array[Double](initialHeight, initialSpeed)
 
  // Create the process and noise models
val model = new KalmanModel(A, B, H, Q, R, Defaults.IdentityMatrix)
val noise = new KalmanNoise(2, null, () => Random.nextGaussian*measureNoise)        
 
  // Compute the new state for height and velocity
val predictor = new KalmanPredictor(model, noise)
predictor.compute(Array[Double](gravity), x0)
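As a sanity check on the discretized equations (3), the short sketch below (plain Scala, no filtering) iterates the free-fall update and compares the result with the closed-form solution y(t) = y0 - g.t²/2; for this linear system the discrete update reproduces the exact trajectory:

```scala
// Iterate equations (3): y <- y + v*dt - g*dt^2/2 ; v <- v - g*dt
val g = 9.81
val dt = 0.05
var y = 100.0   // initial height
var v = 0.0     // initial speed
(0 until 20).foreach { _ =>
  y += v * dt - 0.5 * g * dt * dt
  v -= g * dt
}
val t = 20 * dt                       // elapsed time: 1.0 second
val exact = 100.0 - 0.5 * g * t * t   // closed-form height after 1 s: 95.095
```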


References 
An Introduction to the Kalman Filter - G. Welch, G. Bishop, University of North Carolina at Chapel Hill, 2006
Source code: github.com/prnicolas

Wednesday, October 30, 2013

Type Erasure, Manifest, Specialization in Scala

Overview
The Scala and Java programming languages use type erasure to compile away generics: the type parameters [U <: T] are removed and replaced by their upper bound T, or Any. The process involves boxing and un-boxing primitive types when they are used as type parameters, which degrades performance. This post describes two approaches to work around type erasure: Manifest and type specialization.

Description
Let's consider a class ListCompare that compares lists of a parametric type U bounded by the type Item. Note that ancillary code such as returned Option types or checks on method and class arguments is omitted for the sake of simplicity.

class Item

class ListCompare[U <: Item](val xs: List[U])(implicit f: U => Ordered[U]) {
   def compare(xso: List[U]): Boolean = {
      xso match {
        case str: List[String] => 
           if(xs.size==xso.size) xs.zip(xso).exists( x=> x._1.compareTo(x._2) != 0) else false
      
        case n: List[Int] => 
           if(xs.size==xso.size) xs.zip(xso).exists(x => x._1!= x._2) else false
    
        case _ => false
     }
  }
}
The class requires the implicit conversion U => Ordered[U] to support the comparison of elements. The code above generates the following warning message: "non-variable type argument String in type pattern List[String] is unchecked since it is eliminated by erasure". The compiler emits the warning because the two parameterized types, List[String] and List[Int], are not available to the JVM during the execution of the pattern matching.
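The hazard behind this warning is easy to demonstrate: because the type argument is erased, a List[String] matches a List[Int] pattern at run time. A minimal, standalone illustration:

```scala
// After erasure only the List constructor is checked, so this match succeeds
// even though the element type is String, not Int.
val matched = (List("a", "b"): Any) match {
  case _: List[Int] => true   // compiler warns: unchecked, eliminated by erasure
  case _            => false
}
// matched == true
```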
One solution is to use a Manifest. A Manifest[T] is an opaque descriptor for the type T that gives access to the erasure of the type as a Class instance. The most common usage of manifests is the creation of native arrays when the class is not known at compile time, as illustrated in the code snippet below.

def myArray[T] = new Array[T](0)
def myArray[T](implicit m: Manifest[T]) = new Array[T](0)
def myArray[T: Manifest] = new Array[T](0)
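A short, standalone illustration of the context-bound form (Scala 2, where Manifest extends ClassTag and therefore also enables the array creation); the names are illustrative:

```scala
// Context bound: the compiler supplies an implicit Manifest[T] at the call site.
def myArray[T: Manifest](n: Int): Array[T] = new Array[T](n)

val xs = myArray[Int](3)   // a genuine Array[Int], not an Array[Object]

// Manifests survive erasure, so parameterized types can be compared at run time:
val same = manifest[List[Int]] <:< manifest[List[Int]]      // true
val diff = manifest[List[Int]] <:< manifest[List[String]]   // false
```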

The first line of code won't compile. The second function preserves the erased type by passing the manifest as an implicit parameter. The last function defines the manifest as a context bound. The solution to our problem is to pass a Manifest as an implicit parameter to the compare method.

class ListCompare[U <: Item](val xs: List[U])(implicit f: U => Ordered[U]) {
  def compare(xso: List[U])(implicit u: Manifest[List[U]]): Boolean = {
    
    if( u <:< manifest[List[String]] ) 
       if( xs.size == xso.size)  xs.zip(xso).exists( x=> x._1.compareTo(x._2) != 0) else false
    
    else if(u <:< manifest[List[Int]])  
       if( xs.size == xso.size) xs.zip(xso).exists(x => x._1!=x._2) else false
    
    else false
   }
}

A second option is to generate a separate class for each primitive type using the @specialized annotation. The annotation forces the compiler to generate byte code for each primitive listed as a specialized type. For instance, the declaration @specialized(Int, Double) T generates two extra, efficient classes for the primitive types Int and Double. Note that specialization applies only to primitive types; a reference type such as String cannot be specialized. The original ListCompare class is re-written using the annotation as follows:

class ListComp[@specialized(Int, Double) U <: Item]
                 (val xs: List[U])(implicit f: U => Ordered[U]) {
  
  def compare(xso: List[U]): Boolean = 
      if(xs.size == xso.size) xs.zip(xso).exists( x=> x._1.compareTo(x._2)!=0) else false
}
The code above will not generate a warning or an error. However, there is no such thing as a free lunch: the compiler generates extra byte code for the methods associated with the specialized primitive types. The objective in this case is to trade higher memory consumption for a performance improvement.

References
Programming in Scala - M. Odersky, L. Spoon, B. Venners - Artima 2008
Scala for the Impatient - Cay Horstmann - Addison-Wesley 2012