Thursday, February 5, 2015

Akka mailbox back pressure


Akka is a reliable and effective library for distributing tasks over multiple cores and multiple hosts. Fault tolerance is implemented through a hierarchy of supervising actors, not that different from the Zookeeper framework.
But what about controlling the message flows between actors or clusters of actors? How can we prevent messages from backing up in the mailboxes of slow or unavailable actors, or of actors running lengthy computational tasks?
Typesafe has introduced Akka reactive streams and Flow materialization to control TCP back pressure and manage data flows between actors and clusters of actors. TCP back pressure is a subject for a future post. In the meantime, let's design a poor man's back-pressure handler using bounded mailboxes.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Simple workflow with back pressure control
Let's look at a simple computational workflow with the following components:
  • Workers: These actors process and transform data sets. They start a computation task upon receiving a Compute message that contains the next data set to process. Each worker actor returns the result of the computation (transformed data set) to the controller in a Completed message.
  • Controller: This actor loads a batch of data upon receiving a Load message and forwards it to the workers in Compute messages. The controller requests the status of the bounded mailbox of each worker by sending a GetStatus message to the watcher.
  • Watcher: This actor monitors the state of the workers' mailboxes and returns the current load (the percentage of the mailbox capacity currently in use) to the controller in a Status message.
The following diagram describes the minimum set of messages required to execute the workflow.

The worker actor is responsible for processing a slice of data using a function forwarded by the controller.
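final class Worker(id: Int) extends Actor {
  override def receive = {
    case msg: Compute => {
      // Apply the transformation forwarded by the controller to the data slice
      val output = msg.fct(msg.xt)
      // The third argument (nIter) is truncated in the source; 0 is a placeholder
      sender ! Completed(id, output, 0)
    }
    case Cleanup => sender ! Done
  }
}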
The worker receives the input to the computation (the slice of time series msg.xt and the data transformation msg.fct) through the Compute message sent by the controller. Note that the function fct cannot be defined as a closure because the context of the function is unknown to the worker.
The actor returns the result of the computation, output, through the Completed message. Finally, all the workers notify the controller that their tasks are completed by responding to the Cleanup message.

type DblSeries = List[Array[Double]]

sealed abstract class Message(val id: Int)
case class Completed(
    i: Int = -1, 
    xt: DblSeries, 
    nIter: Int) extends Message(i)
case class Compute(
    i: Int, 
    xt: DblSeries, 
    fct: DblSeries => DblSeries) extends Message(i)
case class Cleanup() extends Message(-1)

The controller is responsible for loading the data and distributing it to the workers. It takes three parameters.
  • numWorkers: Number of worker actors to create
  • watermark: Defines the utilization of the worker mailboxes that triggers a throttle action. If the utilization of the mailbox is below the watermark, the controller throttles up (increases the pressure on the actor). If the utilization exceeds 1.0 - watermark, the controller decreases the pressure on the actor. Otherwise the controller does not interfere with the data flow.
  • mailboxCapacity: Capacity of the mailboxes of the worker actors (maximum number of messages). All mailboxes are assumed to have the same capacity.
class Controller(
     numWorkers: Int, 
     watermark: Double, 
     mailboxCapacity: Int) extends Actor {
   var id: Int = 0
   var batchSize = mailboxCapacity>>1
   val workers = List.tabulate(numWorkers)(n => 
    context.actorOf(Props(new Worker(n)), name = s"worker$n"))
   val pushTimeout = new FiniteDuration(10, MILLISECONDS)
   val msgQueues = workers.map(w => (new BoundedMailbox(mailboxCapacity, pushTimeout))
                  .create(Some(w), Some(context.system)))

   val watcher = context.actorOf(Props(new Watcher(msgQueues)))
The workers are created using the tabulate higher-order method. A message queue (mailbox) is created for each worker. The mailboxes are bounded in order to avoid buffer overflow. Finally a watchdog actor of type Watcher is created through the Akka context to monitor the mailboxes of the workers. The watcher actor is described later in this post.
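The effect of a bounded queue with a push timeout can be illustrated without Akka. The following is a minimal sketch that uses the JDK ArrayBlockingQueue as a stand-in for a bounded mailbox; the object BoundedQueueSketch and the method push are hypothetical names introduced for this illustration and are not part of the Akka API.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object BoundedQueueSketch {
  // Tries to enqueue every message, waiting up to timeoutMs when the queue is full.
  // Returns the messages that could not be enqueued within the timeout.
  def push(
      queue: ArrayBlockingQueue[String],
      msgs: Seq[String],
      timeoutMs: Long): Seq[String] =
    msgs.filterNot(m => queue.offer(m, timeoutMs, TimeUnit.MILLISECONDS))
}
```

With a queue of capacity 2 and three messages, the third message is rejected once the timeout expires, which is the situation the throttle is meant to prevent.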
Let's look at the Controller message loop.
override def receive = {
  case input: Load => load(input.strm)
  case msg: Completed => if(msg.id == -1) kill else process(msg)
  case status: Status => throttle(status.load)
}
The controller performs 3 distinct functions:
  • load: Load a variable number of data points and partition them for each worker
  • process: Aggregate the results of computation for all the workers
  • throttle: Increase or decrease the number of data points loaded at each iteration.
Let's look at these methods.
def load(strm: InputStream): Unit = 
  while( strm.hasNext ) {
     val nextBatch =
            .foreach(w => w._2 ! Compute(id, w._1, strm.fct) )
     id += 1
  }

def process(msg: Completed): Unit = {
   ..  // aggregation
  watcher ! GetStatus
}

def throttle(load: Double): Unit = {
  // Mailboxes under-used: increase the batch size
  if(load < watermark) 
    batchSize += (batchSize*(watermark-load)).floor.toInt
  // Mailboxes close to capacity: decrease the batch size
  else if(load > 1.0 - watermark) 
    batchSize -= (batchSize*(load-1.0 + watermark)).floor.toInt
  // The batch size never exceeds half the mailbox capacity
  if( batchSize > (mailboxCapacity>>1) )
    batchSize = (mailboxCapacity>>1)
}
load extracts the next batch of data, partitions it, then sends each partition to a worker actor along with the data transformation fct.
process aggregates the results (Completed) of the transformation from each worker. The controller then requests the status of the mailboxes from the watcher.
throttle recomputes the size of the next batch, batchSize, using the load reported by the watcher relative to the watermark.
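The throttle rule can be isolated from the actor as a pure function, which makes it easy to check by hand. This is a sketch only, assuming the behavior given in the description of the watermark parameter (increase the batch size below the watermark, decrease it above 1.0 - watermark, cap it at half the mailbox capacity). ThrottleSketch and nextBatchSize are hypothetical names.

```scala
object ThrottleSketch {
  // Hypothetical pure version of the throttle rule (no Akka dependency)
  def nextBatchSize(
      batchSize: Int,
      load: Double,
      watermark: Double,
      mailboxCapacity: Int): Int = {
    val adjusted =
      if (load < watermark)            // under-used mailboxes: throttle up
        batchSize + (batchSize * (watermark - load)).floor.toInt
      else if (load > 1.0 - watermark) // mailboxes nearly full: throttle down
        batchSize - (batchSize * (load - 1.0 + watermark)).floor.toInt
      else batchSize                   // within the band: no action
    // The batch never exceeds half the mailbox capacity
    math.min(adjusted, mailboxCapacity >> 1)
  }
}
```

For a batch of 64, a watermark of 0.25 and a capacity of 512, a load of 0.125 gives 72, a load of 0.875 gives 56 and a load of 0.5 leaves the batch at 64.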

The watcher has a simple task: compute the average load of the mailbox of all workers. The computation is done upon reception of the GetStatus message from the controller.
class Watcher(queue: Iterable[MessageQueue]) extends Actor {
  def load = _.numberOfMessages)
  override def receive = { 
    case GetStatus => sender ! Status(load) }
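The average load itself is a simple ratio. A minimal sketch, assuming that all mailboxes share the same capacity and that the load is expressed as a fraction between 0 and 1; MailboxLoad and averageLoad are hypothetical names, and the queue sizes stand in for the numberOfMessages values of the message queues.

```scala
object MailboxLoad {
  // Average utilization, in [0, 1], of mailboxes that share the same capacity
  def averageLoad(queueSizes: Seq[Int], capacity: Int): Double =
    if (queueSizes.isEmpty || capacity <= 0) 0.0
    else queueSizes.sum.toDouble / (queueSizes.size.toDouble * capacity)
}
```

Two mailboxes of capacity 40 holding 10 and 30 messages have an average load of 0.5.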
Memory consumption profile
The bottom graph displays the action of the controller (throttling). The Y-axis displays the intensity of the throttle, from -6 for a significant decrease in load (size of batches) to +6 for a significant increase in load/pressure on the workers. A throttle index of 0 means that no action is taken by the controller.
The top graph displays the average size of the workers' mailboxes, in response to the actions taken by the controller.

Important notes
This implementation of a feedback control loop is rather crude and is described mainly as an introduction to the concept of back-pressure control. A production-quality implementation would rely on:
  • TCP back pressure using reactive streams
  • A more sophisticated throttle algorithm to avoid abrupt adjustments or resonance
  • Handling of dead letters in case the throttle algorithm fails and the mailbox overflows


Thursday, January 8, 2015

Implicit Classes to Extend Libraries

Implicit methods are quite useful for defining global type conversions (as long as the semantics are clearly understood). But what about implicit classes?
Implicit classes can be used to extend existing Scala standard library classes. Many Scala classes are declared final or implement a sealed trait, so they cannot be freely sub-classed. Composition is a viable alternative to inheritance in terms of re-usability: the class to extend is wrapped into a helper or utility class. However, a helper/container class adds an indirection and "pollutes" the name space.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.
Let's look at an example of an extension of the standard library.

The use case consists of extending the Try class, scala.util.Try, with Either semantics: execute one function if the code throws an exception, and another function if the computation succeeds. Such a simple design allows computation flows to be routed depending on an unexpected state.

The main construct is the declaration of an implicit class that wraps Try. A recovery function rec is called if an exception is thrown; a computation f is performed otherwise.

import scala.util._

implicit class Try2Either[T](_try: Try[T]) {
  def toEither[U](rec: ()=>U)(f: T=>T): Either[U,T] = _try match {
    case Success(res) => Right(f(res))
    case Failure(e) => println(e.toString); Left(rec())  
  }
}
You may notice that the method toEither is curried to segregate the two parameterized types T and U. It also complies with the semantics of Either, with the Left element for the error (and recovery) and the Right element dedicated to the successful outcome.
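A small self-contained sketch of the same technique follows. It makes two assumptions that are not in the original code: the extension method is named toEitherWith, because scala.util.Try has had its own parameterless toEither member since Scala 2.12 and reusing that name would clash with it, and the helper parse is a hypothetical example of client code.

```scala
import scala.util.{Try, Success, Failure}

object TryEitherSketch {
  // Implicit class wrapping Try; toEitherWith is a hypothetical name
  implicit class Try2Either[T](_try: Try[T]) {
    def toEitherWith[U](rec: () => U)(f: T => T): Either[U, T] = _try match {
      case Success(res) => Right(f(res)) // computation applied on success
      case Failure(_)   => Left(rec())   // recovery value on failure
    }
  }

  // Hypothetical client: doubles a parsed integer, recovers with -1
  def parse(s: String): Either[Int, Int] =
    Try(s.toInt).toEitherWith(() => -1)(_ * 2)
}
```

The client code never handles the exception itself: parse("21") yields Right(42) and parse("abc") yields Left(-1).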

Let's take the example of the normalization of a large array of floating point values by their sum. If the sum is too small, the normalization is subject to rounding errors and an exception is thrown. However, we do not want to burden the client code with handling the exception (the client method may not know the context of the exception after all). In this example the recovery function rec instantiates the class Recover, which is responsible for a retry, potentially from a different state. Implicit classes have an important limitation in terms of re-usability: you cannot override a default method without sub-classing the original Scala standard library class, which is not an option in our example because Try is a sealed abstract class.
As with implicit methods, the name of the class is never actually used in the code but needs to reflect the intention of the developer. Let's apply this implicit class
class Recover {
  def reTry(x: Array[Double]): Array[Double] = Array[Double](0.0)
}

def normalize(x: Array[Double]): Either[Recover,Array[Double]] = Try {
  val sum = x.sum 
  if(sum < 0.01) 
    throw new IllegalStateException(s"sum $sum")
  x.map( _ / sum) 
}.toEither(() => new Recover)((v: Array[Double]) => v.map(Math.log(_)))

The implementation is very elegant. There is no need for new semantics or naming conventions, and the return type Either is very well understood in the Scala development community.


Sunday, December 14, 2014

Example of Scala Style Guide

There are many posts and articles dedicated to the style in Scala. The following style guide is heavily borrowed from Effective Scala - M. Eriksen-Twitter
Source code style and format
* Tab indentation: 2 space characters
* Margin: 100 characters
* Line wrap: 2 indentations

The source comments comply with the Scaladoc tag annotation guidelines

Organization of imports
Imports are defined at the top of the source file and grouped in the following order [Scala standard library, 3rd party libraries, Scala for Machine Learning imports]
import scala. // Scala standard library
  import org....     // Third party libraries
  import my_package..  // Import related to your library
Some collections such as Set or Map are defined as both mutable and immutable classes in the Scala standard library. These classes are differentiated in the code by their package.
import scala.collection._
  val myMap = new mutable.HashMap[T, U]
  def process(values: immutable.Set[Double]) ...

Long pipelines of data transformations, operations and higher-order methods are written with one method per line
val lsp = builder.model(lrJacobian)
Most classes are declared as protected, with the package as scope. The constructors are defined in the companion object of the class using the apply method
protected class HMM[@specialized T <% Array[Int]](
    lambda: HMMLambda, 
    form: HMMForm, 
    maxIters: Int)
   (implicit f: DblVector => T) { }

object HMM {
  def apply[T <% Array[Int]](
      lambda: HMMLambda, 
      form: HMMForm, 
      maxIters: Int)
     (implicit f: DblVector => T): HMM[T] = 
         new HMM[T](lambda, form, maxIters)
}
Lengthy parameters declaration
The declaration of some constructors and methods exceeds 100 characters. In this case, the class or method is written with one argument per line.
def apply[T <% Array[Int]](
    lambda: HMMLambda, 
    form: HMMForm, 
    maxIters: Int)
   (implicit f: DblVector => T): HMM[T]
Null references vs. options
Null objects are avoided as much as possible: Option should be used instead of null references
def normalized(stats: Option[Stats]): Option[List[Double]] = 
  stats.map(_.compute).getOrElse(None)
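To make the guideline concrete, here is a minimal self-contained sketch. The Stats class and its compute method are hypothetical stand-ins (the source does not show them), and flatMap is used as an equivalent, shorter form of map followed by getOrElse(None).

```scala
object OptionSketch {
  // Hypothetical statistics holder: the normalization is undefined
  // for an empty list or a zero sum, hence the Option return type
  final case class Stats(values: List[Double]) {
    def compute: Option[List[Double]] = {
      val sum = values.sum
      if (values.isEmpty || sum == 0.0) None
      else Some(values.map(_ / sum))
    }
  }

  // No null check anywhere: absence is propagated through Option
  def normalized(stats: Option[Stats]): Option[List[Double]] =
    stats.flatMap(_.compute)
}
```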
Null references and Empty collections
Null collections should be avoided. Empty containers such as List.empty, Array.empty... should be used instead.
def test: List[T] = {
if( !test.isEmpty)
Class parameter validation
The parameters of a class are validated in the companion object through a private method check
protected class LogisticRegression[T <% Double](
    xt: XTSeries[Array[T]], 
    labels: Array[Int], 
    optimizer: LogisticRegressionOptimizer) {

  import LogisticRegression._
  check(xt, labels)
}

object LogisticRegression {
  private def check[T <% Double](xt: Array[Array[T]], labels: Array[Int]): Unit = {
    require( !xt.isEmpty,"Cannot compute the logistic regression of undefined time series")
    require(xt.size == labels.size, 
      s"Size of input data ${xt.size} is different from size of labels ${labels.size}")
  }
}
Scala 2.10+ exception handling with scala.util.{Try, Success, Failure} is preferred to Java-style typed exception handling. The failure type can be extracted by matching the type of the exception, if necessary and critical to the understanding of the code.
Try(process(args)) match {
  case Success(results) => …
  case Failure(e) => e match {
     case e: MathRuntimeError =>
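A complete, runnable version of this pattern is sketched below. The function safeDivide is a hypothetical example, and ArithmeticException is used in place of the library-specific exception shown above.

```scala
import scala.util.{Try, Success, Failure}

object TryMatching {
  // Hypothetical example: integer division wrapped in a Try
  def safeDivide(a: Int, b: Int): String = Try(a / b) match {
    case Success(q) => s"quotient $q"
    // The failure type is extracted by matching on the exception type
    case Failure(e: ArithmeticException) => s"arithmetic error: ${e.getMessage}"
    case Failure(e) => s"unexpected: ${e.getClass.getName}"
  }
}
```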
View bounds for built-in type
Parameterized types with a view bound to a primitive type are usually preferred to parameterized types with a context bound (upper or lower bound)
class MultiLinearRegression[T <% Double](
     xt: List[Array[T]], 
     y: Array[Double])
Enumeration and case classes
As a general rule, an enumeration is used only when the type has a single parameter convertible to an Int. Structures that require specific attributes are implemented as case classes
object YahooFinancials extends Enumeration {
  type YahooFinancials = Value

sealed abstract class Message(val id: Int)
case class Start(i: Int =0) extends Message(i)
case class Completed(i: Int, xt: Array[Double]) extends Message(i)
case class Activate(i: Int, xt: Array[Double]) extends Message(i)
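The contrast between the two constructs can be sketched as follows. The Status enumeration and the describe function are hypothetical and only serve the illustration; the message classes mirror the ones above.

```scala
object EnumVsCaseClass {
  // Enumeration: each value carries nothing but an Int id and a name
  object Status extends Enumeration {
    type Status = Value
    val Idle, Running, Done = Value
  }

  // Case classes: each message carries its own attributes
  sealed abstract class Message(val id: Int)
  final case class Start(i: Int = 0) extends Message(i)
  final case class Completed(i: Int, xt: Array[Double]) extends Message(i)

  // Attributes are extracted by pattern matching
  def describe(msg: Message): String = msg match {
    case Start(i)         => s"start $i"
    case Completed(i, xt) => s"completed $i with ${xt.length} values"
  }
}
```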

Effective Scala - M. Eriksen, Twitter 2012
Scala by Example - M. Odersky
Scala for Machine Learning - P. Nicolas - Packt Publishing 2014

Saturday, November 29, 2014

Apache Spark/MLlib for K-means

Apache Spark attempts to address the limitations of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In the case of iterative algorithms, the time per iteration can be reduced by a factor of 10 or more.
The core element of Spark is the Resilient Distributed Dataset (RDD), which is a collection of elements partitioned across the nodes of a cluster and/or the CPU cores of servers. An RDD can be created from local data structures such as lists, arrays or hash tables, from the local file system or from the Hadoop distributed file system (HDFS).

Apache Spark RDDs
The operations on an RDD in Spark are very similar to the Scala higher order methods. These operations are performed concurrently over each partition. Operations on RDD can be classified as:
* Transformation: convert, manipulate and filter the elements of an RDD on each partition
* Action: aggregate, collect or reduce the elements of the RDD from all partitions
An RDD can persist, be serialized and cached for future computation. Spark provides a large array of pre-built transforms and actions which go well beyond the basic map-reduce paradigm. Those methods on RDDs are a natural extension of the Scala collections making code migration seamless for Scala developers.
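The distinction between transformations and actions can be pictured with plain Scala collections. The sketch below is an analogy only: it does not use the Spark API, and the nested sequences merely stand in for the partitions of an RDD. RddAnalogy and its members are hypothetical names.

```scala
object RddAnalogy {
  // Two "partitions" of a data set, modeled as plain Scala collections
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

  // "Transformation": applied independently to the elements of each partition
  val squaredEven: Seq[Seq[Int]] =
    partitions.map(_.map(x => x * x).filter(_ % 2 == 0))

  // "Action": combines the elements of all the partitions into a single result
  val total: Int = squaredEven.map(_.sum).sum
}
```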
Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the file systems. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes and replication schemes of HDFS.
Spark is initialized through its context. For instance, a local Spark deployment on 8 cores, with 8 Gbytes allocated for data processing (RDDs) in memory only and 512 Mbytes for the master process is defined as
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
            .set("spark.executor.memory", "2048m")
            .set("spark.storageLevel", "MEMORY_ONLY")
            .set("spark.driver.memory", "512M")
            .set("spark.default.parallelism", "16")

implicit val sc = new SparkContext(sparkConf)

Apache Spark MLlib
MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress. The main components of the library are:
* Classification algorithms, including logistic regression, Naïve Bayes and support vector machines
* Clustering limited to K-means in version 1.0
* L1 & L2 regularization
* Optimization techniques such as gradient descent, logistic gradient, stochastic gradient descent and L-BFGS
* Linear algebra such as Singular Value Decomposition
* Data generator for K-means, logistic regression and support vector machines.
The machine learning byte code is conveniently included in the Spark assembly jar file built with the Simple Build Tool (sbt).
Let's consider the K-means clustering components bundled with Apache Spark MLlib. The K-means configuration parameters are:
* K Number of clusters
* maxNumIters Maximum number of iterations for minimizing the reconstruction error
* numRuns Number of runs or episodes used for training the clusters
* caching Specify whether the resulting RDD has to be cached in memory
* xt The array of data points (type Array[Double])
* sc Implicit Spark context
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

class SparkKMeans(
    K: Int, 
    maxNumIters: Int, 
    numRuns: Int,
    caching: Boolean,
    xt: Array[Array[Double]]) (implicit sc: SparkContext) {

  def train: Try[KMeansModel] = {
    val kmeans = new KMeans
    val rdd = sc.parallelize( DenseVector(_)))
    if( caching )
The clustering model is created by the train method. Once the Spark/MLlib K-means is instantiated and initialized, the original data set xt is converted into dense vectors, then into an RDD. Finally the input RDD is fed to the Kmeans (


Thursday, November 13, 2014

Scala Immutability & Covariance

There is a relation between immutability and covariance which may not be apparent. Let's consider the case of a mutable and an immutable implementation of a stack. The mutable stack is a container of elements with methods to push an element onto (and pop the last element from) the stack.
class MutableStack[T]  {
  import scala.collection.mutable.ListBuffer
  private[this] val _stack = new ListBuffer[T]
  // Remove and return the last element pushed, if any
  def pop: Option[T] = 
    if(_stack.isEmpty) None else Some(_stack.remove(_stack.size-1))
  def push(t: T): Unit = _stack.append(t)
}
The internal container is defined as a ListBuffer instance. The elements are appended to the list buffer (push) and the method pop removes and returns the last element pushed onto the stack.
This implementation has a major drawback: it cannot accept elements of a type other than T because ListBuffer is an invariant collection. Let's consider then an immutable stack.

Immutability, covariance and tail recursion
A covariant immutable stack cannot access its elements unless they are contained by the stack itself. This feat is accomplished by breaking down the stack recursively into the last element pushed onto the stack and the previous state of the stack.
class ImmutableStack[+T](
    val t: T, 
    val stack: Option[ImmutableStack[T]]) {
  def this(t: T) = this(t, Some(new EmptyImmutableStack(t)))
In this recursive approach the immutable stack is initialized with a single element of type T and the option of the existing immutable stack. The stack can be declared covariant because its elements are managed by the stack itself.
The next step is to define the initial state of the stack. We could have chosen a singleton empty stack with no elements. Instead, we define the first state of the immutable stack as:
class EmptyImmutableStack[+T](t: T) 
     extends ImmutableStack[T](t, None)
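For comparison, the alternative of a singleton empty stack could look like the sketch below. It is not the design used in this post: Stack, Empty and Cons are hypothetical names, and the empty stack is typed as Stack[Nothing] so that, through covariance, it is a valid initial state for a stack of any element type.

```scala
object CovariantStackSketch {
  sealed trait Stack[+T] {
    // A covariant T cannot be a parameter type, hence the lower bound U >: T
    def push[U >: T](u: U): Stack[U] = Cons(u, this)
    def toList: List[T]
  }

  // Singleton empty stack, shared by stacks of all element types
  case object Empty extends Stack[Nothing] {
    def toList: List[Nothing] = Nil
  }

  // Last element pushed, plus the previous state of the stack
  final case class Cons[+T](head: T, tail: Stack[T]) extends Stack[T] {
    def toList: List[T] = head :: tail.toList
  }
}
```

Pushing a String onto a Stack[Int] is accepted and widens the element type, for instance to Stack[Any] when the result is annotated as such.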
Next let's define the pop and push operators for ImmutableStack. The pop method returns the previous state of the immutable stack, that is the stack without the last element pushed. Because T is covariant, it cannot appear as the type of a method argument (a contravariant position), so the push method takes an element of a supertype U of T. The existing state of the stack, this, is passed as the previous state (2nd argument).
def pop: ImmutableStack[T] = 
    new ImmutableStack[T](stack.get.t, stack.get.stack)
def push[U >: T](u: U): ImmutableStack[U] = new ImmutableStack[U](u, Some(this))
The next step is to traverse the entire stack and return a list of all its elements. This is accomplished through a tail recursion on the state of the stack
def popAll[U >: T]: List[U] = pop(this,List[U]())

private def pop[U >: T](_stack: ImmutableStack[U], xs: List[U]): List[U] = _stack match { 
  case st: EmptyImmutableStack[T] => xs.reverse
  case st: ImmutableStack[T] => pop(_stack.stack.get, _stack.t :: xs)
}
The recursion updates the list xs and ends when the previous state (2nd argument of ImmutableStack) is empty, that is of type EmptyImmutableStack. The list has to be reversed to order its elements from the last pushed to the first. As long as the stack is not empty (of type ImmutableStack) the method recurses.
It is time to test drive this immutable stack.
val intStack = new ImmutableStack[Int](4)
val newStack = intStack.push(56).push(14).push(77)

println(newStack.popAll.mkString(", "))
The program displays 77, 14, 56, 4.
This example illustrates the concept of an immutable, covariant stack that uses the instances of the stack as its state (the current list of elements it contains).

Scala By Example - M. Odersky - June 2014

Thursday, October 30, 2014

Scala collect, partition higher order functions

The Scala higher order methods collect and partition are not commonly used, even though these collection methods provide developers with a higher degree of flexibility than map and filter.

The method collect creates a new collection by applying a partial function to all the elements of a traversable collection, such as an array, list or map, on which the function is defined. Its signature is
def collect[B](pf: PartialFunction[A, B]): Traversable[B]
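As a quick illustration of the signature, the minimal sketch below applies collect to a heterogeneous list; CollectSketch and its values are hypothetical. The partial function is defined only for Int elements, so the other elements are dropped and the matching ones are transformed in a single pass.

```scala
object CollectSketch {
  val mixed: List[Any] = List(1, "two", 3.0, 4)

  // Keeps only the Int elements and scales them; other elements are skipped
  val scaledInts: List[Int] = mixed.collect { case i: Int => i * 10 }
}
```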
The use case is to validate K sets (or samples) of data extracted from a dataset. Once validated, these K sets are used in the K-fold validation of a model generated through the training of a machine learning algorithm: K-1 sets are used for training and the last set is used for validation. The validation consists of extracting K sample arrays from a generic array, then testing that none of these samples is too noisy (its standard deviation does not exceed a given threshold).
The first step is to create the two generic functions of the validation: breaking the dataset into K sets, then computing the standard deviation of each set. This feat is accomplished by the ValidateSample trait
trait ValidateSample {
  type DVec = Array[Double]

  // Break the array into nSegments contiguous, non-overlapping segments
  protected def split(x: DVec, nSegments: Int): IndexedSeq[DVec] = {
    require(nSegments > 1 && nSegments < 100, 
      s"Number of segments $nSegments is out of range")
    require(x.size >= nSegments, 
      s"Number of segments $nSegments should be <= size of dataset ${x.size}")
    val nStep = x.size/nSegments
    Range(0, nSegments).map(n => x.slice(n*nStep, (n+1)*nStep))
  }

  // Sample standard deviation from the sum and the sum of squares
  protected lazy val stdDev = (x: DVec) => {
    val sums = x.foldLeft((0.0, 0.0))( (s, x) => (s._1 + x, s._2 + x*x))
    Math.sqrt((sums._2 - sums._1*sums._1/x.size)/(x.size-1))
  }

  def filter(x: DVec, nSegments: Int): Boolean
}

The first method, split, breaks down the initial array x into an indexed sequence of segments or sub-arrays. The standard deviation stdDev is computed by folding the sum of values and the sum of squared values. The function value is declared lazy so that it is instantiated once, on first use. The first validation class, ValidateSampleMap, uses a sequence of map and find to test that all the data segments extracted from the dataset have a standard deviation less than 0.8
class ValidateSampleMap extends ValidateSample {
  override def filter(x: DVec, nSegments: Int): Boolean = 
    split(x, nSegments).map( stdDev(_) )
                      .find( _ > 0.8) == None
}

The second implementation of the validation, ValidateSampleCollect, uses the higher-order function collectFirst to test that none of the data segments (validation folds) is very noisy. collectFirst requires a PartialFunction to be defined with a condition on the standard deviation.
class ValidateSampleCollect extends ValidateSample {
  override def filter(x: DVec, nSegments: Int): Boolean = 
    split(x, nSegments).collectFirst { 
        case arr: DVec if stdDev(arr) > 0.8 => arr } == None
}
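There are two main differences between the map/find and the collectFirst implementations
  • The second version requires a single higher-order function, collectFirst, while the first version uses map and find.
  • The second version stops at the first data segment that does not comply, whereas the first version computes the standard deviation of every segment before searching. collectFirst applies the partial function only where it is defined, so it does not throw a MatchError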
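The short-circuit behavior of collectFirst can be checked by recording which elements are inspected. This is a sketch under simple assumptions: the values in segments stand in for per-segment standard deviations, and ShortCircuitSketch, viaMapFind and viaCollectFirst are hypothetical names. On a strict collection such as Vector, map visits every element before find runs, while collectFirst stops at the first element for which the partial function is defined.

```scala
import scala.collection.mutable

object ShortCircuitSketch {
  // Stand-ins for the standard deviations of four data segments
  val segments = Vector(0.1, 0.9, 0.2, 0.3)

  // Returns the first value above 0.8 and the set of values inspected
  def viaMapFind: (Option[Double], Set[Double]) = {
    val seen = mutable.Set[Double]()
    val found = segments.map { x => seen += x; x }.find(_ > 0.8)
    (found, seen.toSet)
  }

  def viaCollectFirst: (Option[Double], Set[Double]) = {
    val seen = mutable.Set[Double]()
    val found = segments.collectFirst { case x if { seen += x; x > 0.8 } => x }
    (found, seen.toSet)
  }
}
```

Both versions find 0.9, but only the map/find version inspects the two segments that follow it.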
These two implementations can be evaluated using a simple driver application that takes a ValidateSample as argument.
import scala.util.{Try, Success, Failure, Random}
  final val NUM_VALUES = 500
  def apply(filt: ValidateSample): Unit = {
    val rValues = Array.fill(NUM_VALUES)(0.5*Random.nextDouble) ++ 
                  Array.fill(NUM_VALUES)(2.0*Random.nextDouble)
    Try ( filt.filter(rValues, 2) ) match {
      case Success(valid) => 
        println(if(valid) "Sets are valid" else "Sets are too noisy")
      case Failure(e) => e match {
        case ex: MatchError => println(ex.toString)
        case _ => println("Other exception")
      }
    }
  }

The higher-order method partition splits a collection into two collections given a boolean condition (or predicate): the elements that satisfy the predicate and those that do not.
def partition(p: (A) ⇒ Boolean): (Repr, Repr)
The test case consists of segmenting an array of random values around the mean value 0.5, then comparing the sizes of the two data segments. The two data segments, segs, should have similar sizes.
  final val NUM_VALUES = 10000
  val rValues = Array.fill(NUM_VALUES)(Random.nextDouble)
  val segs = rValues.partition( _ >= 0.5)
  val ratio = segs._1.size.toDouble/segs._2.size
  println(s"Relative size of segments $ratio")
The test is executed with arrays of different sizes (array size followed by the ratio of the segment sizes):
   50      0.9371
 1000      1.0041
10000      1.0002
As expected, the ratio of the sizes of the two data segments converges toward 1 as the size of the original data set increases (law of large numbers).

Scala By Example - M. Odersky - June 2014