Sunday, April 26, 2020

Handling Scala 2.x Option Elegantly

Target audience: Beginner
Estimated reading time: 3'

This post reviews the different mechanisms available in Scala to handle errors and illustrates the applicability of the Option monad.


The Option monad is a great tool for handling errors in Scala: developers do not have to worry about NullPointerException or handling a typed exception as in Java and C++.
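As a minimal illustration (the lookup function below is hypothetical), wrapping a possibly-null reference in Option removes the need for explicit null checks:

def lookup(key: String): String =
    if (key == "known") "value" else null    // Java-style API that may return null

// Option(...) converts null to None, so the caller never dereferences a null
val result: Option[String] = Option(lookup("unknown"))
result.map(_.toUpperCase).getOrElse("default")   // returns "default", no NullPointerException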

Notes:
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted 
  • The code associated with this article is written using Scala 2.12.4

Use case

Let's consider a simple square root computation, which throws an exception if the input value is strictly negative (line 2). The most common, "Java-like" approach is to wrap the computation with a try-catch construct. In Scala, catching the exception can be implemented through the Try monad (lines 7-9).

 1  def sqrt(x: Double): Double = 
 2      if(x < 0.0) 
 3         throw MathException(s"sqrt: Incorrect argument $x")
 4      else 
 5         Math.sqrt(x)
 6  
 7  Try(sqrt(a)) match {
 8      case Success(x) => {}
 9      case Failure(e) => Console.println(e.toString)
10  }

This type of implementation puts the burden on the client code to handle the exception. The Option monad provides developers with a more elegant way to control the computation flow.
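Note that the two approaches are easy to bridge: the standard library's Try exposes a toOption method, so the exception-throwing sqrt above can be converted into an optional value without a pattern match (a small sketch reusing that sqrt; the name sqrtOpt is illustrative):

import scala.util.Try

// Discards the exception and keeps only the presence or absence of a result
def sqrtOpt(x: Double): Option[Double] = Try(sqrt(x)).toOption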

Handling Option values

The most common way to handle a Scala Option is to unwrap it. Let's consider the function y = sin(sqrt(x)). Clearly, there is no need to compute y if x is negative.
def sqrt(x: Double): Option[Double] = {
    if(x < 0.0) None
    else Some(Math.sqrt(x))
}
 
def y(x: Double): Option[Double] = sqrt(x) match {
    case Some(y) => Some(Math.sin(y))
    case None => None
}

The computation of the square root is implemented by the method sqrt while the final computation of sin(sqrt(x)) is defined by the method y.

This implementation is quite cumbersome because the client code has to process an extra Option. An alternative is to provide a default value (i.e., 0.0) if the first computational step fails.

 
def y(x: Double): Double = Math.sin(sqrt(x).getOrElse(0.0))

A more functional and elegant approach uses the higher-order function map to propagate the value of the Option.

 
def y(x: Double): Double =  sqrt(x).map(Math.sin(_)).getOrElse(0.0)

What about a sequence of nested options? Let's consider the function y = 1/sqrt(x). There are two types of errors:
  • x < 0.0 for sqrt
  • x == 0.0 for 1/x
A third solution consists of applying a single test, x > 0.0 (implemented here as x >= 1e-30), that meets the two conditions at once.

 
def y(x: Double): Option[Double] = 
    if(x < 1e-30) None else Some(1.0/Math.sqrt(x))


For comprehension for Options

However, anticipating multiple, complex conditions on the argument is not always possible. The for comprehension is an elegant approach to handle a sequence of options.

 1  def inv(x: Double): Option[Double] = {
 2       if(Math.abs(x) < 1e-30) None
 3       else Some(1.0/x)
 4  }
 5  
 6  def log(x: Double): Option[Double] = {
 7       if(x < 1e-30) None
 8       else Some(Math.log(x))
 9  }
10  
11  def compose(x: Double): Double =
12     (for {
13         y <- sqrt(x)
14         z <- inv(y)
15         t <- log(z)
16      } yield t).getOrElse(0.0)

The objective is to compose the computation of a square root with the inverse function inv (line 1) and the natural logarithm log (line 6). The for comprehension construct (lines 12-16) propagates the result of each function to the next in the pipeline through the automatic unwrapping of each Option into its value. In case of error (None), the for comprehension exits before completion.
A for comprehension is syntactic sugar that composes (cascades) multiple flatMap invocations with a final map.
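For reference, the compiler rewrites the for comprehension in compose into the equivalent chain of flatMap calls with a final map (the name composeDesugared is illustrative):

def composeDesugared(x: Double): Double =
    sqrt(x)
       .flatMap(y => inv(y)
          .flatMap(z => log(z)
             .map(t => t)))     // 'yield t' becomes the final map; any None short-circuits the chain
       .getOrElse(0.0)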

Thank you for reading this article. For more information ...


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Sunday, February 23, 2020

Implement Kernel Density Estimator in Spark

Target audience: Intermediate
Estimated reading time: 4'

This article introduces a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset: Multivariate Kernel Density Estimation (KDE), also known as the Parzen window. At its core, KDE is a smooth approximation of a histogram [ref 1].


Notes
  • This article requires a basic knowledge of the Apache Spark MLlib framework and an understanding of statistics and/or machine learning.
  • This implementation relies on Spark 2.3.1 and Scala 2.12.4

Introduction

KDE is one of the most well-known approaches to estimate the underlying probability density function of a dataset. KDE learns the shape of the density from the data automatically. This flexibility, arising from its non-parametric nature, makes KDE a very popular approach for data drawn from a complicated distribution.

The KDE algorithm takes a parameter, bandwidth, that affects how "smooth" the resulting curve is. For a set of observations y, a kernel function K and a bandwidth, the estimate of the density function f can be expressed as follows.
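Using the standard textbook notation, with n observations y_i, a kernel K and a bandwidth h (symbols not spelled out in the original post), the univariate estimator reads:

\hat{f}_h(y) = \frac{1}{n\,h}\sum_{i=1}^{n} K\left(\frac{y - y_i}{h}\right)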


This post addresses the limitations of the current implementation of KDE in Apache Spark for multi-variate features.

Spark implementation

Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
The Apache Spark ecosystem includes a machine learning library, MLlib.

The implementation of the kernel density estimation in the current version of the Apache Spark MLlib library, 2.3.1 [ref 2], org.apache.spark.mllib.stat.KernelDensity, has two important limitations:
  • It is a univariate estimation
  • The estimation is performed on a sequence of observations, not an RDD or dataset, putting the computation load on the Spark driver.
An example of application of KDE using Apache Spark MLlib 2.3.1 ... 

import org.apache.spark.mllib.stat.KernelDensity

val data = Seq[Double](1.0, 5.6)
val sample = sparkSession.sparkContext.parallelize(data)
 
val kd = new KernelDensity().setSample(sample).setBandwidth(3.0)
 
val densities = kd.estimate(Array(-2.0, 5.0))

The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver. 

Multivariate KDE

The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allowing developers to apply the estimation to a dataset. This implementation is restricted to the Normal distribution kernel, although it can easily be extended to other kernel functions. 
We assume that:
  • The reference to the current Spark session is implicit (line 1)
  • The encoding of a row for the serialization of the task is provided (line 2)
The method estimate has 3 arguments:
  • trainingDS: training dataset (line 11)
  • validationDS: validation dataset (line 12)
  • bandwidth: size of the Parzen window (line 13)
The validation set has to be broadcast to each worker node (lines 15-16). This should not be a problem as the validation set is expected to be of reasonable size. 
The training set is passed to each partition as an iterator through mapPartitions (line 18). The probability densities and the observation count are computed through the Scala aggregate method with a zero of type (Array[Double], Long) (lines 23-24). The sequence operator seqOp invokes the multivariate normal density multiNorm (line 31). 
The combiner combOp relies on the BLAS level 1 vectorization daxpy, z <- a.x + y (line 40). The BLAS library [ref 3] has 3 levels: vector-vector, matrix-vector and matrix-matrix operations.
The vector of densities is rescaled with invCount using the BLAS level 1 method dscal (line 47).

 1  final class KDE(implicit sparkSession: SparkSession, 
 2        encoder: Encoder[Row]) {
 3   
 4    /**
 5      * Apply the trained KDE to a set of validation data
 6      * @param trainingDS  Training data set
 7      * @param validationDS Validation data set
 8      * @return Dataset of probability densities
 9      */
10    def estimate(
11      trainingDS: Dataset[Obs], 
12      validationDS: Dataset[Obs], 
13      bandwidth: Double = 1.0): Dataset[Double] = {
14      import math._, sparkSession.implicits._
15      val validation_brdcast = sparkSession.sparkContext
16              .broadcast[Array[Obs]](validationDS.collect)
17  
18      trainingDS.mapPartitions((iter: Iterator[Obs]) => {
19         val seqObs = iter.toArray
20         val scale = 0.5 * seqObs.size * log(2 * Pi)
21         val validation = validation_brdcast.value
22  
23         val (densities, count) = seqObs.aggregate(
24           (new Array[Double](validation.length), 0L) ) (
25             {        // seqOp (U, T) => U
26  
27               case ((x, z), y) => {
28                  var i = 0
29                  while (i < validation.length) {
30                   // Call the pdf function for the normal distribution
31                      x(i) += multiNorm(y, bandwidth, scale, validation(i))
32                      i += 1
33                  }
34                  (x, z + 1)  // Update the observation count
35               }
36            },
37            {         // combOp: (U, U) => U
38               case ((u, z), (v, t)) => { 
39                  // Combiner invokes the BLAS vectorization z <- a.x + y
40                  blas.daxpy(validation.length, 1.0, v, 1, u, 1)
41                  (u, z + t)
42               }
43            }
44        )
45  
46        val invCount: Double = 1.0 / count
47        blas.dscal(validation.length, invCount, densities, 1)
48            // Rescale the densities using BLAS z <- a.x
49        densities.iterator
50      })
51    }
52  }
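The listing assumes the usual Spark SQL types are in scope together with a BLAS instance. A plausible set of imports (the netlib-java alias for blas is an assumption; it is not shown in the original post) would be:

import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}
// Assumed: netlib-java BLAS singleton aliased as 'blas', providing daxpy and dscal
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import KDE._   // brings the Obs type alias and multiNorm into scope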

The companion singleton is used to define the multivariate normal density multiNorm (line 6). The type of an observation (feature) is Array[Double].

 1  final object KDE {
 2      import math._
 3      type Obs = Array[Double]
 4  
 5      @throws(classOf[IllegalArgumentException])
 6      def multiNorm(
 7           means: Obs, 
 8           bandWidth: Double, 
 9           scale: Double, 
10           x: Obs): Double = {
11        require(x.length == means.length, 
12             "Dimension of means and observations differs")
13  
14         // Exponent: -scale - 0.5 * sum(((means - x)/bandWidth)^2)
15         exp(
16            -scale + (0 until means.length).map( 
17               n => {
18                  val sx = (means(n) - x(n)) / bandWidth
19                  -0.5 * sx * sx
20               }
21         ).sum
22      )
23    }
24  }
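As a quick, hypothetical sanity check of the kernel: in one dimension, with a bandwidth of 1.0 and scale = 0.5*log(2*Pi), multiNorm evaluated at the mean reduces to the height of the standard normal density:

val scale = 0.5 * math.log(2 * math.Pi)
val p = KDE.multiNorm(Array(0.0), 1.0, scale, Array(0.0))
// p == exp(-scale) = 1 / sqrt(2 * Pi) ≈ 0.3989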

Application

This simple application requires the Spark session (SparkSession) to be defined as well as an explicit encoding of Row using the Kryo serializer. The implicit conversions are made available by importing sparkSession.implicits._.
The training set is a sequence of key-value pairs (lines 6-12). The validation set is synthetically generated by multiplying the values in the training set by 2.0 (lines 14-15).

 1  implicit val sparkSession: SparkSession =    
 2         confToSessionFromFile(LocalSparkConf)
 3  implicit val encoder = Encoders.kryo[Row]
 4  import sparkSession.implicits._
 5  
 6  val trainingData = Seq[(String, Array[Double])](
 7       ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
 8       ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
 9       ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
10       ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
11       ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
12    ).toDS
13  
14  val validationData = trainingData
15      .map { case (key, values) => values.map(_ * 2.0) }
16  
17  val kde = new KDE
18  val result = kde.estimate(trainingData.map(_._2), validationData)
19  
20  println(s"result: ${result.collect.mkString(", ")}")
21  
22  sparkSession.close




Note: There are excellent research papers highlighting the statistical foundation behind KDE as well as recent advances [ref 4].

Thank you for reading this article. For more information ...

References

[3] BLAS (Basic Linear Algebra Subprograms)
Environment: Scala 2.12.4, Java JDK 1.8, Apache Spark 2.3.1, OpenBLAS 0.3.4


---------------------------

Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Wednesday, December 11, 2019

Variance Annotation in Scala 2.x

Target audience: Intermediate
Estimated reading time: 3'

The purpose of this post is to introduce the basics of parameterized classes, upper and lower bounds, and variance sub-typing in Scala.

Note: The code associated with this article is written using Scala 2.12.4

Overview

Scala is a statically typed language similar to Java and C++. Most of these concepts have already been introduced in Java with generics and type parameter wildcards. However, Scala adds some useful innovations regarding sub-typing, covariance and contra-variance. API designers and library developers face the dilemma of choosing between parameterized types and abstract types, and of deciding which parameters should be invariant, covariant or contra-variant.


Sub-classing

As in Java, generic or parameterized types have been introduced to overcome the limitations of polymorphism. Moreover, understanding the concepts behind the Scala type system helps developers decipher and fix the occasional compilation failure related to typing.
Let's consider a simple collection class, Container, that adds and retrieves an array of items of type Item as follows.

 1  class Item
 2  case class SubItem() extends Item
 3   
 4  class Container {
 5    def add(items: Array[Item]) : Unit =  items.foreach( add(_) )
 6  
 7    def retrieve(items: Array[Item]) : Unit =
 8      Range(0, items.size).foreach(items.update(_, retrieve))
 9  
10    def add(item: Item) : Unit = { }
11    def retrieve : Item = new SubItem
12  }

The client code can invoke the add (lines 5 & 10) and retrieve (lines 7 & 11) methods, passing an array of elements of type Item. However, passing an array of type SubItem, which inherits from Item (line 2), generates a compilation error!

 1  val subItemsContainer = new Container
 2  val array = Array[SubItem](new SubItem())
 3   
 4    // Compile error:
 5    // "type mismatch;  found   : Array[SubItem]  
 6    // required: Array[Item] 
 7    // Note: SubItem <: Item, but class Array is 
 8    // invariant in type T..."
 9  subItemsContainer.add(array)
10  subItemsContainer.retrieve(Array[SubItem](new SubItem))


The Scala compiler throws a compilation error because the class Container does not have adequate information about the sub-typing of Item into SubItem in the add method (line 9). The invocation of the retrieve method with an argument of type Array[SubItem] also generates a compilation error. 
Scala provides developers with upper-bound and lower-bound sub-typing, which is quite similar to the bounded type wildcards available in Java.

Variance to the Rescue

A type T upper-bounded by the type Item, written T <: Item (line 1), can be processed as a 'producer' of type Item: the notation tells the Scala compiler that any type inheriting from Item is allowed to be passed as an argument of the add method (lines 2 & 7).
Similarly, a type U with the lower bound SubItem,
   U >: SubItem
can be processed as a 'consumer' of type SubItem in the retrieve method (lines 4 & 8).

1  class Container[T <: Item] {
2     def add(items: Array[T]) : Unit = items.foreach( add(_) )
3        
4     def retrieve[U >: SubItem](items: Array[U]) : Unit = 
5         Range(0, items.size).foreach(items.update(_, retrieve))
6    
7     def add(item: T): Unit = {}
8     def retrieve: SubItem = new SubItem
9  }

Container[T] behaves as a covariant class relative to the type Item for the argument of the add method, while the parameterized method retrieve[U] is contra-variant relative to the type SubItem.
This time around, the compiler has all the necessary information to accept sub-types of Item and super-types of SubItem, therefore the following code snippet won't generate a compilation error.

val itemsContainer = new Container[SubItem]
itemsContainer.add(Array[SubItem](new SubItem))
itemsContainer.retrieve[Item](Array[Item](new SubItem))


Scala has dedicated annotations for invariance 'T', covariance '+T' and contra-variance '-T'. Using these annotations, the container class can be expressed as follows.

class Container[+T] {
   def add(items: Array[T]) : Unit = items.foreach( add(_) )
 
   def retrieve[-U](items: Array[U]) : Unit = 
          Range(0, items.size).foreach(items.update(_, retrieve))
  
   def add(item: T) : Unit = { }
  
   def retrieve : U = new SubItem
}

In a nutshell, the variances can be summarized as follows:
Covariance: if B inherits from A then Container[B] is a sub-type of Container[A]
Contra-variance: if B inherits from A then Container[B] is a super-type of Container[A]
Invariance: no sub-typing relation exists between Container[A] and Container[B]
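These rules can be verified with a small, self-contained sketch; the Box and Printer classes below are illustrative and not part of the Container example:

class Box[+T](val value: T)                                // covariant: Box[SubItem] <: Box[Item]
class Printer[-T] { def print(t: T): Unit = println(t) }   // contra-variant: Printer[Item] <: Printer[SubItem]

val itemBox: Box[Item] = new Box[SubItem](new SubItem)     // accepted thanks to covariance
val subItemPrinter: Printer[SubItem] = new Printer[Item]   // accepted thanks to contra-variance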
 



Thank you for reading this article. For more information ...



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3