Showing posts with label statistics. Show all posts
Showing posts with label statistics. Show all posts

Sunday, February 23, 2020

Implement Kernel Density Estimator in Spark

Target audience: Intermediate
Estimated reading time: 4'

This article introduces a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset: Multivariate Kernel Density Estimation (KDE), also known as the Parzen’s windowAt its core the KDE is a smooth approximation of an histogram [ref 1] 


Table of contents

Follow me on LinkedIn
Notes
  • This article requires a basic knowledge in Apache spark MLlib framework and understanding of statistics and/or machine learning.
  • This implementation relies on Spark 2.3.1 and Scala 2.12.4

Introduction

KDE is one of the most well-known approaches to estimate the underlying probability density function of a dataset. KDE will learn the shape of the density from the data automatically. This flexibility arising from its non- parametric nature makes KDE a very popular approach for data drawn from a complicated distribution.

The KDE algorithm takes a parameter, bandwidth, that affects how “smooth” the resulting curve is. For a set of observations y, and given a kernel function K and a bandwidth, the estimation of the density function f, can be expressed as.


This post addresses the limitations of the current implementation of KDE in Apache Spark for the multi-variate features.

Spark implementation

Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
The Apache Spark ecosystems includes a machine learning library, MLlib.

The implementation of the kernel density estimation in the current version of Apache Spark MLlib library, 2.3.1 
[ref 2]org.apache.spark.mllib.stats.KernelDensity has two important limitations:
  • It is a univariate estimation
  • The estimation is performed on a sequence of observations, not an RDD or data set, putting computation load on the Spark driver.
An example of application of KDE using Apache Spark MLlib 2.3.1 ... 

val sample = sparkSession.sparkContext.parallelize(data) 
 
val kd = new KernelDensity().setSample(sample).setBandwidth(3.0)
 
val densities = kd.estimate(Array(-2.0, 5.0))

The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver. 

Multivariate KDE

The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allows the developers to apply the estimation to a dataset. This implementation is restricted to the Normal distribution although it can easily be extended to other kernel functions. 
We assume 
  • The reference to the current Spark session is implicit (line 1)
  • The encoding of a row for serialization of the task is provided (line 1)
The method estimate has 3 arguments 
  • TrainingDS training dataset (line 9)
  • Validation validation set (line 10)
  • bandwidth size of the Parzen window
The validation set has to be broadcast to each worker nodes (line 14). This should not be a problem as the size of the validation set is expected of reasonable size. 
The training set is passed to each partitions as iterator through a mapPartitions (line 17). The probability densities and count are computed through a Scala aggregate method with a zero function of type, (Array[Double], Long) (line 23). The sequence operator invokes the multinomial normal distribution (line 29). 
  
The combiner (3rd argument of the aggregate) relies on the BLAS vectorization z = <- a.x+y dxapy (line 38). BLAS library [ref 3has 3 levels (1D, 2D and 3D arrays). Blas library
The vector of densities is scaled with invCount using the decal BLAS level 1 method (line 45).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final class KDE(implicit sparkSession: SparkSession, 
      encoder: Encoder[Row]) {
 
  /**
    * Applied the trained KDE to a set of validation data
    * @param trainingDS  Training data sets
    * @param validationRdd Validation data sets
    * @return Datasets of probability densities
    */
  def estimate(
    trainingDS: Dataset[Obs], 
    validationDS: Dataset[Obs], 
    bandwidth: Double = 1.0): Dataset[Double] = {
    import math._, sparkSession.implicits._
    val validation_brdcast = sparkSession.sparkContext
            .broadcast[Array[Obs]](validationDS.collect)

    trainingDS.mapPartitions((iter: Iterator[Obs]) => {
       val seqObs = iter.toArray
       val scale = 0.5 * seqObs.size* log(2 * Pi)
       val validation = validation_brdcast.value

       val (densities, count) = seqObs.aggregate(
         (new Array[Double](validation.length), 0L) ) (
           {        // seqOp (U, T) => U
            
             case ((x, z), y) => {
                var i = 0
                while (i < validation.length) {   
                 // Call the pdf function for the normal distribution
                    x(i) += multiNorm(y, bandwidth, scale, validation(i))
                    i += 1
                }
                (x, z + 1)  // Update  count & validation values
             }
          },
          {         // combOp: (U, U) => U
             case ((u, z), (v, t)) => { 
                // Combiner calls vectorization z <- a.x + y
                blas.daxpy(validation.length, 1.0, v, 1, u, 1)
                (u, z + t)
             }
          }
      )

      val invCount: Double = 1.0 / count
      blas.dscal(validation.length, invCount, densities, 1)  
          // Rescale the density using LINPACK z <- a.x
      densities.iterator
    })
  }
}

The companion singleton is used to define the multinomial normal distribution (line 5). The type of observations (feature) is Array[Double].

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
final object KDE {
    import math._
    type Obs = Array[Double]

    @throws(classOf[IllegalArgumentException])
    def multiNorm(
         means: Obs, 
         bandWidth: Double, 
         scale: Double, 
         x: Obs): Double = {
      require(x.length == means.length, 
           "Dimension of means and observations differs")

       exp(
          -scale - (0 until means.length).map( 
             n => {
                val sx = (means(n) - x(n)) / bandWidth
                -0.5 * sx * sx
             }
       ).sum
    )
  }
}

Application

This simple application requires that the spark context (SparkSession) to be defined as well as an explicit encoding of Row using Kryo serializer. The implicit conversion are made available by importing sparkSession.implicits.
The training set is a sequence of key-value pairs (lines 3-14). The validation set is synthetically generated by multiplying the data in the training value with 2.0 (line 17).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
implicit val sparkSession: SparkSession =    
       confToSessionFromFile(LocalSparkConf)
implicit val encoder = Encoders.kryo[Row]
import sparkSession.implicits._

val trainingData = Seq[(String, Array[Double])](
     ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
     ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
     ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
     ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
     ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
  ).toDS

  val validationData = trainingData
      .map { case (key, values) => values.map(_ *2.0) }

  val kde = new KDE
  val result = kde.estimate(trainingData.map(_._2),validationData)

  println(s"result: ${result.collect.mkString(", ")}")

  sparkSession.close


  val data = Seq[Double](1.0, 5.6)
  val sample = sparkSession.sparkContext.parallelize(data)
  val kd = new KernelDensity().setSample(sample) .setBandwidth(3.0)
  val densities = kd.estimate(Array(-2.0, 5.0))


Note: There are excellent research papers highlighting the statistical foundation behind KDE as well as recent advances [ref 4].

Thank you for reading this article. For more information ...

References

[3BLAS
Environment Scala: 2.12.4,  Java JDK 1.8, Apache Spark 2.3.1, OpenBLAS 0.3.4


---------------------------

Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Monday, January 16, 2017

Histogram-Based Approximation Using Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

This post illustrates the histogram-based function approximation for very large data sets using Apache Spark and Scala.

Follow me on LinkedIn

Note: Implementation relies on Scala 2.11.8, Spark 2.0

Overview

The previous post introduces and describes a model to approximate or fit a function to a given data set, using an histogram implemented in Scala. (see Histogram-based approximation using Scala).
This post re-implements the dynamic resizable histogram using Apache Spark with limited changes in the original code base.


Histogram

The Apache Spark version of histogram-based approximation reuses the classes Bin and Histogram described in the previous post. We made some minor changes to the constructors of the Histogram. As a reminder the Histogram class was defined as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
final class Histogram(var step: Double, 
     private var values: mutable.ArrayBuffer[Bin]) {
  var min = values.head._x - 0.5*step
  var max = values.last._x + 0.5*step

  private val maxNumBins = (values.length+1)<<RADIX

  def this(first: Double, step: Double, initNumBins: Int) = 
      this(step, Range(0, initNumBins-1)
      .scanLeft(first)((s, n) => s + step)
      ./:(new mutable.ArrayBuffer[Bin])(
              (vals, x) => vals += (new Bin(x)) )
       )

  def this(params: HistogramParams) = 
      this(params.first, params.step, params.initNumBins)
  
  def train(x: Double, y: Double): Unit
  final def predict(x: Double): Double
   // ….
}

The class Histogram has three constructors, for convenience's sake: 
  • Instantiation of an histogram using an estimate of the _x value of the first bin, the width of the bins, step and the initial number of bins, initNumBins. This constructor is invoked for the first batch of data .
  • Instantiation of an histogram using an initial given step and an existing array of bins/buckets of width step. This constructor is invoked for subsequent batch of data (line 8-12).
  • Instantiation of an histogram using its parameters params (line 15).
The parameters for the histogram are encapsulated in the following HistogramParams class described in a previous post.

I added an implementation of the concatenation of array bins which is required by the reducer implemented by a fold action on Spark data nodes (line 6)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
object Histogram {
  final val EPS = 1e-12
  final val RADIX = 8
  final def sqr(x: Double): Double = x*x

  def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin]={
    val histBins = new mutable.ArrayBuffer[Bin]
    itBins1.scanLeft(histBins)((newBins, bin) => newBins +=bin)
    itBins2.scanLeft(histBins)((newBins, bin) => newBins +=bin)
    histBins.toArray
  }
}


Test data generation

For testing purpose, we need to generate a data set of very large (or even infinite) size. For this purpose we load an existing data set from a file which is duplicated then modified with a random uniform noise to create a large number of batch to be recursively processed by Spark data nodes. 

The amplitude of the randomization (synthetic noise) is significant enough to generate different data point while preserving the original data pattern so the algorithm can be easily validated.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final class DataGenerator(sourceName: String, nTasks: Int) {
  private final val DELIM = " "
  private final val RATIO = 0.05
  var datasetSize: Int = _

    // Generate the RDD from an existing data set template
    //  and a noise function
  def apply(sc: SparkContext): RDD[(Float, Float)] = {
    val r = new Random(System.currentTimeMillis + 
           Random.nextLong)
    val src = Source.fromFile(sourceName)
    val input = src.getLines.map(_.split(DELIM))
      ./:(new mutable.ArrayBuffer[(Float, Float)])
        ((buf, xy) => {
         val x = addNoise(xy(0).trim.toFloat, r)
         val y = addNoise(xy(1).trim.toFloat, r)
         buf += ((x, y))
    })

    datasetSize = input.size
    val data_rdd = sc.makeRDD(input, nTasks)
    src.close
    data_rdd
  }

  private def addNoise(value: Float, r: Random): Float = 
      value*(1.0 + RATIO*(r.nextDouble - 0.5)).toFloat
}

The data is generated for a specific number of tasks nTasks and loading the data from a local file sourceName (line 1). The generation of the histogram is implemented by the method apply. Once loaded from file (line 12), the input is generated locally as an array of (Float, Float) (line 12 -13).
Next, the algorithm adds randomization addNoise (line 15, 16) before converting into a RDD (line 21).


Apache Spark client

The first step is to create a Spark configuration and context for a given number of concurrent tasks.

1
2
3
4
5
6
val numTasks: Int = 12

val conf = new SparkConf().setAppName("Simple Application")
   .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

The main goal of the Spark driver is to apply a tail recursive function across all the batches of data. The histogram is computed/updated on the Spark data node, then reduced to a single histogram which is then broadcasted to all the nodes to be updated with the next batch of data.


The RDDs are generated by an instance of the DataGenerator class as input (line 1). The input is loaded recursively into multiple partitions using the Spark method mapPartitionsWithIndex (line 11). Each partition generates a new array of bins (line30) once the class Histogram is instantiated (line 17) then trained (line 24). The data is then grouped by partition index (line 28). finally the fold method is invoked to generate the array of bins (line 30).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
val input = new DataGenerator("small.data", numTasks)

@scala.annotation.tailrec
def execute(iter: Int, 
    initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin]={
  val initParam = initParam_brdcast.value

  if( iter <= 0)
    initParam.values
  else {
    val hist_rdd = input(sc).mapPartitionsWithIndex(
     (idx: Int,  it: Iterator[(Float, Float)]) => {

      // The first invocation used the broadcasted parameters for
      //  the histogram and the subsequent invocation update 
      // the distribution of histogram bins
      val histogram =
        if (initParam.values.length == 0) 
           new Histogram(initParam)
        else 
           new Histogram(step, initParam.values)

      // Train the histogram with a new batch of data
     it.foreach { case (x, y) => histogram.train(x, y) }
         
     histogram.getValues.map((idx, _)).iterator
     })
     .groupBy(_._1)
     .map { case (n, itBins) => itBins.map(_._2).toArray }
     .fold(Array[Bin]())(Histogram +)

    execute(iter-1, sc.broadcast(
     HistogramParams(0.0, initParam.step, 0, new ArrayBuffer[Bin]++hist_rdd)
    )
  }
}

Note:
The invocation of the recursive method execute on the Apache Spark driver (line 4) broadcasts the current (recently updated) parameters of the histogram initParms_brdcast as one of the arguments of the recursive call (line 5). Upon receiving the broadcast values, the partitions load the next batch of data, generates and processes the RDD input.apply(sc) and update the histogram parameters (line 11).

Evaluation

Let's write a test driver to generate the Histogram using Spark RDD. After their initialization (line 1-4), the parameters of the histogram are simply broadcasted to the remote data/worker nodes (line 7,8).

1
2
3
4
5
6
7
8
9
val initNumBins = 2048
val range = (-1.0, 2.0)
var step = (range._2 - range._1)/initNumBins
val maxRecursions = 4
   
val hist = execute(maxRecursions, 
    sc.broadcast(
         HistogramParams(range._1 + 0.5 * step, step, initNumBins))
  )


References