Monday, January 16, 2017

Histogram-Based Approximation Using Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

This post illustrates the histogram-based function approximation for very large data sets using Apache Spark and Scala.

Note: The implementation relies on Scala 2.11.8 and Spark 2.0.

Overview

The previous post introduces and describes a model to approximate or fit a function to a given data set, using a histogram implemented in Scala (see Histogram-Based Approximation in Scala).
This post re-implements the dynamically resizable histogram using Apache Spark, with limited changes to the original code base.


Histogram

The Apache Spark version of histogram-based approximation reuses the classes Bin and Histogram described in the previous post, with some minor changes to the constructors of Histogram. The modified Histogram class is defined as follows:

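import scala.collection.mutable
import Histogram._

final class Histogram(var step: Double, 
     private var values: mutable.ArrayBuffer[Bin]) {
  // Boundaries derived from the centers of the first and last bins
  var min = values.head._x - 0.5*step
  var max = values.last._x + 0.5*step

  // Maximum number of bins before consolidation
  private val maxNumBins = (values.length+1)<<RADIX

  // initNumBins bins of width step, the first one centered on first
  def this(first: Double, step: Double, initNumBins: Int) = 
      this(step, Range(0, initNumBins-1)
      .scanLeft(first)((s, n) => s + step)
      .foldLeft(new mutable.ArrayBuffer[Bin])(
              (vals, x) => vals += (new Bin(x)) )
       )

  // Histogram created from its parameters
  def this(params: HistogramParams) = 
      this(params.first, params.step, params.initNumBins)
  
  def train(x: Double, y: Double): Unit = ???    // see the previous post
  final def predict(x: Double): Double = ???     // see the previous post
   // ...
}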

The class Histogram has three constructors, for convenience's sake: 
  • Instantiation of a histogram using an estimate of the _x value of the first bin, first, the width of the bins, step, and the initial number of bins, initNumBins. This constructor is invoked for the first batch of data.
  • Instantiation of a histogram using a given step and an existing array of bins (buckets) of width step. This primary constructor is invoked for the subsequent batches of data.
  • Instantiation of a histogram using its parameters, params, which delegates to the first constructor.
The parameters of the histogram are encapsulated in the HistogramParams class, described in a previous post.
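HistogramParams is not listed in this post. Judging from how the driver code uses it (a three-argument call for the first batch, a four-argument call that carries the reduced bins, and the fields first, step, initNumBins and values), a plausible shape is a case class whose array of bins defaults to empty. The sketch below is an assumption, not the original class; the minimal Bin stand-in is only there so that the snippet compiles on its own.

```scala
import scala.collection.mutable.ArrayBuffer

object HistogramParamsSketch {
  // Minimal stand-in for the Bin class of the previous post
  class Bin(var _x: Double, var _y: Double = 0.0) extends Serializable {
    var count: Int = 0
  }

  // Hypothetical definition inferred from how HistogramParams is used in this post;
  // case classes are Serializable, as required for a Spark broadcast variable
  case class HistogramParams(
      first: Double,       // center of the first bin
      step: Double,        // width of the bins
      initNumBins: Int,    // initial number of bins
      values: ArrayBuffer[Bin] = ArrayBuffer.empty[Bin])  // bins reduced from the previous batch
}
```

With this definition, HistogramParams(first, step, initNumBins) describes a histogram with no bins yet, which is the case the driver detects through initParam.values to select the constructor of Histogram.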

I added a method, Histogram.+, to concatenate two arrays of bins. It is required by the reducer, implemented as a fold action on the Spark data nodes:

object Histogram {
  final val EPS = 1e-12
  final val RADIX = 8
  final def sqr(x: Double): Double = x*x

  // Concatenation of two arrays of bins, used by the fold action
  // that reduces the bins generated by the Spark partitions
  def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin] =
    itBins1 ++ itBins2
}
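Because Histogram.+ only concatenates, the fold action appends the arrays of bins emitted by the partitions, starting from the empty array Array[Bin](). The sketch below mimics that reduction on plain Scala collections rather than an RDD (an assumption so that it runs without Spark), with Bin reduced to its center for brevity.

```scala
object ConcatSketch {
  // Simplified stand-in for Bin: only the center _x matters for concatenation
  final case class Bin(_x: Double)

  // Same contract as Histogram.+ : concatenation of two arrays of bins
  def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin] = itBins1 ++ itBins2

  // Arrays of bins produced by three hypothetical partitions
  val partitions: Seq[Array[Bin]] = Seq(
    Array(Bin(0.0), Bin(1.0)),
    Array(Bin(2.0)),
    Array.empty[Bin])

  // Local counterpart of the fold action with Array[Bin]() as zero value
  val reduced: Array[Bin] = partitions.fold(Array.empty[Bin])((b1, b2) => this.+(b1, b2))
}
```

One difference worth keeping in mind: the Spark API documentation points out that RDD.fold may combine the partition results in no defined order, so, unlike this local fold, the concatenated bins are not guaranteed to be sorted by _x.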


Test data generation

For testing purposes, we need to generate a data set of very large (or even infinite) size. To this end, we load an existing data set from a file, which is duplicated and then modified with uniform random noise to create a large number of batches to be processed recursively by the Spark data nodes.

The amplitude of the randomization (synthetic noise) is large enough to generate distinct data points while preserving the original data pattern, so the algorithm can be easily validated.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable
import scala.io.Source
import scala.util.Random

final class DataGenerator(sourceName: String, nTasks: Int) {
  private final val DELIM = " "
  private final val RATIO = 0.05
  var datasetSize: Int = _

  // Generate the RDD from an existing data set template
  // and a noise function
  def apply(sc: SparkContext): RDD[(Float, Float)] = {
    val r = new Random(System.currentTimeMillis() + Random.nextLong())
    val src = Source.fromFile(sourceName)
    val input = try {
      // Each line "x y" is parsed and both coordinates are perturbed
      src.getLines().map(_.split(DELIM))
        .foldLeft(new mutable.ArrayBuffer[(Float, Float)])((buf, xy) => {
          val x = addNoise(xy(0).trim.toFloat, r)
          val y = addNoise(xy(1).trim.toFloat, r)
          buf += ((x, y))
        })
    } finally src.close()

    datasetSize = input.size
    // Distribute the noisy data set across nTasks partitions
    sc.makeRDD(input, nTasks)
  }

  // Multiplicative uniform noise of relative amplitude RATIO
  private def addNoise(value: Float, r: Random): Float = 
      value*(1.0 + RATIO*(r.nextDouble() - 0.5)).toFloat
}

The data is generated for a given number of tasks, nTasks, by loading a local file, sourceName. The generation of the RDD is implemented by the method apply: the input is loaded from the file and built locally as an array of (Float, Float) pairs.
Each coordinate is perturbed by the method addNoise as it is read, before the data set is converted into an RDD with nTasks partitions using makeRDD.
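addNoise scales each coordinate by 1 + RATIO*(u - 0.5), where u = r.nextDouble() is uniform in [0, 1), so every value moves by at most RATIO/2 of its magnitude, that is 2.5% for RATIO = 0.05. The standalone sketch below reuses the same formula with a fixed seed (chosen here only for reproducibility) and measures the largest relative deviation over many draws.

```scala
import scala.util.Random

object NoiseSketch {
  final val RATIO = 0.05

  // Same formula as DataGenerator.addNoise: the value is scaled by a factor
  // drawn uniformly in [1 - RATIO/2, 1 + RATIO/2)
  def addNoise(value: Float, r: Random): Float =
    value*(1.0 + RATIO*(r.nextDouble() - 0.5)).toFloat

  // Largest relative deviation |noisy - value|/|value| over n draws (value != 0)
  def maxRelativeDeviation(value: Float, n: Int, seed: Long): Double = {
    val r = new Random(seed)
    (0 until n).map(_ => math.abs(addNoise(value, r).toDouble - value)/math.abs(value)).max
  }
}
```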


Apache Spark client

The first step is to create a Spark configuration and context for a given number of concurrent tasks.

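import org.apache.spark.{SparkConf, SparkContext}

val numTasks: Int = 12

// Local Spark master running numTasks concurrent tasks
val conf = new SparkConf().setAppName("Simple Application")
   .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")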

The main goal of the Spark driver is to apply a tail-recursive function across all the batches of data. The histogram is computed/updated on each Spark data node, then reduced to a single histogram (array of bins), which is broadcast to all the nodes to be updated with the next batch of data.


The RDDs are generated by input, an instance of the DataGenerator class. At each recursion, the next batch of data is loaded into multiple partitions, which are processed with the Spark method mapPartitionsWithIndex. Each partition instantiates a Histogram, trains it with its data points and emits its bins, tagged with the partition index. The bins are then grouped by partition index, and finally the fold action concatenates them into a single array of bins.

import org.apache.spark.broadcast.Broadcast
import scala.collection.mutable.ArrayBuffer

val input = new DataGenerator("small.data", numTasks)

@scala.annotation.tailrec
def execute(iter: Int, 
    initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin] = {
  val initParam = initParam_brdcast.value

  if (iter <= 0)
    initParam.values
  else {
    // Train one histogram per partition, then concatenate all the bins
    val bins = input(sc).mapPartitionsWithIndex(
     (idx: Int, it: Iterator[(Float, Float)]) => {

      // The first invocation uses the broadcast parameters to create
      // the histogram; the subsequent invocations update the
      // distribution of histogram bins reduced from the previous batch
      val histogram =
        if (initParam.values.isEmpty) 
           new Histogram(initParam)
        else 
           new Histogram(initParam.step, initParam.values)

      // Train the histogram with a new batch of data
      it.foreach { case (x, y) => histogram.train(x, y) }

      // Emit the bins of this partition, keyed by the partition index
      histogram.getValues.map((idx, _)).iterator
     })
     .groupBy(_._1)
     .map { case (_, itBins) => itBins.map(_._2).toArray }
     .fold(Array[Bin]())((bins1, bins2) => Histogram.+(bins1, bins2))

    // Broadcast the updated bins for the next batch of data
    execute(iter-1, sc.broadcast(
      HistogramParams(0.0, initParam.step, 0, new ArrayBuffer[Bin] ++ bins)
    ))
  }
}

Note:
Each invocation of the recursive method execute on the Apache Spark driver receives the current (most recently updated) parameters of the histogram, initParam_brdcast, as a broadcast variable. With these parameters, the partitions process the next batch of data, the RDD generated by input.apply(sc), and update the histogram bins, which are broadcast in turn as the argument of the next recursive call.
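To follow the control flow of execute without a cluster, the train / reduce / broadcast cycle can be emulated on a single machine. In the sketch below, which is an illustration rather than the Spark code, a batch is a sequence of partitions, each partition trains a fresh set of bins, and the driver merges the reduced bins into the current state before the next tail-recursive call, the state standing in for the broadcast variable. Two simplifications are assumptions of the sketch: the grid of bins never resizes, and bins are merged by count-weighted averaging instead of the plain concatenation performed by Histogram.+.

```scala
import scala.annotation.tailrec

object DriverSketch {
  // Running mean of y and number of points of a bin
  final case class Bin(mean: Double, count: Int)

  // Hypothetical fixed grid: bin k covers [k*step, (k+1)*step)
  final val step = 1.0
  final val numBins = 4
  val emptyBins: Vector[Bin] = Vector.fill(numBins)(Bin(0.0, 0))

  private def index(x: Double): Int =
    math.min(numBins - 1, math.max(0, (x/step).floor.toInt))

  // Data node: one partition accumulates its points into fresh bins
  def trainPartition(points: Seq[(Double, Double)]): Vector[Bin] =
    points.foldLeft(emptyBins) { case (bins, (x, y)) =>
      val i = index(x)
      val b = bins(i)
      bins.updated(i, Bin((b.mean*b.count + y)/(b.count + 1), b.count + 1))
    }

  // Reducer: count-weighted merge of two arrays of bins, bin by bin
  def merge(b1: Vector[Bin], b2: Vector[Bin]): Vector[Bin] =
    b1.zip(b2).map { case (u, v) =>
      val n = u.count + v.count
      if (n == 0) u else Bin((u.mean*u.count + v.mean*v.count)/n, n)
    }

  // Driver: one tail-recursive call per batch; `state` plays the role of
  // the broadcast parameters passed to the next call
  @tailrec
  def execute(batches: List[Seq[Seq[(Double, Double)]]], state: Vector[Bin]): Vector[Bin] =
    batches match {
      case Nil => state
      case batch :: rest =>
        val reduced = batch.map(trainPartition).fold(emptyBins)(merge)
        execute(rest, merge(state, reduced))
    }
}
```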

Evaluation

Let's write a test driver to generate the histogram using Spark RDDs. Once initialized, the parameters of the histogram are simply broadcast to the remote data/worker nodes through the first call to execute.

val initNumBins = 2048
val range = (-1.0, 2.0)
val step = (range._2 - range._1)/initNumBins
val maxRecursions = 4
   
val hist = execute(maxRecursions, 
    sc.broadcast(
         HistogramParams(range._1 + 0.5 * step, step, initNumBins))
  )
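With these settings, the width of the bins is step = (2.0 - (-1.0))/2048 = 3/2048, about 0.00146, and the first bin is centered half a step above the lower bound of the range, at -0.999267578125. The snippet below simply recomputes these values in plain Scala, together with the center of the last initial bin, which lies half a step below the upper bound 2.0.

```scala
object InitParamsSketch {
  val initNumBins = 2048
  val range = (-1.0, 2.0)
  // Width of the bins
  val step: Double = (range._2 - range._1)/initNumBins
  // Center of the first bin, passed as the first argument of HistogramParams
  val first: Double = range._1 + 0.5*step
  // Center of the last of the initNumBins bins
  val last: Double = first + (initNumBins - 1)*step
}
```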



Thursday, September 10, 2015

Histogram-Based Approximation in Scala

Target audience: Beginner
Estimated reading time: 4'


Introduction to a simple function approximation algorithm using a dynamically resizable histogram in Scala.




Overview

A typical function approximation consists of finding a model that fits a given data set. Let's consider the following data set {x, y} for which a simple model f: y = f(x) has to be approximated.


The black dots represent the data set and the red line the model y = f(x).
There are multiple approaches to fit a model or a function to a given data set, including:

  • Splines
  • Least squares regression
  • Levenberg-Marquardt
  • K-nearest neighbors
  • Histogram
  • Polynomial interpolation
  • Logistic regression
  • Neural Networks
  • Tensor products
  • ... and many more
Interpolation using dynamically resizable histograms is a very simple and effective method to approximate a data set.

Histogram class

A histogram consists of an array or sequence of bins. A bin is defined by three parameters:
  • _x Center of the bin
  • _y Average value for the bin
  • count Number of data points in the bin
The distribution of histogram bins is illustrated in the following chart

Let's look at an implementation of the Bin class. The constructor takes two values:
  • _x midpoint of the bin (or bucket)
  • _y current average value for this bin (0.0 by default)
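// Bin (bucket) of the histogram centered on _x, with the average _y
// of the values y of the data points it contains
class Bin(
   var _x: Double, 
   var _y: Double = 0.0) extends Serializable {
  var count: Int = 0

  // Add a new value y to this bin and update its running average
  def += (y: Double): Unit = {
    val newCount = count + 1
    _y = if (count == 0) y else (_y*count + y)/newCount
    count = newCount
  }
 
  // Merge another bin into this bin (count-weighted average of _y)
  def + (next: Bin): this.type = {
    val newCount = count + next.count
    if (newCount > 0) {
      _y = (_y*count + next._y*next.count)/newCount
      count = newCount
    }
    this
  }

  // Merge an array of bins into this bin
  def + (next: Array[Bin]): this.type = {
    val newCount = next.foldLeft(count)((s, nxt) => s + nxt.count)
    if (newCount > 0) {
      _y = next.foldLeft(_y*count)((s, nxt) => s + nxt._y*nxt.count)/newCount
      count = newCount
    }
    this
  }
}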

The method += (y: Double): Unit adds a new value y to this bin and recomputes the average value _y. The method + (next: Bin): this.type adds the content of another bin, next, to this bin. Finally, the method + (next: Array[Bin]): this.type merges an array of bins into this bin.
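As a quick check of the running average, adding the values 1.0 and 3.0 to an empty bin gives _y = 2.0 with count = 2, and merging it with a bin holding two values of mean 4.0 gives the count-weighted average (2*2.0 + 2*4.0)/4 = 3.0. The sketch below replays these steps on a compact version of the Bin logic, in which + updates count as well as _y; the object BinSketch and its method example are illustrative names only.

```scala
object BinSketch {
  // Compact version of the Bin logic: running average _y of the values in the bin
  class Bin(var _x: Double, var _y: Double = 0.0) extends Serializable {
    var count: Int = 0

    def +=(y: Double): Unit = {
      val newCount = count + 1
      _y = (_y*count + y)/newCount
      count = newCount
    }

    // Count-weighted merge of another bin into this one
    def +(next: Bin): this.type = {
      val newCount = count + next.count
      if (newCount > 0) {
        _y = (_y*count + next._y*next.count)/newCount
        count = newCount
      }
      this
    }
  }

  // Illustrative example: returns (_y, count) of the merged bin
  def example(): (Double, Int) = {
    val b1 = new Bin(0.5)
    b1 += 1.0
    b1 += 3.0          // b1: _y = 2.0, count = 2
    val b2 = new Bin(0.5)
    b2 += 4.0
    b2 += 4.0          // b2: _y = 4.0, count = 2
    val merged = b1 + b2
    (merged._y, merged.count)
  }
}
```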
Next, let's create a class, Histogram, to manage the array of bins. Its constructor has four parameters:
  • maxNumBins maximum number of bins
  • min expected minimum value in the data set
  • max expected maximum value in the data set
  • weights optional smoothing weights

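import scala.collection.mutable.ArrayBuffer

final class Histogram(
    maxNumBins: Long, 
    var min: Double = -1.0, 
    var max: Double = 1.0, 
    weights: Array[Float] = Array.empty[Float]) {
 
  // Initial number of bins: a fraction 1/2^RADIX of maxNumBins
  val initNumBins: Int = (maxNumBins>>RADIX).toInt
  private var step = (max-min)/initNumBins
  
  // Initial bins of width step, centered between min and max
  private[this] var values = Range(0, initNumBins-1)
        .scanLeft(min + 0.5*step)((s, n) => s + step)
        .foldLeft(new ArrayBuffer[Bin])(
          (vals, x) => vals += (new Bin(x)) 
        )
 
  // Update the histogram with a new data point (x, y)
  def train(x: Double, y: Double): Unit = {
    <<(x)                                  // expand the histogram if x is out of bounds
    values(index(x)) += y                  // update the bin containing x
    if (values.size >= maxNumBins) >>      // consolidate when the limit is reached
  }
 
  // Predict y for a new observation x (NaN if x is out of bounds)
  final def predict(x: Double): Double = {
    if (x < min || x > max) Double.NaN 
    else if (weights.length > 0) smoothPredict(x)
    else values(index(x))._y
  }
  // ... ancillary methods
}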
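The constructor starts with a fraction of the maximum number of bins, initNumBins = maxNumBins >> RADIX, that is maxNumBins/2^RADIX. RADIX is not defined in this post; assuming RADIX = 8, as in the Histogram companion object of the Spark version, maxNumBins = 1024, min = -1.0 and max = 1.0 give 4 initial bins of width 0.5, centered on -0.75, -0.25, 0.25 and 0.75. The sketch below recomputes these centers with the same expressions as the constructor.

```scala
object InitBinsSketch {
  final val RADIX = 8   // assumption: same value as in the Spark version of the Histogram object

  // Centers of the initial bins, computed with the same expressions as the constructor
  def initialCenters(maxNumBins: Long, min: Double, max: Double): IndexedSeq[Double] = {
    val initNumBins: Int = (maxNumBins >> RADIX).toInt
    val step = (max - min)/initNumBins
    Range(0, initNumBins - 1).scanLeft(min + 0.5*step)((s, _) => s + step)
  }
}
```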

Implementation

The two main methods are:
  • train, which updates the model (histogram) with each new data point from the training set. The histogram expands when a new data point falls outside its current boundaries (min, max), and its bins are consolidated once their number reaches maxNumBins.
  • predict, which predicts the value y of a new observation x, or returns NaN if x lies outside [min, max]. In the case the user specifies an array of weights in the histogram constructor, the prediction relies on an interpolation (weighted moving average) implemented by smoothPredict.
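The method smoothPredict is not listed in this post. One plausible reading of an interpolation by weighted moving average is to average the _y values of the bins surrounding the bin of x, using the array weights centered on that bin. The sketch below is a hypothetical illustration of that idea, not the author's smoothPredict: ys stands for the _y values of the bins, idx for the bin of x, and neighbors beyond either end of the histogram are skipped while the remaining weights are renormalized.

```scala
object SmoothSketch {
  // Hypothetical weighted moving average of the bin values ys around the bin idx.
  // weights has an odd length; its middle element applies to the bin idx itself.
  // Neighbors outside the histogram are skipped and the remaining weights renormalized.
  def weightedAverage(ys: IndexedSeq[Double], idx: Int, weights: Array[Float]): Double = {
    val half = weights.length/2
    val terms = weights.indices
      .map(k => (idx + k - half, weights(k).toDouble))
      .filter { case (j, _) => j >= 0 && j < ys.length }
    terms.map { case (j, w) => w*ys(j) }.sum/terms.map(_._2).sum
  }
}
```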
The method Histogram.index extracts the index of the bin containing a data point (x, y).

private def index(x: Double): Int = {
    val idx = ((x - min)/step).floor.toInt
    if( idx < 1) 0 
    else if (idx >= values.size) values.size -1 
    else idx
}
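For instance, with min = -1.0, step = 0.5 and four bins, x = 0.2 falls into bin floor((0.2 + 1.0)/0.5) = 2, while values beyond either boundary are clamped to the first or the last bin. The sketch below lifts the method out of the class and passes min, step and the number of bins as parameters, an adaptation made only for illustration.

```scala
object IndexSketch {
  // Same logic as Histogram.index, with the state of the histogram passed explicitly
  def index(x: Double, min: Double, step: Double, numBins: Int): Int = {
    val idx = ((x - min)/step).floor.toInt
    if (idx < 1) 0                            // below the second bin: first bin
    else if (idx >= numBins) numBins - 1      // beyond the last bin: last bin
    else idx
  }
}
```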

In this implementation of the dynamically resizable histogram, the array of bins expands by adding new bins whenever a new data point from the training set has an x value that is either greater than the current maximum value or lower than the current minimum value. The width of the bins, step, does not change; only the number of bins grows, up to the maximum number of bins, maxNumBins. The method Histogram.<< implements the expansion of the histogram for a constant bin width.

// Expand the histogram to cover x, keeping the bin width step constant
private def <<(x: Double): Unit = 
  if (x > max) {
    values.appendAll(generate(max, x))
    max = values.last._x + step*0.5
  }
  else if (x < min) {
    values.prependAll(generate(min, x))
    min = values.head._x - step*0.5
  }
 
// Generate the bins of width step between the boundary limit and x,
// in ascending order of _x so that the array of bins remains sorted
final def generate(limit: Double, x: Double): Array[Bin] = 
  if (x > limit) {
     val nBins = ((x-limit)/step).ceil.toInt
     Array.tabulate(nBins)(n => new Bin(limit + (n + 0.5)*step))
  }
  else {
     val nBins = ((limit-x)/step).ceil.toInt
     Array.tabulate(nBins)(n => new Bin(limit - (nBins - n - 0.5)*step))
  }
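To see how the expansion keeps the bin width constant, take max = 1.0 and step = 0.5: a new data point with x = 2.2 requires ceil((2.2 - 1.0)/0.5) = 3 new bins, centered on 1.25, 1.75 and 2.25, after which max becomes 2.5. The sketch below adapts generate to return only the bin centers, with step as a parameter. On both sides it produces the centers in ascending order, which is the order appendAll and prependAll need to keep the array of bins sorted by _x.

```scala
object GenerateSketch {
  // Centers of the bins of width step between the boundary limit and x,
  // in ascending order (limit is max when x > max, min when x < min)
  def generate(limit: Double, x: Double, step: Double): Array[Double] =
    if (x > limit) {
      val nBins = ((x - limit)/step).ceil.toInt
      Array.tabulate(nBins)(n => limit + (n + 0.5)*step)
    }
    else {
      val nBins = ((limit - x)/step).ceil.toInt
      Array.tabulate(nBins)(n => limit - (nBins - n - 0.5)*step)
    }
}
```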

Once the maximum number of bins, maxNumBins, is reached, the histogram is resized by merging each pair of adjacent bins, which halves the number of bins and doubles the width of the bins, step. The consolidation of the histogram bins is implemented by the method Histogram.>>.

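// Merge each pair of adjacent bins: half as many bins, twice as wide
private def >> : Unit = {
  values = (0 until values.size-1 by 2).foldLeft(new ArrayBuffer[Bin])((ab, n) => {
    val midX = 0.5*(values(n)._x + values(n+1)._x)   // center of the merged bin
    ab += { val merged = values(n) + values(n+1); merged._x = midX; merged }
  })
  step *= 2.0
}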
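Merging adjacent pairs halves the number of bins and doubles their width; each merged bin is centered on the midpoint of its two halves and holds the count-weighted average of their values. For instance, bins centered on 0.25 and 0.75 (step = 0.5) with (_y = 1.0, count = 1) and (_y = 4.0, count = 2) become one bin centered on 0.5 with _y = (1.0 + 8.0)/3 = 3.0 and count = 3. The sketch below models a bin as an immutable (x, y, count) case class, a simplification of the mutable Bin class.

```scala
object ConsolidateSketch {
  // Simplified, immutable bin: center x, average y, number of points
  final case class Bin(x: Double, y: Double, count: Int)

  // Merge each pair of adjacent bins (n, n+1) into one bin centered on their midpoint;
  // returns the merged bins and the new bin width 2*step
  def consolidate(bins: IndexedSeq[Bin], step: Double): (IndexedSeq[Bin], Double) = {
    val merged = (0 until bins.size - 1 by 2).map { n =>
      val (b1, b2) = (bins(n), bins(n + 1))
      val cnt = b1.count + b2.count
      val y = if (cnt > 0) (b1.y*b1.count + b2.y*b2.count)/cnt else 0.0
      Bin(0.5*(b1.x + b2.x), y, cnt)
    }
    (merged, 2.0*step)
  }
}
```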

Testing

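Finally, the method predict is used to compute the accuracy of the model through the method validate. The histogram is tested on a data set loaded from a file: the accuracy is the fraction of data points (x, y) for which the predicted value differs from y by less than eps.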

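import scala.io.Source
import scala.util.Try

val DELIM = " "     // field delimiter, as in DataGenerator

// Accuracy of the histogram: fraction of the data points (x, y)
// of fileName for which |predict(x) - y| < eps
def validate(
    hist: Histogram, 
    fileName: String, 
    eps: Double): Try[Double] = Try {
  val src = Source.fromFile(fileName)
  try {
    val fields = src.getLines().map(_.split(DELIM))

    // (number of accurate predictions, number of data points)
    val counts = fields.foldLeft((0L, 0L))((s, xy) => 
      if (Math.abs(hist.predict(xy(0).trim.toFloat) - xy(1).trim.toFloat) < eps) 
         (s._1 + 1L, s._2 + 1L) 
      else 
         (s._1, s._2 + 1L)
    )
    counts._1.toDouble/counts._2
  }
  finally src.close()
}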
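The accuracy is the fraction of data points whose prediction lies within eps of the observed value. A prediction of NaN, returned for x outside [min, max], never passes the test and therefore counts as a miss. The sketch below applies the same counting to an in-memory sequence of points and an arbitrary predictor function, both assumptions standing in for the data file and the histogram.

```scala
object AccuracySketch {
  // Same counting as validate; predict is any predictor (a stand-in for Histogram.predict)
  def accuracy(points: Seq[(Double, Double)], predict: Double => Double, eps: Double): Double = {
    val (hits, total) = points.foldLeft((0L, 0L)) { case ((h, t), (x, y)) =>
      if (math.abs(predict(x) - y) < eps) (h + 1L, t + 1L) else (h, t + 1L)
    }
    hits.toDouble/total
  }
}
```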

