Monday, January 16, 2017

Histogram-Based Approximation Using Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

This post illustrates the histogram-based function approximation for very large data sets using Apache Spark and Scala.


Note: Implementation relies on Scala 2.11.8, Spark 2.0

Overview

The previous post introduced and described a model to approximate or fit a function to a given data set, using a histogram implemented in Scala (see Histogram-based approximation using Scala).
This post re-implements the dynamic, resizable histogram using Apache Spark, with limited changes to the original code base.


Histogram

The Apache Spark version of the histogram-based approximation reuses the classes Bin and Histogram described in the previous post, with some minor changes to the constructors of Histogram. As a reminder, the Histogram class is defined as follows:

 1  final class Histogram(var step: Double, 
 2       private var values: mutable.ArrayBuffer[Bin]) {
 3    var min = values.head._x - 0.5*step
 4    var max = values.last._x + 0.5*step
 5 
 6    private val maxNumBins = (values.length+1)<<RADIX
 7 
 8    def this(first: Double, step: Double, initNumBins: Int) = 
 9        this(step, Range(0, initNumBins-1)
10        .scanLeft(first)((s, n) => s + step)
11        ./:(new mutable.ArrayBuffer[Bin])(
12                (vals, x) => vals += (new Bin(x)) )
13         )
14 
15    def this(params: HistogramParams) = 
16        this(params.first, params.step, params.initNumBins)
17 
18    def train(x: Double, y: Double): Unit
19    final def predict(x: Double): Double
20     // ...
21  }

The class Histogram has three constructors, for convenience's sake: 
  • Instantiation of a histogram from an estimate of the _x value of the first bin, the width of the bins, step, and the initial number of bins, initNumBins. This constructor is invoked for the first batch of data (lines 8-13).
  • Instantiation of a histogram from a given step and an existing array of bins/buckets of width step (the primary constructor, lines 1-2). This constructor is invoked for the subsequent batches of data.
  • Instantiation of a histogram from its parameters, params (line 15).
The parameters of the histogram are encapsulated in the HistogramParams class, described in a previous post.
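Neither Bin nor HistogramParams is listed in this post. The snippet below is a plausible minimal sketch of both classes, reconstructed solely from the way they are used here; the actual definitions are given in the previous posts.

import scala.collection.mutable

// Plausible sketch of Bin (defined in the previous post): a bin centered
// at _x that accumulates the y values falling into it.
class Bin(val _x: Double) {
  private[this] var sum = 0.0
  private[this] var count = 0

  def +=(y: Double): Unit = { sum += y; count += 1 }
  def mean: Double = if (count == 0) 0.0 else sum/count
}

// Plausible sketch of HistogramParams, consistent with the two instantiations
// used in this post: (first, step, initNumBins) and (first, step, initNumBins, values).
case class HistogramParams(
  first: Double,
  step: Double,
  initNumBins: Int,
  values: mutable.ArrayBuffer[Bin] = new mutable.ArrayBuffer[Bin]
)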

I added an implementation of the concatenation of bin arrays, which is required by the reducer implemented as a fold action on the Spark data nodes (line 6).

 1  object Histogram {
 2    final val EPS = 1e-12
 3    final val RADIX = 8
 4    final def sqr(x: Double): Double = x*x
 5 
 6    def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin] = {
 7      val histBins = new mutable.ArrayBuffer[Bin]
 8      itBins1.foldLeft(histBins)((newBins, bin) => newBins += bin)
 9      itBins2.foldLeft(histBins)((newBins, bin) => newBins += bin)
10      histBins.toArray
11    }
12  }
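
For illustration, the reducer simply concatenates two partial arrays of bins (a usage sketch, not part of the original code):

val partial1 = Array(new Bin(0.0), new Bin(0.5))
val partial2 = Array(new Bin(1.0))
val merged = Histogram.+(partial1, partial2)  // 3 bins, order preserved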


Test data generation

For testing purposes, we need to generate a data set of very large (or even infinite) size. To do so, we load an existing data set from a file, then duplicate it and alter it with random uniform noise to create a large number of batches to be processed iteratively by the Spark data nodes.

The amplitude of the randomization (synthetic noise) is large enough to generate distinct data points while preserving the original data pattern, so the algorithm can be easily validated.

 1  final class DataGenerator(sourceName: String, nTasks: Int) {
 2    private final val DELIM = " "
 3    private final val RATIO = 0.05
 4    var datasetSize: Int = _
 5 
 6      // Generate the RDD from an existing data set template
 7      //  and a noise function
 8    def apply(sc: SparkContext): RDD[(Float, Float)] = {
 9      val r = new Random(System.currentTimeMillis + 
10             Random.nextLong)
11      val src = Source.fromFile(sourceName)
12      val input = src.getLines.map(_.split(DELIM))
13        ./:(new mutable.ArrayBuffer[(Float, Float)])
14          ((buf, xy) => {
15           val x = addNoise(xy(0).trim.toFloat, r)
16           val y = addNoise(xy(1).trim.toFloat, r)
17           buf += ((x, y))
18      })
19 
20      datasetSize = input.size
21      val data_rdd = sc.makeRDD(input, nTasks)
22      src.close
23      data_rdd
24    }
25 
26    private def addNoise(value: Float, r: Random): Float = 
27        value*(1.0 + RATIO*(r.nextDouble - 0.5)).toFloat
28  }

The data is generated for a given number of tasks, nTasks, by loading the data from a local file, sourceName (line 1). The generation of the RDD is implemented by the method apply. Once loaded from the file (line 12), the input is built locally as an array of (Float, Float) (lines 12-18).
Next, the algorithm applies the randomization addNoise (lines 15, 16) before converting the result into an RDD (line 21).


Apache Spark client

The first step is to create a Spark configuration and context for a given number of concurrent tasks.

1  val numTasks: Int = 12
2 
3  val conf = new SparkConf().setAppName("Simple Application")
4     .setMaster(s"local[$numTasks]")
5  val sc = new SparkContext(conf)
6  sc.setLogLevel("ERROR")

The main goal of the Spark driver is to apply a tail-recursive function across all the batches of data. The histogram is computed/updated on the Spark data nodes, then reduced to a single histogram, which is broadcast to all the nodes to be updated with the next batch of data.


The RDDs are generated by an instance of the DataGenerator class, input (line 1). The input is loaded recursively into multiple partitions using the Spark method mapPartitionsWithIndex (line 11). Each partition generates a new array of bins (line 26) once the class Histogram is instantiated (line 17) and then trained (line 24). The data is then grouped by partition index (line 28). Finally, the fold method is invoked to reduce the partial results into a single array of bins (line 30).

 1  val input = new DataGenerator("small.data", numTasks)
 2 
 3  @scala.annotation.tailrec
 4  def execute(iter: Int, 
 5      initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin] = {
 6    val initParam = initParam_brdcast.value
 7 
 8    if( iter <= 0)
 9      initParam.values
10    else {
11      val hist_rdd = input(sc).mapPartitionsWithIndex(
12       (idx: Int,  it: Iterator[(Float, Float)]) => {
13 
14        // The first invocation uses the broadcast parameters to
15        //  create the histogram; subsequent invocations update
16        // the distribution of the histogram bins
17        val histogram =
18          if (initParam.values.length == 0) 
19             new Histogram(initParam)
20          else 
21             new Histogram(initParam.step, initParam.values)
22 
23        // Train the histogram with a new batch of data
24       it.foreach { case (x, y) => histogram.train(x, y) }
25 
26       histogram.getValues.map((idx, _)).iterator
27       })
28       .groupBy(_._1)
29       .map { case (n, itBins) => itBins.map(_._2).toArray }
30       .fold(Array[Bin]())(Histogram.+ _)
31 
32      execute(iter-1, sc.broadcast(
33       HistogramParams(0.0, initParam.step, 0, new ArrayBuffer[Bin] ++ hist_rdd)
34      ))
35    }
36  }

Note:
The invocation of the recursive method execute on the Apache Spark driver (line 4) broadcasts the current (most recently updated) parameters of the histogram, initParam_brdcast, as one of the arguments of the recursive call (line 5). Upon receiving the broadcast values, the partitions load the next batch of data, generate and process the RDD input.apply(sc), and update the histogram parameters (line 11).

Evaluation

Let's write a test driver to generate the histogram using Spark RDDs. After their initialization (lines 1-4), the parameters of the histogram are broadcast to the remote data/worker nodes (lines 7, 8).

1  val initNumBins = 2048
2  val range = (-1.0, 2.0)
3  var step = (range._2 - range._1)/initNumBins
4  val maxRecursions = 4
5 
6  val hist = execute(maxRecursions, 
7      sc.broadcast(
8           HistogramParams(range._1 + 0.5 * step, step, initNumBins))
9    )
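
Once the recursion completes, the resulting bins can be wrapped back into a Histogram to approximate the underlying function at any point. A minimal sketch, assuming the Histogram(step, values) constructor and the predict method listed at the beginning of this post:

val finalHistogram = new Histogram(step, hist)
val estimate = finalHistogram.predict(0.75)  // approximation of f(0.75)
println(s"f(0.75) ~ $estimate")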



Friday, November 18, 2016

Recursive Mean & Standard Deviation in Scala

Target audience: Intermediate
Estimated reading time: 3'

Have you considered accelerating your statistical computations? This post introduces tail recursion as a way to improve the efficiency of computing the mean and standard deviation.


Note: Scala version 2.11.8

Overview

The computation of the mean and standard deviation of a very large data set may cause an overflow of the summation of values. Scala's tail recursion is a very good alternative for computing the mean and standard deviation of data sets of unlimited size.

Direct computation

There are many ways to compute the standard deviation through summation. The first mathematical expression consists of the sum of the squared differences between each data point and the mean.
\[\sigma =\sqrt{\frac{\sum_{i=0}^{n-1}(x_{i}-\mu )^{2}}{n}}\]
The second formula allows the mean and standard deviation to be updated with each new data point (online computation).
\[\sigma =\sqrt{\frac{1}{n}\sum_{i=0}^{n-1}x_{i}^{2}-\mu ^{2}}\]
This second approach relies on the computation of the sum of squared values, which can overflow.


1  val x = Array[Double]( /* ... */ )
2  val mean = x.sum/x.length
3  val stdDev = Math.sqrt((x.map( _ - mean)
4      .map(t => t*t).sum)/x.length)

A reduceLeft can be used as an alternative to map { ... }.sum in the computation of the standard deviation (line 3).
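
For instance, one possible variant (a sketch, not the original code):

val stdDev2 = Math.sqrt(
  x.map(t => (t - mean)*(t - mean)).reduceLeft(_ + _)/x.length
)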

Recursive computation

There is actually no need to compute the sum and the sum of squared values to compute the mean and standard deviation. The mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
\[\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}\] The recursive formula for the standard deviation is
\[\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n}-\mu _{n-1}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}\] 
Let's implement these two recursive formulas in Scala using tail recursion (lines 3, 4).

 1  def meanStd(x: Array[Double]): (Double, Double) = {
 2 
 3    @scala.annotation.tailrec
 4    def meanStd(
 5        x: Array[Double], 
 6        mu: Double, 
 7        Q: Double, 
 8        count: Int): (Double, Double) = 
 9      if (count >= x.length) (mu, Math.sqrt(Q/x.length))
10      else {
11        val newCount = count + 1
12        val newMu = x(count)/newCount + mu * (1.0 - 1.0/newCount)
13        val newQ = Q + (x(count) - mu)*(x(count) - newMu)
14        meanStd(x, newMu, newQ, newCount)
15      }
16 
17    meanStd(x, 0.0, 0.0, 0)
18  }

This implementation updates the mean and the standard deviation with each new data point, simultaneously. The recursion exits when all the elements have been accounted for (line 9).
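
For example, on a small illustrative array:

val (mu, sigma) = meanStd(Array(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0))
// mu = 5.0, sigma = 2.0 (population standard deviation)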


Sunday, September 18, 2016

Dependency Injection in Scala

Target audience: Advanced
Estimated reading time: 4'

This post describes the concept of dependency injection along with an implementation of the Cake pattern.
Versions: Java 11, Scala 2.11.6

Overview

Dependency injection is a design pattern that has been widely used in Java, by leveraging frameworks such as Spring. The objective of the pattern is to replace hard-coded dependencies with run-time association, or injection, of new types.

Java defines modules or components through the semantics and conventions of packages. The functionality of a module is defined through one or more interfaces and implemented through the composition and inheritance of concrete classes. Polymorphism is used to "dynamically wire" those classes, assembled through composition and inheritance, into patterns that address a specific design problem (publish-subscribe, strategy, factory, ...).
However, those capabilities have proven limited for creating very dynamic and complex applications. The Scala programming language provides developers with a dependency injection mechanism based on self-type annotations that does not rely on a third-party framework.

Note: For the sake of readability of the implementation of the algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.


Reuse through inheritance

The simplest and most commonly used form of reuse in any object-oriented programming language is inheritance. Let's consider an interface House, implemented by an abstract or concrete class "House with Furniture & Appliance", which in turn is sub-classed by a fully defined House.

It is well documented that inheritance is a poor mechanism for code reuse because data is not properly encapsulated: a sub-class may access the internals of its base class. Moreover, any future change in the base class or interface (the framework) will propagate through the sub-classes (the dependencies).

Reuse through composition

It is a well documented and researched fact that composition provides more robust encapsulation than inheritance, as the main class delegates or routes method invocations to the appropriate internal components. Contrary to inheritance, for which changes in the base class may have unintended consequences in the sub-classes, changes in components or inner classes can be made independently of the main class or outer component.
Clearly, in the example above, composition is more appropriate. After all, a House with Furniture and Appliance can be defined as a House that contains Furniture and Appliance.

Inversion of control

Frameworks such as Spring introduced the concept of Inversion of Control (IoC) containers; dependency injection is a form of IoC. With inversion of control, a framework defines interfaces that are extended or implemented by the application or client code. Instead of having the application use the framework API, the framework relies on the application to implement a specific function.

Let's take the example of a generic service, Service, that accesses a database, written in Java.

 1  public interface Service {
 2      JSonObject query(String mySQLQuery);
 3  }
 4 
 5  public interface DbAccess { }
 6 
 7  public class ServiceImpl implements Service {
 8    private DbAccess dbAccess = null;
 9    public void setDbaccess(DbAccess dbAccess) { 
10      this.dbAccess = dbAccess; 
11    }
12 
13    @Override
14    public JSonObject query(String mySQLQuery) { /* delegate to dbAccess */ return null; }
15  }

In the example above, a concrete implementation of the DbAccess interface (for instance, a MySQL access class) can be injected, i.e. passed to the implementation of the service (line 9). The database access instance is then used to execute the query and return a JSON object (line 14).
Scala provides the developer with a similar and powerful mechanism to inject dependencies into a concrete class, known as the Cake pattern.
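
Before diving into the full example, here is a minimal Scala sketch of the same Service, with the DbAccess dependency supplied through a self-type instead of a setter. The names Service and DbAccess come from the Java example above; MySqlAccess and its execute method are hypothetical placeholders.

trait DbAccess {
  def execute(sqlQuery: String): String      // hypothetical low-level access
}

trait MySqlAccess extends DbAccess {
  def execute(sqlQuery: String): String = s"result of '$sqlQuery'"
}

trait Service {
  self: DbAccess =>                          // dependency declared, not hard-coded
  def query(sqlQuery: String): String = execute(sqlQuery)
}

// The client code injects the concrete access type at instantiation time.
val service = new Service with MySqlAccess
service.query("SELECT * FROM houses")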

Dependency injection

At its core, dependency injection relies on 3 components:
- the consumer, or list of dependent components
- the provider, which injects the dependencies
- the dependencies themselves

Let's consider the recipe example above. A House requires not only Furniture and Appliances but also a Layout plan with step-by-step instructions.


Each piece of furniture is defined by its name, category and price. More specific categories of furniture such as PatioFurniture and BathroomFurniture can also be created with similar arguments.

case class Furniture(name: String, category: String, price: Double) 
 
class PatioFurniture(
  name: String, 
  price: Double, 
  season: String
) extends Furniture(name, "Patio", price)
 
class BathroomFurniture(
  name: String, 
  price: Double, 
  floor: Int = 1
) extends Furniture(name, "Bathroom", price)

A house also contains appliances and requires a layout plan to be set up. An appliance has a name, a price and, when needed and available, a warranty. The class Layout is instantiated with a name and an option.

case class Appliance(
  name: String, 
  warranty: Boolean, 
  price: Double)

case class Layout(name: String, option: Int)

The goal is to furnish a house with a combination of appliances and pieces of furniture, following a layout plan. The implementation computes the total cost once a combination of furniture, appliances and layout is selected.

Cake pattern: traits

To this purpose, we create several modules, implemented as traits such as FurnitureModule (line 1), to encapsulate each category of furniture. In our case, FurnitureModule defines the patio furniture, PatioFurniture (lines 6-10). The bathroom furniture, an instance of BathroomFurniture (lines 15-19), is wrapped into its dedicated module, BathroomFurnitureModule (line 14).

 1  trait FurnitureModule {
 2      // abstract attribute
 3    val furnitures: List[Furniture]
 4 
 5    class Furniture(id: String, category: String, price: Double) { def cost: Double = price }
 6    class PatioFurniture(
 7      id: String, 
 8      price: Double, 
 9      season: String
10    ) extends Furniture(id, "Patio", price)
11  }
12 
13 
14  trait BathroomFurnitureModule extends FurnitureModule {
15    class BathroomFurniture(
16      id: String, 
17      price: Double, 
18      floor: Int = 1
19    ) extends Furniture(id, "Bathroom", price)
20  }

Let's dig into the code ... 
The first trait, FurnitureModule, defines a generic Furniture (line 5) and a specialized furniture type, PatioFurniture. Dynamic binding is managed by the module that encapsulates the hierarchy of furniture types. Alternatively, the client code can manage dynamic binding or dependency injection by creating a sub-module, BathroomFurnitureModule (line 14), to handle another type of furniture, BathroomFurniture (lines 15-19). It should be clear by now that these two approaches
  • customizing modules
  • customizing classes or types within each module
can be applied either separately or together.

The same strategy is applied to the Appliance and Layout.

 1  trait ApplianceModule {
 2    val appliances: List[Appliance]
 3 
 4    class Appliance(name: String, warranty: Boolean, price: Double) {
 5 
 6      def this(name: String, price: Double) = this(name, true, price)
 7      def cost: Double = if( warranty ) price * 1.15 else price
 8    }
 9  }
10 
11 
12  trait LayoutModule {
13    val layout: Layout
14 
15    class Layout(name: String, option: Int) {
16      val movingInstructions: List[String] = List.empty
17    }
18  }

A single class, Appliance (line 4), with two constructors (lines 4 & 6), is enough to describe all the types of appliances in the ApplianceModule scope. The same concept applies to the LayoutModule component (lines 12-17).
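
As a quick check of the cost rule, the module can be instantiated anonymously (an illustrative sketch, with values borrowed from the client code below):

val applianceDemo = new ApplianceModule {
  val appliances = List(
    new Appliance("Dishwasher", true, 560.0),  // with warranty: 560.0*1.15 = 644.0
    new Appliance("Microwave Oven", 185.0)     // 2nd constructor, warranty defaults to true: 212.75
  )
}
val totalApplianceCost = applianceDemo.appliances.map(_.cost).sum  // 856.75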

Cake pattern: implementation

The factory class, RoomFurnishing (line 1), relies on a self-type annotation referencing one of the components, LayoutModule, with the other components, ApplianceModule and FurnitureModule, as mixins (line 2). The factory class defines all the methods required to manage any combination of the components and, in particular, to compute the total cost (line 4).

 1  class RoomFurnishing {
 2  self: LayoutModule with FurnitureModule with ApplianceModule => 
 3 
 4    def cost: String = {
 5      val totalCost = furnitures.map(_.cost).sum + appliances.map(_.cost).sum
 6      s"RoomFurnishing: $totalCost"
 7    }
 8 
 9    def movingDate: String = "October, 2013"
10  }

Here is an example of how the client code can dynamically assemble all the components and compute the total cost.

 1  val houseFurnishing =
 2   if( country )  // 'country' is a Boolean flag assumed to be set by the client code
 3     new RoomFurnishing 
 4       with FurnitureModule 
 5         with ApplianceModule 
 6          with LayoutModule {
 7 
 8      val layout = new Layout("Country Home", 2)
 9 
10      val furnitures = List[Furniture](
11        new Furniture("Modern Chair", "Chair", 136.0), 
12        new PatioFurniture("Bench", 350.0, "Summer")
13      )
14 
15      val appliances = List[Appliance](
16        new Appliance("Microwave Oven", false, 185.0), 
17        new Appliance("Dishwasher", true, 560.0)
18      )
19    }
20 
21    else 
22      new RoomFurnishing 
23        with BathroomFurnitureModule 
24          with ApplianceModule 
25            with LayoutModule {
26 
27        val layout = new Layout("Apartment", 4)
28        val furnitures = List[BathroomFurniture](
29          new BathroomFurniture("Stool", 89.5), 
30          new BathroomFurniture("Cabinet", 255.6, 2)
31        )
32 
33        val appliances = List[Appliance](
34          new Appliance("Microwave Oven", false, 185.0), 
35          new Appliance("Dishwasher", true, 560.0)
36        )
37    }
38  Console.println(houseFurnishing.cost)

The dynamic composition of furniture and appliances, given a layout, to furnish a room (an instance of RoomFurnishing) is made clear in the code snippet. If the goal is to furnish a country house (line 2), the interior decorator selects a modern chair (line 11), a bench (line 12), a microwave oven (line 16) and a dishwasher (line 17), following the Country Home layout (line 8). The same principle applies to the selection of appliances and furniture to furnish an apartment (lines 22-36).

This technique combines class composition, inheritance, self-reference and abstract variables to provide a simple and flexible framework. The post also introduces the concept of a layout or assembler that hides some of the complexity and becomes the primary mixin. I may elaborate on this concept in a future post. I strongly recommend the article written by Jonas Bonér on the Cake pattern (listed in the references) to get an in-depth understanding of both the motivation for and the variants of this pattern.

References