Thursday, January 8, 2015

Implicit Classes to Extend Libraries

Overview
Implicit methods are quite useful for defining global type conversions (as long as the semantics are clearly understood). But what about implicit classes?
Implicit classes can be used to extend existing Scala standard library classes. Most Scala standard library classes are declared final or implement a sealed trait. Composition is a viable alternative to inheritance in terms of reusability: the class to extend is wrapped into a helper or utility class. However, a helper/container class adds an indirection and pollutes the namespace.
Let's look at an example of extending the standard library.

Example
The use case consists of extending the Try class, scala.util.Try, with an Either semantic: execute one function if the computation throws an exception, and another function if it succeeds. Such a simple design allows computation flows to be routed depending on unexpected states.

The main construct is the declaration of an implicit class that wraps Try. A recovery function rec is called if an exception is thrown; a computation f is performed otherwise.

import scala.util._

implicit class Try2Either[T](_try: Try[T]) {
   
  def toEither[U](rec: ()=>U)(f: T=>T): Either[U,T] = _try match {
    case Success(res) => Right(f(res))
    case Failure(e) => println(e.toString); Left(rec())  
  }
}
You may notice that the method toEither is curried to segregate the two type parameters T and U. It also complies with the semantics of Either, with the Left element reserved for errors (and recovery) and the Right element dedicated to the successful outcome.

Let's take the example of the normalization of a large array of floating point values by their sum. A very small sum will generate a rounding error during the normalization, and an exception is thrown. However, we do not want to burden the client code with handling the exception (the client method may not know the context of the exception, after all). In this example the recovery function rec instantiates the class Recover, which is responsible for a retry, potentially from a different state. Implicit classes have an important limitation in terms of reusability: you cannot override a default method without sub-classing the original Scala standard library class, which is not an option in our example because Try is a sealed abstract class.
As with implicit methods, the name of the class is never actually used in the code but needs to reflect the intention of the developer. Let's apply this implicit class:
class Recover {
  def reTry(x: Array[Double]): Array[Double] = Array[Double](0.0)
}
 
def normalize(x: Array[Double]): Either[Recover,Array[Double]] = Try {
  val sum = x.sum 
  if(sum < 0.01) 
    throw new IllegalStateException(s"sum $sum")
  x.map( _ / sum) 
}.toEither(() => new Recover)((v: Array[Double]) => v.map( Math.log(_) ))

The implementation is elegant: there is no need for a new semantic or naming convention, and the return type Either is well understood in the Scala development community.
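To see the conversion in action outside the normalization example, here is a minimal, self-contained sketch; the arithmetic and the recovery value are purely illustrative:

```scala
import scala.util._

implicit class Try2Either[T](_try: Try[T]) {
  def toEither[U](rec: () => U)(f: T => T): Either[U, T] = _try match {
    case Success(res) => Right(f(res))
    case Failure(e)   => Left(rec())
  }
}

// A successful computation is routed to Right, with f applied
val ok: Either[String, Int] = Try(6 / 2).toEither(() => "recover")(_ + 1)
// A failing computation is routed to Left via the recovery function
val ko: Either[String, Int] = Try(6 / 0).toEither(() => "recover")(_ + 1)

println(ok)  // Right(4)
println(ko)  // Left(recover)
```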


Sunday, December 14, 2014

Example of Scala Style Guide

Overview
There are many posts and articles dedicated to style in Scala. The following style guide is heavily borrowed from Effective Scala - M. Eriksen, Twitter.
Source code style and format
Editor
* Indentation: 2 space characters (no tabs)
* Margin: 100 characters
* Line wrap: 2 indentations

Scaladoc
The source comments comply with the Scaladoc tag annotation guidelines.

Organization of imports
Imports are defined at the top of the source file and grouped in the following order: Scala standard library, 3rd party libraries, Scala for Machine Learning imports.
import scala.         // Scala standard library
import org....        // Third party libraries
import my_package..   // Imports related to your library
Collections
Some collections, such as Set or Map, are defined as both mutable and immutable classes in the Scala standard library. These classes are differentiated in the code by their package.
import scala.collection._
..
val myMap = new mutable.HashMap[T, U]
def process(values: immutable.Set[Double]) ...

Pipelining
Long pipelines of data transformations, operations and higher order methods are written with one method call per line:
val lsp = builder.model(lrJacobian)
     .weight(createDiagonalMatrix(1.0)) 
     .target(labels)
     .checkerPair(exitCheck)
     .maxEvaluations(optimizer.maxEvals)
     .start(weights0)
     .maxIterations(optimizer.maxIters)
     .build
Constructors
Most classes are declared protected with package scope. The constructors are defined in the class companion object using the apply method:
protected class HMM[@specialized T <% Array[Int]](
    lambda: HMMLambda, 
    form: HMMForm, 
    maxIters: Int)
   (implicit f: DblVector => T) { }

object HMM {
  def apply[T <% Array[Int]](
      lambda: HMMLambda, 
      form: HMMForm, 
      maxIters: Int)
     (implicit f: DblVector => T): HMM[T] = 
         new HMM[T](lambda, form, maxIters)
}
Lengthy parameters declaration
The declarations of some constructors and methods exceed 100 characters. In this case, the class or method is written with one argument per line:
def apply[T <% Array[Int]](
    lambda: HMMLambda, 
    form: HMMForm, 
    maxIters: Int)
   (implicit f: DblVector => T): HMM[T]
Null references vs. options
Null references are avoided as much as possible: Option should be used instead of null references.
def normalized(stats: Option[Stats]): Option[List[Double]] =
  stats.map( _.compute )
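A minimal, self-contained illustration of the pattern; the Stats class and its compute method below are stand-ins invented for this sketch:

```scala
// Stand-in statistics class: compute normalizes the values by their sum
case class Stats(values: List[Double]) {
  def compute: List[Double] = {
    val sum = values.sum
    values.map(_ / sum)
  }
}

// Option replaces a null check: a None input flows through safely
def normalized(stats: Option[Stats]): Option[List[Double]] =
  stats.map(_.compute)

println(normalized(Some(Stats(List(1.0, 3.0)))))  // Some(List(0.25, 0.75))
println(normalized(None))                         // None
```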
Null references and Empty collections
Null collections should be avoided. Empty containers such as List.empty, Array.empty, etc. should be used instead.
def test: List[T] = {
  ...
  List.empty[T]
}
if (test.nonEmpty)
  ...
Class parameter validation
The parameters of a class are validated in the companion object through a private method check:
protected class LogisticRegression[T <% Double](
    xt: XTSeries[Array[T]], 
    labels: Array[Int], 
    optimizer: LogisticRegressionOptimizer) {

  import LogisticRegression._
  check(xt, labels)
  ...
}

object LogisticRegression {
  private def check[T <% Double](xt: XTSeries[Array[T]], labels: Array[Int]): Unit = {
    require( !xt.isEmpty,"Cannot compute the logistic regression of undefined time series")
    require(xt.size == labels.size, 
      s"Size of input data ${xt.size} is different from size of labels ${labels.size}")
  }
  ...
}
Exceptions
Scala 2.10+ exception handling with scala.util.{Try, Success, Failure} is preferred to Java typed exception handling. The failure type can be extracted by matching on the type of the exception, if necessary and critical to the understanding of the code:
Try(process(args)) match {
  case Success(results) => …
  case Failure(e) => e match {
    case ex: MathRuntimeError => …
    case _ => …
  }
}
View bounds for built-in types
A type parameter with a view bound to a primitive type (such as T <% Double) is usually preferred to a type parameter with an upper or lower type bound:
class MultiLinearRegression[T <% Double](
     xt: List[Array[T]], 
     y: Array[Double])
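As a hedged sketch of the rationale: any type with an implicit view to Double can be passed to a method view-bounded to Double. The Rating wrapper and the mean method below are hypothetical, for illustration only:

```scala
// Hypothetical wrapper type with an implicit view to Double
case class Rating(stars: Int)
implicit def rating2Double(r: Rating): Double = r.stars.toDouble

// The view bound T <% Double lets mean treat any T value as a Double
def mean[T <% Double](xs: List[T]): Double =
  xs.map(x => x: Double).sum / xs.size

println(mean(List(1.0, 2.0, 3.0)))         // 2.0
println(mean(List(Rating(4), Rating(2))))  // 3.0
```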
Enumeration and case classes
As a general rule, an enumeration is used only when the type has a single attribute convertible to an Int. Structures that require specific attributes are implemented as case classes:
object YahooFinancials extends Enumeration {
  type YahooFinancials = Value
  val DATE, OPEN, HIGH, LOW, CLOSE, VOLUME = Value
  ...
}

sealed abstract class Message(val id: Int)
case class Start(i: Int = 0) extends Message(i)
case class Completed(i: Int, xt: Array[Double]) extends Message(i)
case class Activate(i: Int, xt: Array[Double]) extends Message(i)


Saturday, November 29, 2014

Apache Spark/MLlib for K-means

Overview
Apache Spark attempts to address the limitations of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In the case of iterative algorithms, the time per iteration can be reduced by a ratio of 1:10 or more.
The core element of Spark is the Resilient Distributed Dataset (RDD), a collection of elements partitioned across the nodes of a cluster and/or the CPU cores of servers. An RDD can be created from local data structures such as lists, arrays or hash tables, from the local file system, or from the Hadoop Distributed File System (HDFS).

Apache Spark RDDs
The operations on an RDD in Spark are very similar to the Scala higher order methods. These operations are performed concurrently over each partition. Operations on an RDD can be classified as:
* Transformation: convert, manipulate and filter the elements of an RDD on each partition
* Action: aggregate, collect or reduce the elements of the RDD from all partitions
An RDD can persist, be serialized and be cached for future computation. Spark provides a large array of pre-built transformations and actions which go well beyond the basic map-reduce paradigm. These methods on RDDs are a natural extension of the Scala collections, making code migration seamless for Scala developers.
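The two categories above can be sketched as follows, assuming a SparkContext sc is already in scope (the dataset and operations are arbitrary):

```scala
// Transformations are lazy: no computation is triggered here
val rdd     = sc.parallelize(1 to 1000)
val squares = rdd.map(n => n.toLong * n)   // transformation
val evens   = squares.filter(_ % 2 == 0)   // transformation

// Actions trigger the distributed computation across partitions
val total = evens.reduce(_ + _)            // action: aggregate all partitions
val count = evens.count                    // action: return a result to the driver
```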
Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the file systems. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes and replication schemes of HDFS.
Spark is initialized through its context. For instance, a local Spark deployment on 8 cores, with 2 Gbytes allocated per executor for data processing (RDDs persisted in memory only) and 512 Mbytes for the driver process, is defined as
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sparkConf = new SparkConf()
            .setMaster("local[8]")
            .setAppName("SparkKMeans")
            .set("spark.executor.memory", "2048m")
            .set("spark.driver.memory", "512m")
            .set("spark.default.parallelism", "16")

implicit val sc = new SparkContext(sparkConf)

Apache Spark MLlib
MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress. The main components of the library are:
* Classification algorithms, including logistic regression, Naïve Bayes and support vector machines
* Clustering limited to K-means in version 1.0
* L1 & L2 Regularization
* Optimization techniques such as gradient descent, logistic gradient, stochastic gradient descent and L-BFGS
* Linear algebra such as Singular Value Decomposition
* Data generator for K-means, logistic regression and support vector machines.
The machine learning bytecode is conveniently included in the Spark assembly jar file built with sbt, the simple build tool.
Let's consider the K-means clustering component bundled with Apache Spark MLlib. The K-means configuration parameters are:
* K Number of clusters
* maxNumIters Maximum number of iterations for minimizing the reconstruction error
* numRuns Number of runs or episodes used for training the clusters
* caching Specifies whether the resulting RDD has to be cached in memory
* xt The array of data points (type Array[Array[Double]])
* sc Implicit Spark context
import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

class SparkKMeans(
    K: Int, 
    maxNumIters: Int, 
    numRuns: Int,
    caching: Boolean,
    xt: Array[Array[Double]])(implicit sc: SparkContext) {

  def train: Try[KMeansModel] = Try {
    val kmeans = new KMeans
    kmeans.setK(K)
    kmeans.setMaxIterations(maxNumIters)
    kmeans.setRuns(numRuns)

    val rdd = sc.parallelize(xt.map(Vectors.dense(_)))
    rdd.persist(StorageLevel.MEMORY_ONLY)
    if (caching)
      rdd.cache
    kmeans.run(rdd)
  }
}
The clustering model is created by the train method. Once the Spark/MLlib KMeans is instantiated and initialized, the original data set xt is converted into dense vectors, then parallelized into an RDD. Finally the input RDD is fed to K-means (kmeans.run).
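Putting the pieces together, usage might look like the following sketch; the dataset and parameter values are illustrative only, and sparkConf is the configuration defined earlier:

```scala
import scala.util.{Success, Failure}
import org.apache.spark.SparkContext

implicit val sc = new SparkContext(sparkConf)

// Two well-separated groups of 2-dimensional points (illustrative data)
val points = Array(
  Array(1.0, 1.0), Array(1.5, 2.0),
  Array(8.0, 8.0), Array(9.0, 9.5))

// K = 2 clusters, at most 100 iterations, 4 runs, caching enabled
val kmeans = new SparkKMeans(2, 100, 4, caching = true, points)
kmeans.train match {
  case Success(model) =>
    println(s"Centroids: ${model.clusterCenters.mkString(", ")}")
  case Failure(e) => println(s"Training failed: ${e.toString}")
}
```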

References
Scala for Machine Learning - Patrick Nicolas - Packt Publishing

Thursday, November 13, 2014

Scala Immutability & Covariance

Overview
There is a relation between immutability and covariance which may not be apparent. Let's consider the case of mutable and immutable implementations of a stack. The mutable stack is a container of elements with methods to push an element onto the stack and pop the last element from it.
class MutableStack[T]  {
  import scala.collection.mutable.ListBuffer
  private[this] val _stack = new ListBuffer[T]
 
  def pop: Option[T] = 
    if(_stack.isEmpty) None else Some(_stack.remove(_stack.size - 1))
  def push(t: T): Unit = _stack.append(t)
}
The internal container is defined as a ListBuffer instance. The elements are appended to the list buffer (push) and the method pop removes and returns the last element pushed onto the stack.
This implementation has a major drawback: it cannot accept elements of a type other than T because ListBuffer is an invariant collection. Let's consider then an immutable stack.

Immutability, covariance and tail recursion
A covariant immutable stack cannot store its elements in a mutable, invariant container; its elements have to be carried by the stack itself. This feat is accomplished by breaking the stack down recursively into the last element pushed onto the stack and the previous state of the stack.
class ImmutableStack[+T](
    val t: T, 
    val stack: Option[ImmutableStack[T]]) {
  def this(t: T) = this(t, Some(new EmptyImmutableStack(t)))
  ...
}
In this recursive approach, the immutable stack is initialized with a single element of type T and an option on the previous state of the stack. The stack can be defined as covariant and reusable because the elements are managed by the stack itself.
The next step is to define the initial state of the stack. We could have chosen a singleton empty stack with no elements. Instead, we define the first state of the immutable stack as:
class EmptyImmutableStack[+T](t: T) 
     extends ImmutableStack[T](t, None)
Next, let's define the pop and push operators for ImmutableStack. The pop method returns the previous state of the immutable stack, that is, the stack prior to the last push. The push method is contravariant in its argument: it pushes an element of a super type of T. The existing state of this stack is passed as the previous state (2nd argument).
def pop: ImmutableStack[T] = 
    new ImmutableStack[T](stack.get.t, stack.get.stack)
def push[U >: T](u: U): ImmutableStack[U] = new ImmutableStack[U](u, Some(this))
The next step is to traverse the entire stack and return the list of all its elements. This is accomplished through a tail recursion on the state of the stack:
def popAll[U >: T]: List[U] = pop(this,List[U]())

@scala.annotation.tailrec
private def pop[U >: T](_stack: ImmutableStack[U], xs: List[U]): List[U] = _stack match { 
  case st: EmptyImmutableStack[T] => xs.reverse
  case st: ImmutableStack[T] => pop(_stack.stack.get, _stack.t :: xs)
}
The recursion updates the list xs and ends when the previous state (2nd argument of ImmutableStack) is empty, that is, of type EmptyImmutableStack. The list has to be reversed so its elements are indexed from the last pushed to the first. As long as the stack is not empty (of type ImmutableStack) the method recurses.
It is time to test drive this immutable stack.
val intStack = new ImmutableStack[Int](4)
val newStack = intStack.push(56).push(14).push(77)

println(newStack.popAll.mkString(", "))
The result displays 77, 14, 56, 4.
This example illustrates the concept of an immutable, covariant stack that uses instances of the stack itself as its state (the current list of elements it contains).


References
Scala By Example - M. Odersky - June 2014

Thursday, October 30, 2014

Scala collect, partition higher order functions

Overview
The Scala higher order methods collect and partition are not commonly used, even though these collection methods provide developers with a higher degree of flexibility than map and filter.

TraversableLike.collect
The method creates a new collection by applying a partial function to all elements of a traversable collection (such as an array, list or map) on which the function is defined. Its signature is:
def collect[B](pf: PartialFunction[A, B]): Traversable[B]
The use case is to validate K sets (or samples) of data from a dataset. Once validated, these K sets are used in the K-fold validation of a model generated through the training of a machine learning algorithm: K-1 sets are used for training and the last set for validation. The validation consists of extracting K sample arrays from a generic array, then testing that none of these samples is too noisy (its standard deviation does not exceed a given threshold).
The first step is to create the two generic functions of the validation: breaking the dataset into K sets, then computing the standard deviation of each set. This feat is accomplished by the ValidateSample trait:
trait ValidateSample {
  type DVec = Array[Double]
 
  protected def split(x: DVec, nSegments: Int): IndexedSeq[DVec] = {
    require(nSegments > 1 && nSegments < 100, 
      s"Number of segments $nSegments is out of range")
    require(x.size >= nSegments, 
      s"Number of segments $nSegments should be < size of dataset ${x.size}")
   
    val nStep = x.size / nSegments
    Range(0, nSegments).map(n => x.slice(n*nStep, (n + 1)*nStep))
  }
 
  protected lazy val stdDev = (x: DVec) => {
    val sums = x.foldLeft((0.0, 0.0))( (s, x) => (s._1 + x, s._2 + x*x))
    Math.sqrt((sums._2 - sums._1*sums._1/x.size)/(x.size-1))
  }
 
  def filter(x: DVec, nSegments: Int): Boolean
}

The first method, split, breaks down the initial array x into an indexed sequence of segments or sub-arrays. The standard deviation stdDev is computed by folding the sum of the values and the sum of the squared values. The value is defined as lazy so it is computed on demand, once and for all. The first validation class, ValidateSampleMap, uses a sequence of map and find to test that all the data segments extracted from the dataset have a standard deviation lower than 0.8:
class ValidateSampleMap extends ValidateSample {
  override def filter(x: DVec, nSegments: Int): Boolean = 
    split(x, nSegments).map( stdDev(_) )
                       .find( _ > 0.8)
                       .isEmpty
}

The second implementation of the validation, ValidateSampleCollect, uses the higher order function collectFirst to test that none of the data segments (validation folds) is too noisy. collectFirst requires a PartialFunction, here defined with a guard on the standard deviation:
class ValidateSampleCollect extends ValidateSample {
  override def filter(x: DVec, nSegments: Int): Boolean = 
    split(x, nSegments).collectFirst { 
        case arr if stdDev(arr) > 0.8 => arr }.isEmpty
}
There are two main differences between the map/find and collectFirst implementations:
  • The second version requires a single higher order function, collectFirst, while the first version chains map and find.
  • The second version computes the standard deviation only until a non-compliant data segment is found, while the first version computes it for every segment before searching.
These two implementations can be evaluated using a simple driver application that takes a ValidateSample as argument:
import scala.util.{Try, Success, Failure, Random}

// Enclosing object added so the driver compiles as a standalone unit
object Evaluation {
  final val NUM_VALUES = 500

  def apply(filt: ValidateSample): Unit = {
    val rValues = Array.fill(NUM_VALUES)(0.5*Random.nextDouble) ++ 
                  Array.fill(NUM_VALUES)(2.0*Random.nextDouble)
  
    Try( filt.filter(rValues, 2) ) match {
      case Success(valid) => println(s"Sets are valid: $valid")
      case Failure(e) => e match {
        case ex: IllegalArgumentException => println(ex.toString)
        case _ => println("Other exception")
      }
    }
  }
}


TraversableLike.partition
The higher order method partition is used to partition or segment a traversable collection of objects into two collections, given a boolean condition (or predicate).
def partition(p: (A) ⇒ Boolean): (Repr, Repr)
The test case consists of segmenting an array of random values around the mean value 0.5, then comparing the sizes of the two data segments. The two segments, segs, should have similar sizes.
  final val NUM_VALUES = 10000
  val rValues = Array.fill(NUM_VALUES)(Random.nextDouble)
 
  val segs = rValues.partition( _ >= 0.5)
  val ratio = segs._1.size.toDouble/segs._2.size
  println(s"Relative size of segments $ratio")
The test is executed with arrays of different sizes:
NUM_VALUES   ratio
        50   0.9371
      1000   1.0041
     10000   1.0002
As expected, the difference between the sizes of the two data segments converges toward zero as the size of the original data set increases (law of large numbers).

References
Scala By Example - M. Odersky - June 2014

Friday, October 10, 2014

Scala Lists and Streams

Overview
A Stream instance can be regarded as a lazy list or, more accurately, a list with lazy elements. The elements are allocated only when accessed. Streams allow Scala developers to define infinite sequences. Elements can be removed from memory (and handled by the GC) by eliminating any reference to them once they are no longer needed.
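As a quick illustration of this laziness, an infinite stream of integers can be defined, and only the prefix that is actually accessed is ever evaluated:

```scala
// Infinite stream of natural numbers: the tail is built on demand
def from(n: Int): Stream[Int] = n #:: from(n + 1)

val naturals = from(0)
// Only the first 5 elements are ever allocated
val firstSquares = naturals.map(n => n * n).take(5).toList

println(firstSquares)  // List(0, 1, 4, 9, 16)
```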

Performance Evaluation
It is easy to understand the benefit of Stream in terms of memory management. But what about performance?
Let's compare Stream and List using 3 simple operations:
- Allocation of elements
- Retrieving a random element
- Traversing the entire collection
Let's start by defining these operations in the context of computing the mean value of a very large sequence of double values:
import scala.util.Random

val NUM = 10000000

val lst = Range(0, NUM).foldLeft(List[Double]())((xs, n) => 
    n.toDouble :: xs)
Range(0, 10000).foreach( _ => lst(Random.nextInt(NUM-1)))
lst.reduce( _ + _ )/lst.size
The operation of reading a value at a random index is very fast so it is repeated 10,000 times.
Let's implement the same sequence of operations on Stream
val strm = Range(0, NUM).foldLeft(Stream[Double]())((xs, n) => 
    Stream.cons(n.toDouble, xs))
Range(0, 10000).foreach( _ => strm(Random.nextInt(NUM-1)))
strm.reduceRight( _ + _ )/strm.size

As you can see the implementation using Stream is almost identical to the implementation using List.
The test is run 20 times to avoid distortion caused by the initialization of the JVM.

The allocation of the elements in the stream is slightly faster than the creation of the list. The main difference is the time required by the List and the Stream to traverse the entire collection using the reducer reduceRight: the Stream has to allocate all its elements at that point. This scenario is unlikely in practice, as Streams are usually used to process sections or slices of a very large sequence of values or objects, as demonstrated in the next section.

Use case
The most common application of a Scala Stream is the iterative or recursive application of a function/transform, or a sequence of functions, to a very large data set; in this case, the computation of the mean value:
val STEP = 1000   // slice size (value illustrative)

val strm = Range(0, NUM).foldLeft(Stream[Double]())((xs, _) => 
    Random.nextDouble #:: xs)
(0 until NUM by STEP).foldRight(0.0)((i, s) => 
    s + strm.slice(i, i + STEP).sum)/NUM
The performance of the computation of the mean can be greatly improved by parallelizing its execution (Stream.par).

References
Streams in Scala: Part 1 Scott Shipp 2014
Streams in Scala: Part 2 Scott Shipp 2014