Tuesday, December 10, 2013

Temporal Difference Reinforcement Learning

Overview
There are two different approaches to implement reinforcement learning
1.  Searching in the value function space using temporal difference method
2.  Searching in the policy space using genetic algorithm or gradient descent methods

This post focuses on the first approach.
All reinforcement learning methods share the same objective: solving sequential decision tasks. In a sequential decision task, an agent interacts with a dynamic system by selecting actions that affect the transitions between states, in order to optimize a given reward function.

At any given step i, the agent selects an action a(i) in the current state s(i). The dynamic system responds by rewarding the agent for its selection of the next state: $s_{i+1}=V(s_{i})$
The learning agent infers the policy that maps the set of states {s} to the set of available actions {a}, using a value function $V(s_{i})$. The policy is defined as $\pi :\,\{s_{i}\} \mapsto \{a_{i}\},\ \left \{ s_{i}|s_{i+1}=V(s_{i}) \right \}$

The most common approach to learning a value function V is the Temporal Difference (TD) method. The method uses the difference between predictions at consecutive states, s(i) and s(i+1). If we denote by r the reward for selecting the action that moves from state s(i) to s(i+1), and by η the learning rate, then the value V is updated as $V(s_{i})\leftarrow V(s_{i})+\eta .(V(s_{i+1}) -V(s_{i}) + r_{i})$
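This update rule can be sketched as a small Scala function. This is a sketch only: the mutable map of state values and the String state labels are assumptions of this example, not part of the implementation presented later in this post.

```scala
// TD(0) update: V(s) <- V(s) + eta * (V(s') - V(s) + r), per the formula above.
// States are plain String labels; unseen states default to a value of 0.0
val values = scala.collection.mutable.Map[String, Double]().withDefaultValue(0.0)

def tdUpdate(s: String, sNext: String, reward: Double, eta: Double): Unit =
  values(s) = values(s) + eta * (values(sNext) - values(s) + reward)

// One observed transition s0 -> s1 with reward 1.0 and eta = 0.5
tdUpdate("s0", "s1", 1.0, 0.5)
```

After the single update, V(s0) has moved halfway toward r + V(s1).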

Therefore the goal of the temporal difference method is to learn the value function for the optimal policy. The action-value function represents the expected value of action a in state s and is defined as $Q(s_{i},a_{i}) = r(s_{i}) + V(s_{i})$ where r is the reward for the state.

The Temporal Difference method relies on an estimate of the final reward computed for each state. There are two classes of Temporal Difference algorithms, On-Policy and Off-Policy:
- The On-Policy method learns the value of the policy used to make decisions. The value function is derived from the execution of actions using that same policy, based on history.
- The Off-Policy method learns potentially different policies; therefore the estimate is computed using actions that have not been executed yet.
The most common formula for the temporal difference approach is the Q-learning formula. It introduces a discount rate to reduce the impact of the first few states on the optimization of the policy, and it does not require a model of the environment. The exploitation of the action-value approach consists of selecting the next state by computing the action with the maximum reward, while the exploration approach focuses on the total anticipated reward. The update equation for Q-learning is $Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .max_{a_{i+1}}Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))$ where $Q(s_{i},a_{i})$ is the expected value of action a in state s, $\eta$ the learning rate and $\alpha$ the discount rate.
One of the most commonly used On-Policy methods is Sarsa, which does not necessarily select the action that offers the most value. Its update equation is $Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))$
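The two update rules can be sketched side by side. This is a sketch; the Q-table keyed by (state, action) pairs of String labels is an assumption of this example, not the representation used in the implementation below.

```scala
// Q-table keyed by (state, action); unseen entries default to 0.0
val q = scala.collection.mutable.Map[(String, String), Double]().withDefaultValue(0.0)

// Off-policy (Q-learning): bootstrap on the best action available in the next state
def qLearning(s: String, a: String, r: Double, sNext: String,
              nextActions: Seq[String], eta: Double, alpha: Double): Unit = {
  val maxNext = if (nextActions.isEmpty) 0.0 else nextActions.map(aN => q((sNext, aN))).max
  q((s, a)) = q((s, a)) + eta * (r + alpha * maxNext - q((s, a)))
}

// On-policy (Sarsa): bootstrap on the action actually taken in the next state
def sarsa(s: String, a: String, r: Double, sNext: String, aNext: String,
          eta: Double, alpha: Double): Unit =
  q((s, a)) = q((s, a)) + eta * (r + alpha * q((sNext, aNext)) - q((s, a)))

qLearning("s0", "a0", 1.0, "s1", Seq("a0"), 0.5, 0.9)
sarsa("s0", "a1", 1.0, "s1", "a0", 0.5, 0.9)
```

The only difference between the two rules is the bootstrap term: the maximum over next actions (Q-learning) versus the Q value of the action the policy actually takes next (Sarsa).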

Implementation
Functional languages are well suited to iterative computation. We use Scala to implement the temporal difference algorithm, and allow the user to specify any variant of the learning formula using local functions or closures.
First, we define a class, State, that contains the list of actions that can be executed from that state. The main purpose of this class is to select the most suitable action (the action with the highest Q value) and record the associated maximum Q value.
final class State[T <: Double](val label: String) {
  private lazy val actions: ListBuffer[Action[T]] = new ListBuffer[Action[T]]
  var maxQValue: Double = Double.MinValue

  // Add an action to the existing list of actions for this state
  def addAction(action: Action[T]): Unit = {
    require(action != null, "Cannot add undefined action")
    actions.append(action)
  }

  // Test if the state has any action associated to it
  def hasAction: Boolean = actions.nonEmpty

  // Extract the most suitable action from this state and
  // compute the maximum Q value for this state
  def getBestAction: Action[T] = {
    maxQValue = Double.MinValue
    var bestAction: Action[T] = null

    for (action <- actions) {
      if (action.qValue > maxQValue) {
        maxQValue = action.qValue
        bestAction = action
      }
    }
    bestAction
  }
}

As described in the introduction, an action of class Action has a reward and a destination state, toState (the state reached by executing the action). A state, except the goal state, has multiple actions, but an action has only one destination or resulting state.
case class Action[T <: Double](toState: State[T],
    reward: Double,
    var qValue: Double = 0.0)

The states and actions can be loaded, generated and managed by a directed graph. The graph contains a hash map of actions and a list of states; it also contains the goal (or final) state. The relations between states and actions are defined as:
-  from state to actions (1:many)
-  from action to state (1:1)

A goal state, goalState, is the final state of the search and therefore has no actions associated to it. Any action whose destination state, toState, is the goal has a reward of 1.0, which can be interpreted as: any action that leads to the goal state reaches it with probability 1.0.
class StateGraph[T <: Double](val goalState: State[T]) {
  require(goalState != null && !goalState.hasAction, "State graph requires a goal")

  lazy val actions: HashMap[Action[T], State[T]] = new HashMap[Action[T], State[T]]
  lazy val states = new ListBuffer[State[T]]

  // Add a state and its associated actions
  def addState(state: State[T]): Unit = {
    require(state != null, "Cannot add undefined state in the state graph")
    states.append(state)
  }

  def getState(action: Action[T]): Option[State[T]] = actions.get(action)

  def isEmpty: Boolean = states.isEmpty || actions.isEmpty

  // Initialize the search by selecting a state randomly
  def initialState: State[T] = {
    val rgen = new Random(System.currentTimeMillis)
    states(rgen.nextInt(states.length))
  }
}

Finally, the class AdaptiveSearch implements the basic temporal difference algorithm. The simplest implementation consists of two loops:
- inner loop traverses the state-action graph
- outer loop iterates to reach the best search path

The main parameters of the AdaptiveSearch class are:
- Learning rate
- Discount rate
- Q-learning formula that computes the new, best possible Q value
- Maximum number of iterations

In this implementation, the initial state can be either provided by the user or selected randomly from the original search/state space.
class AdaptiveSearch[T <: Double](val learning: Double,
    val discount: Double,
    qFormula: (Action[T], State[T]) => Double,
    val maxIterations: Int = 100) {
  require(learning > 0.3 && learning < 0.95, "Incorrect learning rate")
  require(discount > 0.6 && discount < 1.0, "Incorrect discount rate")
  require(qFormula != null, "Undefined Q formula")

  // Find the optimal route in a search space (or graph).
  def search(graph: StateGraph[T], initial: State[T] = null): Int = {
    require(graph != null && !graph.isEmpty, "Undefined graph")

    var iters: Int = 0
    val goalState = graph.goalState
    var curState = if (initial == null) graph.initialState else initial

    for (i <- 0 until maxIterations) {

      // Navigate the search space until the goal is reached.
      while (goalState != curState) {
        val bestAction = curState.getBestAction
        val nextBestState = bestAction.toState
        bestAction.qValue = qFormula(bestAction, nextBestState)

        // Update the current state with the next state with the highest reward
        curState = nextBestState
      }
      iters = i
    }
    iters
  }
}

In the following test program, the state-action directed graph is built manually with a predefined matrix of Q values {S}x{A}, along with the reward associated to each action. However, the input to the adaptive search can also be extracted from three functions: s(i) -> a(i), a(i) -> s(i+1) and r(i)
object LearningTest extends App {
val learning: Double = 0.74
val discount: Double = 0.95

// Closure that implements a variant of the Bellman/Q-learning formula
val QLearning = (action: Action[Double], state: State[Double]) => {
  action.qValue + learning*(action.reward + discount*state.maxQValue - action.qValue)
}

final val goal = new State[Double]("Goal")
val stateGraph = new StateGraph[Double](goal)
// Build the graph of state
.....

val learningStrategy = new AdaptiveSearch[Double](learning, discount, QLearning)
learningStrategy.search(stateGraph)
}


References
Online Value Function Determination for Reinforcement Learning - J. Laird, N. Debersky, N. Tinkerhess

https://github.com/prnicolas

Monday, November 18, 2013

Dependencies Injection in Scala

Overview
Dependency injection is a design pattern that has been widely used in Java, leveraging frameworks such as Spring. The objective of the pattern is to replace hard-coded dependencies with run-time association or injection of new types.

Java defines modules or components through the semantics and convention of packages. The functionality of a module is defined through one or more interfaces and implemented through the composition and inheritance of concrete classes. Polymorphism is used to "dynamically wire" those classes, assembled through composition and inheritance into patterns which address a specific design problem (publish-subscribe, strategy, factory..).
However those capabilities have proven limited for creating very dynamic and complex applications. The Scala programming language provides developers with a dependency injection mechanism based on self-type annotations that does not rely on a 3rd party framework.

Reuse through Inheritance
The simplest and most commonly used form of reuse in any Object Oriented Programming language is inheritance. Let's consider an interface House, which is implemented by an abstract or concrete class "House with Furniture & Appliance", which in turn is sub-classed by a well-defined House.

It is well documented that inheritance is a poor mechanism for code reuse because data is not properly encapsulated, as a sub-class may access the internals of the base class. Moreover, any future change in the base class or interface (framework) propagates through the sub-classes (dependencies).

Reuse through Composition
It is a well documented and researched fact that composition provides more robust encapsulation than inheritance, as the main class delegates or routes method invocations to the appropriate internal components. Contrary to inheritance, for which changes in the base class may have unintended consequences for the subclasses, changes in components or inner classes can be made independently of the main class or outer component.
Clearly in the example above, composition is more appropriate. After all, a House with Furniture and Appliances can be defined as a House that contains Furniture and Appliances.

Inversion of Control
Frameworks such as Spring introduced the concept of Inversion of Control (IoC) containers; dependency injection is a form of IoC. With inversion of control, a framework defines interfaces which are extended or implemented by the application or client code. Instead of having the application call the framework API, the framework relies on the application to implement a specific function.
Let's take the example of a generic service that access a database.

public interface Service {
  JSonObject query(String mySQLQuery);
}

public interface DbAccess { }

public class ServiceImpl implements Service {
  private DbAccess dbAccess = null;

  public void setDbAccess(DbAccess dbAccess) { this.dbAccess = dbAccess; }

  @Override
  public JSonObject query(String mySQLQuery) { .. }
}


In the example above, a concrete implementation of the DbAccess interface, such as MySQLAccess or MongoDBAccess, can be injected or passed to the implementation of the service. Scala provides the developer with a similarly powerful mechanism to inject dependencies into a concrete class, known as the Cake pattern.
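For comparison, here is a minimal Scala sketch of the same injection using plain constructor injection. The MySQLAccess and MongoDBAccess classes below are hypothetical stubs, only meant to show the wiring.

```scala
trait DbAccess { def run(query: String): String }

// Hypothetical concrete implementations (stubs for illustration only)
class MySQLAccess extends DbAccess { def run(query: String) = s"mysql:$query" }
class MongoDBAccess extends DbAccess { def run(query: String) = s"mongo:$query" }

// The dependency is injected through the constructor instead of a setter
class ServiceImpl(dbAccess: DbAccess) {
  def query(q: String): String = dbAccess.run(q)
}

val service = new ServiceImpl(new MySQLAccess)
```

Swapping the database backend is then a matter of passing a different DbAccess instance to the constructor.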

Dependency Injection
At its core, dependency injection relies on three components:
- Consumer or list of dependent components
- Provider which injects the dependencies
- Dependencies

Let's consider the example above. A House requires not only Furniture and Appliances, but also a Layout plan with step-by-step instructions.

Each piece of furniture is defined by its name, category and price. Specialized furniture such as PatioFurniture and BathroomFurniture can also be created.

class Furniture(val name: String, val category: String, val price: Double)

class PatioFurniture(name: String, price: Double, val season: String)
  extends Furniture(name, "Patio", price)

class BathroomFurniture(name: String, price: Double, val floor: Int = 1)
  extends Furniture(name, "Bathroom", price)


A house also contains appliances and requires a layout plan to be set up. An appliance has a name, a price and a warranty flag, if needed and available. A layout object is defined by its name and an option.
class Appliance(val name: String, val warranty: Boolean, val price: Double)
class Layout(val name: String, val option: Int)


The goal is to dynamically furnish a house with a combination of appliances, pieces of furniture, following a layout plan and ultimately computing the total cost.
To this purpose, we create a module, implemented as a trait, that encapsulates each type of class. For instance, to manage the furniture, we create a FurnitureModule that defines the furniture types.

trait FurnitureModule {
  val furnitures: List[Furniture]

  class Furniture(val id: String, val category: String, val price: Double)
  class PatioFurniture(id: String, price: Double, val season: String)
    extends Furniture(id, "Patio", price)
}

trait BathroomFurnitureModule extends FurnitureModule {
  class BathroomFurniture(id: String, price: Double, val floor: Int = 1)
    extends Furniture(id, "Bathroom", price)
}


The first trait, FurnitureModule, defines a generic Furniture and a specialized furniture type, PatioFurniture. Dynamic binding is managed by the module that encapsulates the hierarchy of furniture types. Alternatively, the client code can manage dynamic binding or dependency injection by creating a sub-module, BathroomFurnitureModule, to manage another type of furniture, BathroomFurniture. These two approaches to injecting dependencies can easily be combined.
The same strategy is applied to the Appliance and Layout classes.
trait ApplianceModule {
  val appliances: List[Appliance]

  class Appliance(val name: String, val warranty: Boolean, val price: Double) {
    def this(name: String, price: Double) = this(name, true, price)
    def cost: Double = if (warranty) price * 1.15 else price
  }
}

trait LayoutModule {
  val layout: Layout

  class Layout(val name: String, val option: Int) {
    val movingInstructions: List[String] = List.empty
  }
}


The factory class, RoomFurnishing, relies on a self-reference to one of the components, LayoutModule, with the other components, ApplianceModule and FurnitureModule, as mixins. The factory class defines all the methods required to manage any combination of the components, in our case the cost.

class RoomFurnishing {
  self: LayoutModule with FurnitureModule with ApplianceModule =>

  def cost: String = {
    val totalCost = furnitures.foldLeft[Double](0.0)((cost, p) => cost + p.price) +
      appliances.foldLeft[Double](0.0)((cost, p) => cost + p.price)
    new StringBuilder(layout.name).append(" cost ").append(totalCost.toString).toString
  }

  def movingDate: String = "October, 2013"
}


Here is an example of how the client code can dynamically assemble all the components and compute the total cost.

if (country) {
  val houseFurnishing = new RoomFurnishing
      with FurnitureModule
      with ApplianceModule
      with LayoutModule {

    val layout = new Layout("Country Home", 2)
    val furnitures = List[Furniture](new Furniture("Modern Chair", "Chair", 136.0),
      new PatioFurniture("Bench", 350.0, "Summer"))
    val appliances = List[Appliance](new Appliance("Microwave Oven", false, 185.0),
      new Appliance("Dishwasher", true, 560.0))
  }
  println(houseFurnishing.cost)
}
else {
  val houseFurnishing = new RoomFurnishing
      with BathroomFurnitureModule
      with ApplianceModule
      with LayoutModule {

    val layout = new Layout("Apartment", 4)
    val furnitures = List[BathroomFurniture](new BathroomFurniture("Stool", 89.5),
      new BathroomFurniture("Cabinet", 255.6, 2))
    val appliances = List[Appliance](new Appliance("Microwave Oven", false, 185.0),
      new Appliance("Dishwasher", true, 560.0))
  }
  println(houseFurnishing.cost)
}


This technique combines class composition, inheritance, self-reference and abstract variables to provide a simple and flexible framework. The post also introduces the concept of a layout or assembler that hides some complexity and becomes the primary mixin. I may elaborate on this concept in a future post. I strongly recommend the article written by Jonas Bonér on the Cake pattern (listed in the references) for an in-depth understanding of both the motivation for and the variants of this pattern.

References
Jonas Bonér's article on the Cake pattern
Cake Solutions blog
The Scala Programming Language - M. Odersky, L. Spoon, B. Venners - Artima 2007
github.com/prnicolas

Friday, November 8, 2013

Discrete Kalman Optimal Estimator

Objective
This post is an introduction to the Kalman optimal filter using the Scala programming language as implementation. The Kalman filter is widely used in signal processing and statistical analysis to quantify or estimate noise created by a process and noise generated by measurement devices.

Overview
A Kalman filter is an optimal estimator that derives parameters from indirect and inaccurate observations. The objective of the algorithm is to minimize the mean square error of the model parameters. The algorithm is recursive and can therefore be used for real-time signal analysis. The Kalman filter has one main limitation: it requires the process to be linear, y = a.f(x) + b.g(x) + ... The state is affected by Gaussian noise in both the process and the measurement.

Covariance
The Kalman filter represents the estimated state and error state as a covariance matrix. The covariance of a random vector x = (x1, .., xn) is an n by n positive definite, symmetric matrix with the variance of each variable as diagonal elements: $cov(\mathbf{x})= E[(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}] = \int_{-\infty }^{\infty} .. \int_{-\infty }^{\infty}(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}\,p(\mathbf{x})\,dx_{1} .. dx_{n}$ Such a matrix can be diagonalized by computing its eigenvectors (or basis vectors) in order to decouple the contributions of the noise or errors.
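As a concrete illustration, the sample covariance of a set of observations can be computed directly from the definition above. This is a sketch on plain arrays; normalizing by n (the population covariance) is an arbitrary choice of this example.

```scala
// Sample covariance matrix cov = E[(x - mean)(x - mean)^T], estimated from a
// set of dim-dimensional observations; the diagonal holds the variances
def covariance(xs: Array[Array[Double]]): Array[Array[Double]] = {
  val n = xs.length
  val dim = xs.head.length
  val mean = Array.tabulate(dim)(j => xs.map(_(j)).sum / n)

  Array.tabulate(dim, dim)((i, j) =>
    xs.map(x => (x(i) - mean(i)) * (x(j) - mean(j))).sum / n)
}

// Two observations of a 2-dimensional vector
val cov = covariance(Array(Array(1.0, 2.0), Array(3.0, 4.0)))
```

The resulting matrix is symmetric, as expected from the definition.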

State Equation Model
The state of a deterministic, discrete-time linear dynamic system is the smallest vector that summarizes the past of the system in full and allows a theoretical prediction of its future behavior in the absence of noise. If x(k) is the n-dimensional state vector at step k, u(k) the input vector, w(k) the unknown process noise vector with zero mean, Q(k) the covariance matrix of the process noise at step k, z(k) the actual measurement of the state at step k, v(k) the measurement noise vector and R(k) the covariance matrix of the measurement noise at step k, then $\mathbf{x}_{k+1} = A_{k}\mathbf{x}_{k} + B_{k}\mathbf{u}_{k} + \mathbf{w}_{k}\,\,\,(1)\,\,with\,\,Q_{k} = E[\mathbf{w}_{k}.\mathbf{w}_{k}^T]\\\mathbf{z}_{k} = H_{k}.\mathbf{x}_{k} + \mathbf{v}_{k}\,\,\,(2)\,\,\,\,with\,\,\,R_{k} = E[\mathbf{v}_{k}.\mathbf{v}_{k}^T]$ where A(k) is the state transition matrix and H(k) the measurement matrix at step k. We assume that the process noise covariance Q and the measurement noise covariance R follow Gaussian probability distributions.
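A single step of the state equation (1) can be sketched with plain arrays, writing the matrix-vector products out explicitly. This is a sketch only; the noise vector w is supplied by the caller rather than drawn from a distribution.

```scala
// Matrix-vector product for plain 2-dimensional arrays
def matVec(m: Array[Array[Double]], v: Array[Double]): Array[Double] =
  m.map(row => row.zip(v).map { case (a, b) => a * b }.sum)

// One step of x(k+1) = A.x(k) + B.u(k) + w(k)
def nextState(a: Array[Array[Double]], b: Array[Array[Double]],
              x: Array[Double], u: Array[Double],
              w: Array[Double]): Array[Double] = {
  val ax = matVec(a, x)
  val bu = matVec(b, u)
  Array.tabulate(x.length)(i => ax(i) + bu(i) + w(i))
}

// Falling-body example with dt = 1: A = [[1,1],[0,1]], B = [[0.5],[1]],
// state [height, speed] = [100, 0], input u = [-g], no noise
val x1 = nextState(Array(Array(1.0, 1.0), Array(0.0, 1.0)),
                   Array(Array(0.5), Array(1.0)),
                   Array(100.0, 0.0), Array(-9.81), Array(0.0, 0.0))
```

After one step, the height has dropped by g/2 and the speed is -g, matching the integrated equations of motion used in the example at the end of this post.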

Implementation
We leverage the support for functional constructs provided in Scala. Validation of method arguments, exceptions and non essential supporting methods are omitted for the sake of readability of the code snippets.
First we need to define the noise w generated by the linear process and the noise v generated by the measurement device. These functions are defined as members of the KalmanNoise class and should generate Gaussian random distributions for the process and measurement noise. The validation that the noise follows a normal distribution is omitted. Lazy values are convenient in the case where one of the noise distributions is null or unknown.

final class KalmanNoise(private val size: Int,
    val processNoiseGen: () => Double = Random.nextGaussian,
    val measureNoiseGen: () => Double = Random.nextGaussian) {

  lazy val processNoise = generate(processNoiseGen)
  lazy val measurementNoise = generate(measureNoiseGen)

  private[this] def generate(f: () => Double): ArrayRealVector = {
    val noise = new ArrayRealVector(size)
    (0 until size) foreach (n => noise.setEntry(n, f()))
    noise
  }
}


The KalmanModel class has two objectives:
- Encapsulate the Matrices and Vectors used in the generation of the state and error equations
- Generate a process model and measurement model on demand (factory)
The processModel instance depends on the initial state of the process, x0, which is specified independently of the Kalman model parameters. The model can be extended through inheritance by overriding the processModel and measurementModel methods with custom algorithms.

case class KalmanModel(val A: Matrix[Double],
    val B: Matrix[Double],
    val H: Matrix[Double],
    val Q: Matrix[Double],
    val R: Matrix[Double],
    val P0: Matrix[Double]) {

  def processModel(x0: Array[Double]): ProcessModel =
    new DefaultProcessModel(A, B, Q, x0, P0)
  def measurementModel: MeasurementModel = new DefaultMeasurementModel(H, R)
}


The method below implements the basic sequence of steps executed at each iteration of the update of the state of the process:
1. Predict the state of the process at the next step (x, A)
2. Extract or generate noise for the process and the measurement devices (w, v)
3. Update the state of the process (x)
4. Compute the error on the measurement (z)
Note that the control input u and the initial state x0 are defined as arguments of the main method because they are independent of the model.

def compute(u: Array[Double], x0: Array[Double], maxNumIters: Int = 50): Unit = {
  lazy val processModel = model.processModel(x0)
  lazy val measurementModel = model.measurementModel

  val filter = new KalmanFilter(processModel, measurementModel)
  val uVector = new ArrayRealVector(u)
  var xVector = processModel.getInitialStateEstimate

  // Iterate the sequence: prediction, noise & state estimation and correction
  (0 until maxNumIters).foreach(i => {
    filter.predict(uVector)
    xVector = newState(uVector, xVector, processModel)
    filter.correct(newMeasurement(xVector))
  })
}


This method implements the iterative state equation of the model, x <- A.x + B.u + w, described in the introduction.
private def newState(uVector: RealVector,
    xVector: RealVector,
    processModel: ProcessModel): RealVector = {

  val A = processModel.getStateTransitionMatrix
  val B = processModel.getControlMatrix
  val w = noise.processNoise

  // x <- A.x + B.u + w
  A.operate(xVector).add(B.operate(uVector)).add(w)
}


Lastly, the method defined below implements the computation of the error on the observed state using the equation z = H.x + v, where x is the current state of the process and v is the measurement noise.
private def newMeasurement(xVector: RealVector): RealVector = {
  val H = model.measurementModel.getMeasurementMatrix
  val v = noise.measurementNoise

  // z = H.x + v
  H.operate(xVector).add(v)
}

Example

We will use a simple example of Newton's law of gravity. If y is the height of an object, the differential equation can be integrated with a time step dt as follows $\frac{\mathrm{d}^2 y_{t}}{\mathrm{d} t^{2}} + g = 0\,\,\Rightarrow\\ y_{t+dt} = y_{t}+ \dot{y}_{t}\,dt - \frac{g}{2}\,dt^{2}\,\,\,;\,\,\dot{y}_{t+dt} = \dot{y}_{t} - g\,dt\,\,\,(\mathbf{3})$ The state vector x(k) for the object at time k is defined by $\mathbf{x}_{k} = [y_{k},\frac{\mathrm{d} y_{k}}{\mathrm{d} t}]^{T}$ and the state equation is $\mathbf{x}_{k+1} = A_{k}.\mathbf{x}_{k} + B_{k}.\mathbf{u}_{k}\,\,\,with\,\,\,A_{k} =\begin{vmatrix} 1 & dt\\ 0 & 1 \end{vmatrix}\,\,,\,B_{k}=\begin{vmatrix}0.5\,dt^{2} \\ dt \end{vmatrix}$ We use the Apache Commons Math library version 3.0 (Apache Commons Math User Guide) to filter and predict the motion of a body subjected to gravity.
val dt = 0.05
val initialHeight = 100.0
val initialSpeed = 0.0
val processNoise = 0.0
val measureNoise = 0.2
val gravity = 9.81

val A = Matrix[Double](1.0, dt, 0.0, 1.0)
val B = Matrix[Double](0.5*dt*dt, dt)
val H = Matrix[Double](1.0, 0.0)
val Q = Matrix[Double](1e-4, 1e-3, 1e-3, 1e-4)
val R = Matrix[Double](measureNoise, measureNoise)

// Initialize the drop at 100 feet with no speed
val x0 = Array[Double](initialHeight, initialSpeed)

// Create the process and noise models
val model = new KalmanModel(A, B, H, null, R, Defaults.IdentityMatrix)
val noise = new KalmanNoise(2, null, () => Random.nextGaussian*measureNoise)

// Compute the new state for height and velocity
val predictor = new KalmanPredictor(model, noise)
predictor.compute(Array[Double](gravity), x0)


References
An Introduction to the Kalman Filter - G. Welch, G. Bishop - University of North Carolina, 2006
github.com/prnicolas

Wednesday, October 30, 2013

Type Erasure, Manifest, Specialization in Scala

Overview
The Scala and Java programming languages use type erasure to compile away generics. The type parameters [U <: T] are removed and replaced by their upper bound T or Any. The process involves boxing and un-boxing the primitive types if they are used as type parameters, which degrades performance. This post describes two approaches to work around type erasure: Manifest and type specialization.
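The effect of erasure is easy to observe at runtime: two lists with different element types share the same runtime class, because the element type exists only at compile time.

```scala
// After compilation the element type is erased: both lists have the same
// runtime class, and only the compiler knows one holds Int and the other String
val ints = List(1, 2, 3)
val strings = List("a", "b")

val sameRuntimeClass = ints.getClass == strings.getClass
```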

Description
Let's consider a class ListCompare that compares lists of a parametric type U bounded by the type Item. Note that ancillary code, such as returned Option types or checks on method and class arguments, is omitted for the sake of simplicity.

case class Item()

class ListCompare[U <: Item](val xs: List[U])(implicit f: U => Ordered[U]) {
  def compare(xso: List[U]): Boolean = xso match {
    case str: List[String] =>
      if (xs.size == xso.size) xs.zip(xso).exists(x => x._1.compareTo(x._2) != 0) else false

    case n: List[Int] =>
      if (xs.size == xso.size) xs.zip(xso).exists(x => x._1 != x._2) else false

    case _ => false
  }
}

The class has to have an implicit conversion U => Ordered[U] to support the comparison of elements. The code above generates the following warning: "non-variable type argument String in type pattern List[String] is unchecked since it is eliminated by erasure". The compiler emits this warning because the two parameterized types, List[String] and List[Int], are not available to the JVM during the execution of the pattern match.
One solution is to use a Manifest. A Manifest[T] is an opaque descriptor for type T. It allows access to the erasure of the type as a Class instance. The most common usage of manifests is the creation of native arrays when the class is not known at compile time, as illustrated in the code snippet below.

def myArray[T] = new Array[T](0)
def myArray[T](implicit m: Manifest[T]) = new Array[T](0)
def myArray[T: Manifest] = new Array[T](0)


The first line of code won't compile. The second function maintains the erasure by passing the manifest as an implicit parameter. The last function defines the manifest as a context bound. The solution to our problem is to pass the Manifest as an implicit parameter to the compare method.

class ListCompare[U <: Item](val xs: List[U])(implicit f: U => Ordered[U]) {
  def compare(xso: List[U])(implicit u: Manifest[List[U]]): Boolean = {

    if (u <:< manifest[List[String]])
      if (xs.size == xso.size) xs.zip(xso).exists(x => x._1.compareTo(x._2) != 0) else false

    else if (u <:< manifest[List[Int]])
      if (xs.size == xso.size) xs.zip(xso).exists(x => x._1 != x._2) else false

    else false
  }
}


A second option is to generate separate classes for primitive types using the @specialized annotation. The annotation forces the compiler to generate byte code for each primitive listed as a specialized type. For instance, the declaration @specialized(Int, Double) T generates two extra, efficient classes for the primitive types Int and Double. Note that specialization applies only to primitive types; a reference type such as String cannot be specialized. The original ListCompare class is rewritten using the annotation as follows:

class ListComp[@specialized(Int) U <: Item]
    (val xs: List[U])(implicit f: U => Ordered[U]) {

  def compare(xso: List[U]): Boolean =
    if (xs.size == xso.size) xs.zip(xso).exists(x => x._1.compareTo(x._2) != 0) else false
}

The code above does not generate a warning or an error. However, there is no such thing as a free lunch: the compiler generates extra byte code for the methods associated with the specialized primitive types. The objective in this case is to trade higher memory consumption for a performance improvement.

References
Programming in Scala - M. Odersky, L. Spoon, B. Venners - Artima 2008
Scala for the Impatient - Cay Horstmann - Addison-Wesley 2012

Thursday, October 17, 2013

Curried and Partial Functions in Scala

Introduction
Although most Scala developers have some knowledge of curried and partial functions, they often struggle to grasp the different use cases to which either of these functional programming techniques applies, and their relative benefits. For those interested in a more detailed explanation of currying existing functions, I recommend the excellent post by Daniel Westheide.

Partial Functions
Partially defined functions are commonly used to restrict the domain of applicability of function arguments. The restriction can apply either to the type of the argument or to its values. Let's consider the computation of the square root of a floating point value. The value of the argument has to be positive. A simple implementation relies on the Option monad.
def _sqrt(x: Double): Option[Double]= if(x<0.0) None else Some(Math.sqrt(x))

The same method can be implemented using a partial function by applying the matching pattern to the argument as follows.
def _sqrt: PartialFunction[Double, Double]= {
case x: Double  if(x >= 0.0) => Math.sqrt(x)
}
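Since _sqrt is a PartialFunction, a caller can probe its domain with isDefinedAt, or convert it back into a total function returning an Option with lift. The sketch below repeats the definition under the name sqrtPf so it stands alone.

```scala
val sqrtPf: PartialFunction[Double, Double] = {
  case x: Double if x >= 0.0 => math.sqrt(x)
}

// isDefinedAt probes the domain without applying the function
val definedAt4 = sqrtPf.isDefinedAt(4.0)
val definedAtNeg = sqrtPf.isDefinedAt(-1.0)

// lift turns the partial function into a total Double => Option[Double]
val safeSqrt: Double => Option[Double] = sqrtPf.lift
```

lift recovers exactly the Option-based behavior of the first implementation, without an explicit if/else on the argument.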

A similar restriction can be applied to the type of the argument. Let's consider the increment method below.
class Value(val x: Int) {
  def += (any: Any): Any = {
    if (any.isInstanceOf[Int]) {
      val value = any.asInstanceOf[Int]
      (x + value).asInstanceOf[Any]
    }
    else if (any.isInstanceOf[Double]) {
      val value = any.asInstanceOf[Double].floor.toInt
      (x + value).asInstanceOf[Any]
    }
    ....
  }
}

The implementation is cumbersome to say the least. It makes sense to replace it with pattern matching on the type, and to define a partial function to handle any type of argument.
class Value(val x: Int) {
def += : PartialFunction[Any, Any] = {
case n: Int => x + n
case y: Double => x + y.floor.toInt
}
}


In the example above, we do not have to handle the case where the argument has an improper type: the partial function simply discards it.
The Actor.receive method, which defines the message loop in an actor consuming messages from its mailbox, is indeed a partial function.

Currying
Currying is the transformation of a function with multiple arguments into a chain of functions each taking a single argument: if f: (x, y) -> f(x, y), then curry(f): x -> (y -> f(x, y)).
Let's take a simple example of a sum of two floating point values. The original two-argument function (1) can be converted into a single-argument function returning an anonymous function that takes the second argument as parameter (2). Scala provides developers with syntactic sugar to define the cascade of function calls (3).
def sum(x: Double, y: Double): Double = x + y                  // (1)
def sum(x: Double): Double => Double = (y: Double) => x + y    // (2)
def sum(x: Double)(y: Double): Double = x + y                  // (3)
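The curried form (3) also enables partial application: fixing the first argument yields a new single-argument function. The sketch below repeats the curried definition so it stands alone.

```scala
def sum(x: Double)(y: Double): Double = x + y

// Fix x = 5.0; the expected type triggers eta-expansion,
// so add5 is an ordinary Double => Double function
val add5: Double => Double = sum(5.0)

val r = add5(3.0)  // 8.0
```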

Most higher-order methods on collections are curried. The following example illustrates the commonly used foldLeft.
class Collection[T](private val values: Array[T]) {
  // Uncurried version, shown for comparison only: the two overloads
  // would clash after type erasure if declared together
  // def foldLeft[U](u: U, op: (U, T) => U): U = values.foldLeft[U](u)(op)

  def foldLeft[U](u: U)(op: (U, T) => U): U = values.foldLeft[U](u)(op)
}

val myCollection = new Collection[Int](Array[Int](3, 5, 8))
val product = myCollection.foldLeft[Int](1)((prod, x) => prod*x) // 1 is the neutral element of the product

Are there any benefits to using curried functions instead of functions or methods with multiple arguments? Yes: the type inferencer can propagate information from the first argument list to the second. Let's consider the foldLeft method above:
def foldLeft[U](u: U)(op:(U, T)=>U):U = this.foldLeft(u, op)
The type inferencer determines the type U from the first argument and uses it subsequently to type the binary operator op: (U, T) => U
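The benefit shows up at the call site: with the curried foldLeft of the standard library, the accumulator type is inferred from the first argument list before the closure is type-checked, so the lambda needs no type annotations. A short sketch:

```scala
val xs = List(3, 5, 8)

// U is inferred as Int from the first argument list, so the closure
// parameters (s, x) need no explicit type annotations
val total = xs.foldLeft(0)((s, x) => s + x)
assert(total == 16)

// With a single argument list, the closure parameters would have to be
// annotated explicitly: ((s: Int, x: Int) => s + x)
```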

References
Scala for the Impatient - Cay Horstmann, Addison-Wesley, 2012
The Neophyte's Guide to Scala, Part 11: Currying and Partially Applied Functions - Daniel Westheide

Thursday, October 3, 2013

Bloom Filter in Scala

Overview
The Bloom filter became a popular probabilistic data structure for membership queries (does object x belong to set Y?) a couple of years ago. The main benefit of a Bloom filter is its low memory footprint: it avoids allocating the objects themselves, unlike a HashSet or hash table. The compact representation comes with a trade-off: the filter never produces false negatives, but it does not guarantee the absence of false positives. In other words, a query returns either:
- the object belongs to the set with very high probability
- the object definitely does not belong to the set
A Bloom filter is quite often used as a front end to a regular, deterministic algorithm.

Theory
Let's consider a set A = {a0,..., an-1} of n elements for which a membership query is executed. The data structure consists of a bit vector V of m bits and h completely independent hash functions, each associated with a position in the bit vector. The assignment (or mapping) of hash functions to bits has to follow a uniform distribution. The diagram below illustrates the basic mechanism behind the Bloom filter. The set A is defined by the pair a1 and a2. The hash functions h1 and h2 map the elements to bit positions (bits set to 1) in the bit vector. The element b has one of its positions set to 0 and therefore does not belong to the set. The element c belongs to the set because its associated positions have bits set to 1.

However, the algorithm does not prevent false positives. For instance, a bit may have been set to 1 during the insertion of previous elements, and the query then erroneously reports that the element belongs to the set.
The insertion of an element depends on the h hash functions; the time needed to add a new element is therefore proportional to h and independent of the size of the bit vector: asymptotic insertion time = O(h). However, the filter requires h bits for each element and is less effective than a traditional bit array for small sets.
The probability of false positives decreases as the number n of inserted elements decreases and the size of the bit vector m increases. The number of hash functions that minimizes the probability of false positives is defined by h = m.ln2/n.
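The formula above can be checked numerically. A small sketch computing the optimal number of hash functions h = m.ln2/n together with the standard approximation of the false-positive rate, p ≈ (1 - e^(-hn/m))^h (the approximation formula is standard Bloom filter analysis, not taken from this post):

```scala
// m: number of bits in the vector, n: expected number of inserted elements
def optimalNumHash(m: Int, n: Int): Int =
  math.max(1, math.round((m.toDouble / n) * math.log(2)).toInt)

// Approximate false-positive probability for h hash functions
def falsePositiveRate(m: Int, n: Int, h: Int): Double =
  math.pow(1.0 - math.exp(-h.toDouble * n / m), h)

val m = 1000
val n = 100
val h = optimalNumHash(m, n)   // m.ln2/n = 10 * 0.693... rounds to 7
assert(h == 7)
assert(falsePositiveRate(m, n, h) < 0.01)  // under 1% with 10 bits/element
```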

Implementation in Scala
The implementation relies on the Java MessageDigest class to generate the hash values. Ancillary methods and preconditions on method arguments are omitted for the sake of clarity.
class BloomFilter(private val length: Int,
                  private val numHashs: Int,
                  private val algorithm: String) {
  private[this] val set = new Array[Byte](length)
  private[this] var numElements = 0
  private[this] val digest = {
    try { MessageDigest.getInstance(algorithm) }
    catch { case e: NoSuchAlgorithmException => null }
  }

  // add an array of elements to the filter
  def add(anyArray: Array[Any]): Boolean = {
    if( digest != null ) {
      anyArray.foreach( getSet(_).foreach( set(_) = 1) )
      numElements += anyArray.size
    }
    digest != null
  }

  @inline
  def contains(any: Any): Boolean =
    if(digest != null) !getSet(any).exists(set(_) != 1) else false

  private[this] def hash(value: Int): Int = {
    digest.reset
    digest.update(value) // invokes the implicit conversion from Int to bytes
    Math.abs(new BigInteger(1, digest.digest).intValue) % set.size
  }

  private[this] def getSet(any: Any): Array[Int] = {
    val newSet = new Array[Int](numHashs)
    newSet.update(0, hash(any.hashCode))
    getSet(newSet, 1)
    newSet
  }

  @scala.annotation.tailrec
  private[this] def getSet(values: Array[Int], index: Int): Unit =
    if( index < values.size ) {
      values.update(index, hash(values(index-1)))
      getSet(values, index+1)
    }
}

The MessageDigest class generates a hash value using either the MD5 or SHA-1 algorithm. The only two public methods are add, which adds an array of objects, and contains, which queries whether an object belongs to the set. Tail recursion is used to generate the set of numHashs hash values. The last code snippet implements a very simple Int to Array[Byte] conversion and some test code.

implicit def int2Bytes(value: Int): Array[Byte] = {
  val bytes = new Array[Byte](4)
  bytes.indices.foreach(i => {
    val offset = (bytes.size - 1 - i) << 3   // big-endian: byte 0 holds the highest bits
    bytes(i) = ((value >>> offset) & 0xFF).asInstanceOf[Byte]
  })
  bytes
}

val filter = new BloomFilter(100, 100, "SHA")
final val newValues = Array[Any](57, 97, 91, 23, 67,33)
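To exercise the membership queries end to end without a cryptographic digest, a simplified, self-contained variant of the filter can be sketched with scala.util.hashing.MurmurHash3, using double hashing to derive the numHashs positions instead of the MessageDigest scheme above (all names in this sketch are illustrative):

```scala
import scala.util.hashing.MurmurHash3

class SimpleBloomFilter(m: Int, numHashs: Int) {
  private val bits = new Array[Boolean](m)

  // Double hashing: derive the numHashs bit positions from two base hashes
  private def indices(any: Any): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(any.toString, 0)
    val h2 = MurmurHash3.stringHash(any.toString, 1)
    (0 until numHashs).map(i => (((h1 + i * h2) % m) + m) % m)
  }

  def add(any: Any): Unit = indices(any).foreach(bits(_) = true)
  def contains(any: Any): Boolean = indices(any).forall(bits(_))
}

val filter = new SimpleBloomFilter(100, 7)
val inserted = Seq(57, 97, 91, 23, 67, 33)
inserted.foreach(filter.add)

// No false negatives: every inserted element is reported as present
assert(inserted.forall(filter.contains))
```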