Sunday, February 26, 2017

Implement Function Vectors in Scala

Target audience: Intermediate
Estimated reading time: 4'

This article describes the definition and an implementation of a function vector.

Note: Implementation relies on Scala 2.11.8

Overview

Spaces of functions are commonly used in machine learning (kernel functions) and differential geometry (tensors and manifolds). The most common technique to create a function space is to define the space as a category and assign monadic operations to it.
This post discusses an alternative, less elaborate approach that defines a function space as a container. The key operation on functions is composition, as defined in the Function1[-T, +R] trait:
    def andThen[A](g: (R) => A): (T) => A
    def compose[A](g: (A) => T): (A) => R
Let's use andThen as the basic building block of our function space.



... andThen

The andThen method is available on some Scala standard library collections, such as List and HashMap, which extend PartialFunction (a list is a partial function from index to element).

val xsInt = List[Int](4, 0, 9, 56, 11)

// A List[Int] is a PartialFunction[Int, Int] (index => element),
// so andThen composes it with the squaring function
val sq = xsInt.andThen((n: Int) => n*n)
xsInt.indices.map(sq).mkString(",")
   // 16,0,81,3136,121

As mentioned in the overview, Function1 provides two composition methods. compose implements the classical composition, where the o operator is defined as
     f o g: (t: T) => f(g(t))
while andThen applies the functions in the reverse order: f andThen g is equivalent to g o f, i.e. (t: T) => g(f(t)). The following snippet uses the andThen method:

val fg = (Math.sin _) andThen ((x: Double) => x*x)
 
Console.println(s"fg(5.0): ${fg(5.0)}")   // sin(5.0)^2 ~ 0.9195


Function vector space

A function space is defined as a space of function vectors, similar to a Euclidean space in which a variable is defined as a vector or array of real values. A function vector is commonly used to convert a vector or tensor from one space to another, non-Euclidean space by transforming the original coordinates (x,y,z) into coordinates (f(x,y,z), ...) in a different space.
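
As an illustration, here is a hedged sketch of such a coordinate transformation; the Point type and the spherical-coordinate functions are hypothetical examples, not part of the implementation that follows.

// Hypothetical example: a vector of functions R^3 => R that maps
// Cartesian coordinates (x, y, z) to spherical coordinates
type Point = (Double, Double, Double)

val coordFuncs: List[Point => Double] = List(
  { case (x, y, z) => Math.sqrt(x*x + y*y + z*z) },          // radius
  { case (x, y, z) => Math.atan2(y, x) },                    // azimuth
  { case (x, y, z) => Math.atan2(Math.sqrt(x*x + y*y), z) }  // polar angle
)

// Apply each coordinate function to the same point
def transform(p: Point): List[Double] = coordFuncs.map(_(p))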

Let's implement the function vector as the class FunctionVector (line 1), with its two most important methods: composition (line 2) and the dot product, or tensor product (line 3).

1 class FunctionVector[T](fVec: List[T => T]) {
2    def andThen(op: T => T): T => T
3    def dot(opVec: List[T => T])(implicit num: Numeric[T]): T => T
4    def map(op: T => T): List[T => T]
5    ...
6 }

The dot product is defined as the product of two function vectors. For example, in a function vector space of dimension 3 with v(f,g,h) and v'(f',g',h'):
    dot: (x,y,z) -> f o f'(x,y,z) + g o g'(x,y,z) + h o h'(x,y,z)
The method andThen composes this function vector with a function op and generates a 'cumulative' composed function that applies f, then g, then h and finally op:
    op o h o g o f: (t: T) => op(h(g(f(t))))
The iteration along the function vector is implemented as a tail recursion. The method relies on the Function1.andThen method applied to the head of the list (line 6).

1 def andThen(op: T => T): T => T = {
2 
3    @scala.annotation.tailrec
4    def andThen(xsf: List[T => T])(op: T => T): T => T =
5       if (xsf.isEmpty) op
6       else andThen(xsf.drop(1))(xsf.head andThen op)
7 
8    andThen(fVec.reverse)(op)
9 }

The dot method takes another function vector of the same size as argument. The two function vectors are zipped to generate a List[T=>T] of composed function elements using a map. Finally, the resulting dot function is computed by generating a new function that sums the elements of the composed function vector. The summation requires an instance of Numeric, declared as an implicit argument.

def dot(opVec: List[T => T])(implicit num: Numeric[T]): T => T = {
 
    val composed = fVec.zip(opVec).map(fg => fg._1 andThen fg._2)
    (t: T) => composed.map(_(t)).sum
}

Contrary to the andThen method, the map method converts a function vector into another function vector by applying a natural transformation.

def map(op: T => T): List[T => T] = fVec.map(_ andThen op)
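
For instance, a small usage sketch (the fv instance and the doubling function are hypothetical examples):

// Compose every function in the vector with a common post-processing function
val fv = new FunctionVector[Double](List(Math.sin _, Math.cos _))
val scaled: List[Double => Double] = fv.map((x: Double) => 2.0*x)
// scaled(0): x => 2*sin(x), scaled(1): x => 2*cos(x)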
 

Finally, the definition of the dot product can be extended to any aggregation method aggr:

 1 def dot(
 2   opVec: List[T => T],
 3   aggr: (T => T, T => T) => (T => T)): T => T = {
 4 
 5     val composed = fVec.zip(opVec).map(
 6         fg => fg._1 andThen fg._2
 7     )
 8 
 9     (t: T) => composed./:((x: T) => x)(
10         (h, f) => aggr(h, f)
11     )(t)
12 }

The aggregation method passed as argument to the dot method (line 3) is a simplified version of the aggregation defined in the Scala standard library. It is used to fold the composed function vector into a single function (line 10).
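
As a usage sketch, here is a hypothetical summation aggregate (sumAggr is not part of the original code). Note that the fold above is seeded with the identity function, so a pure summation aggregate adds an extra t term to the result; seeding the fold with a constant-zero function would reproduce the Numeric-based dot exactly.

// Hypothetical aggregate: point-wise summation of two functions
val sumAggr: (Double => Double, Double => Double) => (Double => Double) =
  (f, g) => (t: Double) => f(t) + g(t)

// vec and opVec are defined in the evaluation section below
val dotted: Double => Double = vec.dot(opVec, sumAggr)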

Let's apply our newfound knowledge of FunctionVector:

 1 val functionsList = List[Function1[Double, Double]](
 2    Math.sin, Math.sqrt
 3 )
 4 
 5 val vec = new FunctionVector[Double](functionsList)
 6 val output1 = vec.andThen((x: Double) => x*x)
 7 val opVec = List[Double => Double](
 8    (x: Double) => x + 1.0,
 9    (x: Double) => Math.exp(-x)
10 )
11 val output2 = vec.dot(opVec)

To evaluate our FunctionVector class, we use a list of functions of type Double => Double (line 1): a sinusoidal and a square root function (line 2).
Once the function vector is created (line 5), it is composed with a single function through andThen (line 6) and with another function vector, opVec, through dot (lines 7-11).
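
As a quick sanity check, the values below follow from the definitions above (rounded to four digits):

// output1 applies sin, then sqrt, then the square; for sin(x) >= 0
// the last two steps cancel out, so output1(x) == sin(x)
println(output1(2.0))   // ~0.9093 = sin(2.0)

// output2(t) = (sin(t) + 1.0) + exp(-sqrt(t))
println(output2(2.0))   // ~2.1524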



Monday, January 16, 2017

Histogram-Based Approximation Using Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

This post illustrates the histogram-based function approximation for very large data sets using Apache Spark and Scala.


Note: Implementation relies on Scala 2.11.8, Spark 2.0

Overview

A previous post introduced and described a model to approximate or fit a function to a given data set, using a histogram implemented in Scala (see Histogram-based approximation using Scala).
This post re-implements the dynamic, resizable histogram using Apache Spark, with limited changes to the original code base.


Histogram

The Apache Spark version of the histogram-based approximation reuses the classes Bin and Histogram described in the previous post. We made some minor changes to the constructors of Histogram. As a reminder, the Histogram class is defined as follows:

 1 final class Histogram(var step: Double,
 2      private var values: mutable.ArrayBuffer[Bin]) {
 3   var min = values.head._x - 0.5*step
 4   var max = values.last._x + 0.5*step
 5 
 6   private val maxNumBins = (values.length + 1) << RADIX
 7 
 8   def this(first: Double, step: Double, initNumBins: Int) =
 9       this(step, Range(0, initNumBins-1)
10          .scanLeft(first)((s, n) => s + step)
11          ./:(new mutable.ArrayBuffer[Bin])(
12              (vals, x) => vals += new Bin(x))
13       )
14 
15   def this(params: HistogramParams) =
16       this(params.first, params.step, params.initNumBins)
17 
18   def train(x: Double, y: Double): Unit
19   final def predict(x: Double): Double
20   // ...
21 }

The class Histogram has three constructors, for convenience's sake: 
  • Instantiation of a histogram using an estimate of the _x value of the first bin, the width of the bins, step, and the initial number of bins, initNumBins. This constructor is invoked for the first batch of data (lines 8-13).
  • Instantiation of a histogram using a given step and an existing array of bins/buckets of width step. This primary constructor is invoked for the subsequent batches of data (lines 1-2).
  • Instantiation of a histogram from its parameters params (lines 15-16).
The parameters of the histogram are encapsulated in the HistogramParams class described in a previous post.
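
The class itself is not reproduced here; the snippet below is a minimal sketch inferred from its usage in this post (the field names and the default value are assumptions):

import scala.collection.mutable

// Minimal sketch of the histogram parameters, inferred from their
// usage below; the actual class is defined in the previous post
case class HistogramParams(
    first: Double,       // estimate of the _x value of the first bin
    step: Double,        // width of the bins
    initNumBins: Int,    // initial number of bins
    values: mutable.ArrayBuffer[Bin] = new mutable.ArrayBuffer[Bin]
)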

I added an implementation of the concatenation of bin arrays, which is required by the reducer, implemented as a fold action on the Spark data nodes (line 6):

 1 object Histogram {
 2   final val EPS = 1e-12
 3   final val RADIX = 8
 4   final def sqr(x: Double): Double = x*x
 5 
 6   def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin] = {
 7     val histBins = new mutable.ArrayBuffer[Bin]
 8     histBins ++= itBins1
 9     histBins ++= itBins2
10     histBins.toArray
11   }
12 }


Test data generation

For testing purposes, we need to generate a data set of very large (or even infinite) size. To this end, we load an existing data set from a file, duplicate it, and modify it with random uniform noise to create a large number of batches to be recursively processed by the Spark data nodes.

The amplitude of the randomization (synthetic noise) is significant enough to generate distinct data points while preserving the original data pattern, so that the algorithm can be easily validated.

 1 final class DataGenerator(sourceName: String, nTasks: Int) {
 2   private final val DELIM = " "
 3   private final val RATIO = 0.05
 4   var datasetSize: Int = _
 5 
 6     // Generate the RDD from an existing data set template
 7     // and a noise function
 8   def apply(sc: SparkContext): RDD[(Float, Float)] = {
 9     val r = new Random(System.currentTimeMillis +
10            Random.nextLong)
11     val src = Source.fromFile(sourceName)
12     val input = src.getLines.map(_.split(DELIM))
13       ./:(new mutable.ArrayBuffer[(Float, Float)])
14         ((buf, xy) => {
15          val x = addNoise(xy(0).trim.toFloat, r)
16          val y = addNoise(xy(1).trim.toFloat, r)
17          buf += ((x, y))
18      })
19 
20     datasetSize = input.size
21     val data_rdd = sc.makeRDD(input, nTasks)
22     src.close
23     data_rdd
24   }
25 
26   private def addNoise(value: Float, r: Random): Float =
27       value*(1.0 + RATIO*(r.nextDouble - 0.5)).toFloat
28 }

The data is generated for a specific number of tasks nTasks by loading the data from a local file sourceName (line 1). The generation of the data set is implemented by the method apply. Once loaded from the file (lines 11-12), the input is built locally as an array of (Float, Float) pairs (lines 12-18).
Next, the algorithm adds the randomization addNoise (lines 15-16) before converting the array into an RDD (line 21).


Apache Spark client

The first step is to create a Spark configuration and context for a given number of concurrent tasks.

val numTasks: Int = 12

val conf = new SparkConf().setAppName("Simple Application")
   .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

The main goal of the Spark driver is to apply a tail-recursive function across all the batches of data. The histogram is computed/updated on the Spark data nodes, then reduced to a single histogram which is broadcast to all the nodes to be updated with the next batch of data.


The RDDs are generated by an instance of the DataGenerator class, input (line 1). The input is loaded recursively into multiple partitions using the Spark method mapPartitionsWithIndex (line 11). Each partition generates a new array of bins (line 26) once the class Histogram is instantiated (line 17) and then trained (line 24). The data is then grouped by partition index (line 28). Finally, the fold method is invoked to generate the array of bins (line 30).

 1 val input = new DataGenerator("small.data", numTasks)
 2 
 3 @scala.annotation.tailrec
 4 def execute(iter: Int,
 5     initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin] = {
 6   val initParam = initParam_brdcast.value
 7 
 8   if (iter <= 0)
 9     initParam.values
10   else {
11     val hist_rdd = input(sc).mapPartitionsWithIndex(
12      (idx: Int, it: Iterator[(Float, Float)]) => {
13 
14       // The first invocation uses the broadcast parameters of the
15       // histogram; subsequent invocations update the distribution
16       // of the histogram bins
17       val histogram =
18         if (initParam.values.isEmpty)
19            new Histogram(initParam)
20         else
21            new Histogram(initParam.step, initParam.values)
22 
23       // Train the histogram with a new batch of data
24       it.foreach { case (x, y) => histogram.train(x, y) }
25 
26       histogram.getValues.map((idx, _)).iterator
27      })
28      .groupBy(_._1)
29      .map { case (n, itBins) => itBins.map(_._2).toArray }
30      .fold(Array[Bin]())(Histogram.+)
31 
32     execute(iter-1, sc.broadcast(
33      HistogramParams(0.0, initParam.step, 0, new ArrayBuffer[Bin] ++ hist_rdd)
34     ))
35   }
36 }

Note:
The invocation of the recursive method execute on the Apache Spark driver (line 4) broadcasts the current (recently updated) parameters of the histogram, initParam_brdcast, as one of the arguments of the recursive call (line 5). Upon receiving the broadcast values, the partitions load the next batch of data, generate and process the RDD input.apply(sc), and update the histogram parameters (line 11).

Evaluation

Let's write a test driver to generate the histogram using Spark RDDs. After their initialization (lines 1-4), the parameters of the histogram are simply broadcast to the remote data/worker nodes (lines 7-8).

1 val initNumBins = 2048
2 val range = (-1.0, 2.0)
3 val step = (range._2 - range._1)/initNumBins
4 val maxRecursions = 4
5 
6 val hist = execute(maxRecursions,
7     sc.broadcast(
8        HistogramParams(range._1 + 0.5*step, step, initNumBins))
9 )
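
Once execute returns, the resulting bins can be wrapped back into a histogram for prediction. Here is a hedged sketch, assuming the constructors and the predict method shown earlier:

// Rebuild a histogram from the trained bins (hist: ArrayBuffer[Bin])
val trainedHist = new Histogram(step, hist)
val estimate = trainedHist.predict(0.5)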



Friday, November 18, 2016

Recursive Mean & Standard Deviation in Scala

Target audience: Intermediate
Estimated reading time: 3'

Have you considered accelerating your statistical computations? This post introduces tail recursion as a method to improve the efficiency of computing the mean and standard deviation.


Note: Scala version 2.11.8

Overview

The computation of the mean and standard deviation of a very large data set may cause an overflow in the summation of values. Scala tail recursion is a very good alternative for computing the mean and standard deviation of data sets of unlimited size.

Direct computation

There are many ways to compute the standard deviation through summation. The first mathematical expression consists of summing the squared differences between each data point and the mean.
\[\sigma =\sqrt{\frac{\sum_{i=0}^{n-1}(x_{i}-\mu )^{2}}{n}}\]
The second formula allows the mean and standard deviation to be updated with each new data point (online computation).
\[\sigma =\sqrt{\frac{1}{n}\sum_{i=0}^{n-1}x_{i}^{2}-\mu ^{2}}\]
This second approach relies on the computation of the sum of squared values, which can overflow.
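
A minimal sketch illustrating the overflow risk (the magnitudes are contrived for the example):

// Squaring values above ~1.34e154 exceeds Double.MaxValue (~1.8e308):
// the sum of squares saturates to Infinity and the result is meaningless
val big = Array.fill(1000)(1e160)
val sumOfSquares = big.map(v => v*v).sum
println(sumOfSquares)   // Infinity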


1 val x = Array[Double]( /* ... */ )
2 val mean = x.sum/x.length
3 val stdDev = Math.sqrt(x.map(_ - mean)
4     .map(t => t*t).sum/x.length)

A reduceLeft can be used as an alternative to map { ... }.sum in the computation of the standard deviation (line 3).
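
A sketch of that variant:

// Same computation, accumulating the squared differences with reduceLeft
val stdDev2 = Math.sqrt(
    x.map(t => (t - mean)*(t - mean)).reduceLeft(_ + _)/x.length
)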

Recursive computation

There is actually no need to compute the sum and the sum of squared values to obtain the mean and standard deviation: the mean and standard deviation for n observations can be computed from the mean and standard deviation of n-1 observations.
The recursive formula for the mean is
\[\mu _{n}=\left ( 1-\frac{1}{n} \right )\mu _{n-1}+\frac{x_{n}}{n}\] The recursive formula for the standard deviation is
\[\varrho _{n}=\varrho _{n-1}+(x_{n}-\mu _{n})(x_{n}-\mu _{n-1}) \ \ \ \ \sigma _{n} = \sqrt{\frac{\varrho _{n}}{n}}\] 
Let's implement these two recursive formulas in Scala using tail recursion (line 4).

 1 def meanStd(x: Array[Double]): (Double, Double) = {
 2 
 3   @scala.annotation.tailrec
 4   def meanStd(
 5       x: Array[Double],
 6       mu: Double,
 7       Q: Double,
 8       count: Int): (Double, Double) =
 9     if (count >= x.length) (mu, Math.sqrt(Q/x.length))
10     else {
11       val newCount = count + 1
12       val newMu = x(count)/newCount + mu*(1.0 - 1.0/newCount)
13       val newQ = Q + (x(count) - mu)*(x(count) - newMu)
14       meanStd(x, newMu, newQ, newCount)
15     }
16 
17   meanStd(x, 0.0, 0.0, 0)
18 }

This implementation updates the mean and the standard deviation simultaneously for each new data point. The recursion exits when all elements have been accounted for (line 9).
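
A quick usage example (the sample values are arbitrary):

val samples = Array(1.5, 2.5, 3.5, 4.5)
val (mu, sigma) = meanStd(samples)
println(s"mean: $mu stdDev: $sigma")   // mean: 3.0 stdDev: ~1.118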
