
Monday, January 16, 2017

Histogram-Based Approximation Using Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

This post illustrates the histogram-based function approximation for very large data sets using Apache Spark and Scala.


Note: Implementation relies on Scala 2.11.8, Spark 2.0

Overview

The previous post introduced and described a model that approximates or fits a function to a given data set using a histogram implemented in Scala (see Histogram-based approximation using Scala).
This post re-implements the dynamically resizable histogram using Apache Spark, with limited changes to the original code base.


Histogram

The Apache Spark version of the histogram-based approximation reuses the classes Bin and Histogram described in the previous post. Only minor changes were made to the constructors of the Histogram class. As a reminder, the Histogram class is defined as follows:

 1  final class Histogram(var step: Double, 
 2       private var values: mutable.ArrayBuffer[Bin]) {
 3    var min = values.head._x - 0.5*step
 4    var max = values.last._x + 0.5*step
 5
 6    private val maxNumBins = (values.length+1)<<RADIX
 7
 8    def this(first: Double, step: Double, initNumBins: Int) = 
 9        this(step, Range(0, initNumBins-1)
10        .scanLeft(first)((s, n) => s + step)
11        ./:(new mutable.ArrayBuffer[Bin])(
12                (vals, x) => vals += (new Bin(x)) )
13         )
14
15    def this(params: HistogramParams) = 
16        this(params.first, params.step, params.initNumBins)
17
18    def train(x: Double, y: Double): Unit
19    final def predict(x: Double): Double
20     // ...
21  }

For convenience's sake, the class Histogram has three constructors: 
  • Instantiation of a histogram using an estimate of the _x value of the first bin, the width of the bins step, and the initial number of bins initNumBins. This constructor is invoked for the first batch of data (lines 8-13).
  • Instantiation of a histogram using a given step and an existing array of bins/buckets of width step. This constructor is invoked for the subsequent batches of data (lines 1-2).
  • Instantiation of a histogram using its parameters params (line 15).
The parameters of the histogram are encapsulated in the HistogramParams class described in a previous post.
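
As a reference, a minimal sketch of HistogramParams consistent with the way it is used in this post could look as follows; the default empty buffer of bins is an assumption, not the original definition from the previous post.

import scala.collection.mutable

case class HistogramParams(
  first: Double,        // Estimate of the center _x of the first bin
  step: Double,         // Width of the bins
  initNumBins: Int,     // Initial number of bins
  values: mutable.ArrayBuffer[Bin] = mutable.ArrayBuffer.empty[Bin]  // Current bins, empty for the first batch (assumed)
)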

I added an implementation of the concatenation of arrays of bins, which is required by the reducer, implemented as a fold action on the Spark data nodes (line 6).

 1  object Histogram {
 2    final val EPS = 1e-12
 3    final val RADIX = 8
 4    final def sqr(x: Double): Double = x*x
 5
 6    def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin] = {
 7      val histBins = new mutable.ArrayBuffer[Bin]
 8      histBins ++= itBins1
 9      histBins ++= itBins2
10      histBins.toArray
11    }
12  }
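
The Bin class is also defined in the previous post. A minimal sketch, assuming each bin tracks its center _x and a running average of the y values assigned to it, could be:

class Bin(val _x: Double) {
  private[this] var sumY = 0.0
  private[this] var count = 0L

    // Accumulate a new y observation into this bin (assumed behavior)
  def +=(y: Double): Unit = { sumY += y; count += 1 }

    // Average of the y values collected so far (0.0 for an empty bin)
  def estimate: Double = if (count == 0L) 0.0 else sumY/count
}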


Test data generation

For testing purposes, we need to generate a data set of very large (or even infinite) size. To do so, we load an existing data set from a file, then duplicate it and modify it with random uniform noise to create a large number of batches to be recursively processed by the Spark data nodes.

The amplitude of the randomization (synthetic noise) is significant enough to generate distinct data points while preserving the original data pattern, so the algorithm can be easily validated.

 1  final class DataGenerator(sourceName: String, nTasks: Int) {
 2    private final val DELIM = " "
 3    private final val RATIO = 0.05
 4    var datasetSize: Int = _
 5
 6      // Generate the RDD from an existing data set template
 7      // and a noise function
 8    def apply(sc: SparkContext): RDD[(Float, Float)] = {
 9      val r = new Random(System.currentTimeMillis + 
10             Random.nextLong)
11      val src = Source.fromFile(sourceName)
12      val input = src.getLines.map(_.split(DELIM))
13        ./:(new mutable.ArrayBuffer[(Float, Float)])(
14          (buf, xy) => {
15            val x = addNoise(xy(0).trim.toFloat, r)
16            val y = addNoise(xy(1).trim.toFloat, r)
17            buf += ((x, y))
18        })
19
20      datasetSize = input.size
21      val data_rdd = sc.makeRDD(input, nTasks)
22      src.close
23      data_rdd
24    }
25
26    private def addNoise(value: Float, r: Random): Float = 
27        value*(1.0 + RATIO*(r.nextDouble - 0.5)).toFloat
28  }

The data is generated for a given number of tasks nTasks by loading a data set template from a local file sourceName (line 1). The generation of the data set is implemented by the method apply. Once the file is loaded (line 11), the input is built locally as an array of (Float, Float) pairs (lines 12-18).
Next, the algorithm adds randomization through addNoise (lines 15, 16) before converting the array into an RDD (line 21).
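
As a quick illustration, the generator could be invoked as follows, assuming the Spark context sc and the number of tasks numTasks defined in the next section; small.data is the template file used by the driver later in this post.

val generator = new DataGenerator("small.data", numTasks)
val xyRDD: RDD[(Float, Float)] = generator(sc)     // Noisy copy of the template data set
println(s"Generated ${generator.datasetSize} data points")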


Apache Spark client

The first step is to create a Spark configuration and context for a given number of concurrent tasks.

val numTasks: Int = 12

val conf = new SparkConf().setAppName("Simple Application")
   .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

The main goal of the Spark driver is to apply a tail-recursive function to all the batches of data. The histogram is computed/updated on the Spark data nodes, then reduced to a single histogram, which is then broadcast to all the nodes to be updated with the next batch of data.


The RDDs are generated by an instance of the DataGenerator class as input (line 1). The input is loaded recursively into multiple partitions using the Spark method mapPartitionsWithIndex (line 11). Each partition instantiates a Histogram (line 17), trains it on its batch of data (line 24) and emits its bins (line 26). The bins are then grouped by partition index (line 28) and, finally, the fold method is invoked to merge them into a single array of bins (line 30).

 1  val input = new DataGenerator("small.data", numTasks)
 2
 3  @scala.annotation.tailrec
 4  def execute(iter: Int, 
 5      initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin] = {
 6    val initParam = initParam_brdcast.value
 7
 8    if( iter <= 0)
 9      initParam.values
10    else {
11      val hist_rdd = input(sc).mapPartitionsWithIndex(
12        (idx: Int, it: Iterator[(Float, Float)]) => {
13
14          // The first invocation uses the broadcast parameters of the
15          // histogram; subsequent invocations update the distribution
16          // of the histogram bins
17          val histogram =
18            if (initParam.values.length == 0) 
19               new Histogram(initParam)
20            else 
21               new Histogram(initParam.step, initParam.values)
22
23          // Train the histogram with a new batch of data
24          it.foreach { case (x, y) => histogram.train(x, y) }
25
26          histogram.getValues.map((idx, _)).iterator
27        })
28        .groupBy(_._1)
29        .map { case (n, itBins) => itBins.map(_._2).toArray }
30        .fold(Array[Bin]())(Histogram.+)
31
32      execute(iter-1, sc.broadcast(
33        HistogramParams(0.0, initParam.step, 0, new ArrayBuffer[Bin] ++ hist_rdd)
34      ))
35    }
36  }

Note:
The invocation of the recursive method execute on the Apache Spark driver (line 4) broadcasts the current (most recently updated) parameters of the histogram, initParam_brdcast, as one of the arguments of the recursive call (line 5). Upon receiving the broadcast values, each partition loads the next batch of data, generates and processes the RDD input(sc), and updates the histogram parameters (line 11).

Evaluation

Let's write a test driver to generate the histogram using Spark RDDs. After their initialization (lines 1-4), the parameters of the histogram are simply broadcast to the remote data/worker nodes (lines 7, 8).

1  val initNumBins = 2048
2  val range = (-1.0, 2.0)
3  var step = (range._2 - range._1)/initNumBins
4  val maxRecursions = 4
5
6  val hist = execute(maxRecursions, 
7      sc.broadcast(
8          HistogramParams(range._1 + 0.5 * step, step, initNumBins))
9    )
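
Once the recursion completes, the resulting bins can be queried. A minimal sketch, assuming the Histogram constructor and predict method shown earlier:

  // Rebuild a histogram from the final array of bins and predict y for a new x
val finalHistogram = new Histogram(step, hist)
val estimate = finalHistogram.predict(0.75)
println(s"Predicted y for x = 0.75: $estimate")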



Wednesday, December 2, 2015

Fast Kullback-Leibler Divergence Using Spark

Target audience: Intermediate
Estimated reading time: 5'

In my role as a data engineer, I've often encountered the challenge of measuring the discrepancy between two probability distributions over identical variables, such as when calculating the loss function for variational auto-encoders or generative adversarial networks. 

This article delves into the Kullback-Leibler divergence and how to implement it using Apache Spark for distributed computing.

Note: The source code relies on Spark 3.3.1 and Scala 2.12.4

What you will learn: How to implement the Kullback-Leibler divergence on Apache Spark with Scala


Overview

The Kullback-Leibler divergence, originating from information theory and often linked to Information Gain, is also referred to as the relative entropy between two distributions [ref 1]. It quantifies the difference in the distribution of random values, specifically in their probability density functions.

Let's consider two distributions P and Q with probability density functions p and q, respectively:
\[D_{KL}(P||Q)= \int_{-\infty }^{+\infty }p(x)\,\log\frac{p(x)}{q(x)}\,dx\] The formula can be interpreted as the information lost when the distribution Q is used to approximate the distribution P.
Note that the computation is not symmetric: \[D_{KL}(P||Q)\neq D_{KL}(Q||P)\] The Kullback-Leibler divergence plays a role in computing Mutual Information, a method for feature extraction, and is a member of the family of F-divergences.
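
To make the asymmetry concrete, here is a small, self-contained Scala illustration (not part of the original post) that computes the divergence in both directions for two discrete distributions:

  // Discrete Kullback-Leibler divergence: sum of p(i)*log(p(i)/q(i))
def klDiscrete(p: Array[Double], q: Array[Double]): Double =
  p.zip(q).map { case (pi, qi) => pi*math.log(pi/qi) }.sum

val p = Array(0.1, 0.4, 0.5)
val q = Array(0.8, 0.1, 0.1)

println(klDiscrete(p, q))   // D(P||Q) ~ 1.15
println(klDiscrete(q, p))   // D(Q||P) ~ 1.36, a different value
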
In this article, we aim to:
  1. Craft an implementation of the Kullback-Leibler divergence in Scala, leveraging the Apache Spark framework for distributed computation.
  2. Showcase how the Kullback-Leibler divergence can be utilized to contrast various continuous probability density functions.

Scala implementation

First, we'll establish the Kullback-Leibler formula to assess the difference between a dataset {xi, yi} (most likely produced by some random variable or distribution) and a given continuous distribution with probability density function f.
We rely on the Scala foldLeft method to compute the summation of the cross-entropy terms over the data points [ref 2].

import scala.math.{abs, log}

object KullbackLiebler {
   final val EPS = 1e-10
   type DATASET = Iterator[(Double, Double)]

   def execute(      // #1
      xy: DATASET, 
      f: Double => Double): Double = {

      val z = xy.filter { case (_, y) => abs(y) > EPS }
      -z.foldLeft(0.0) {
         case (s, (x, y)) =>
            val px = f(x)
            s + px*log(px/y)
      }
   }

   def execute(     // #2
     xy: DATASET, 
     fs: Iterable[Double => Double]): Iterable[Double] = {
     // Materialize the iterator so it can be traversed once per density function
     val data = xy.toSeq
     fs.map(f => execute(data.iterator, f))
   }
}

The divergence methods execute are defined as static methods of the object.
The first execute method (#1) computes the dissimilarity between the distribution represented by the dataset xy and a distribution with probability density function f.
The second execute method (#2) computes the KL divergence between the dataset xy and a sequence fs of probability density functions.
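
A minimal usage sketch is shown below; the sample values and the Gaussian density gaussPdf are arbitrary, for illustration only.

  // Dataset of (x, observed density) pairs and a candidate Gaussian density
val sample: Iterator[(Double, Double)] = Iterator((0.0, 0.40), (0.5, 0.35), (1.0, 0.24))
val gaussPdf = (x: Double) => math.exp(-x*x/2.0)/math.sqrt(2.0*math.Pi)

val divergence = KullbackLiebler.execute(sample, gaussPdf)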

Next, let's define some of the continuous random distributions used to evaluate the distribution represented by the dataset xy: Normal, Beta, log-normal, Gamma, log-Gamma and Chi-square.

  // INV_PI is assumed to be the Gaussian normalization constant
  // (it is not defined in the original snippet)
val INV_PI = 1.0/sqrt(2.0*Pi)

  // One Gaussian distribution
val gauss = (x: Double) => INV_PI*exp(-x*x/2.0)

   // Uniform distribution
val uniform = (x: Double) => x

  // Log normal distribution
val logNormal = (x: Double) => { 
   val lx = log(x)
   INV_PI/x*exp(-lx*lx) 
}
  // Gamma distribution
val gamma = (x: Double, n: Int) => 
    exp(-x)*pow(x, n)/fact(n)

  // Log Gamma distribution
val logGamma = (x: Double, alpha: Int, beta: Int) =>
   exp(beta*x)*exp(-exp(x)/alpha)/(pow(alpha, beta)*fact(beta-1))

  // Simple computation of m! (for the Beta distribution)
def fact(m: Int): Int = if(m < 2) 1 else m*fact(m-1)

  // Normalization factor for the Beta distribution
val cBeta = (n: Int, m: Int) => {
    val f = if(n < 2) 1 else fact(n-1)
    val g = if(m < 2) 1 else fact(m-1)
    f*g/fact(n+m-1).toDouble
}

  // Beta distribution
val beta = (x: Double, alpha: Int, beta: Int) =>
   pow(x, alpha-1)*pow(1.0-x, beta-1)/cBeta(alpha, beta)

  // Chi-Square distribution (even number of degrees of freedom k)
val chiSquare = (x: Double, k: Int) => {
   val k_2 = k >> 1
   pow(x, k_2-1)*exp(-0.5*x)/((1 << k_2)*fact(k_2-1))
}

Some of these probability density functions are parameterized (e.g., the log-Gamma has alpha and beta parameters). The probability density functions are implemented as Scala partially applied functions with predefined parameter values, as illustrated in the following code snippet.

val gamma2 = gamma(_: Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)

Notes:
  • The implementation of the probability density functions in the code snippet is not optimized for performance. 
  • Please refer to your favorite Statistics handbook to learn more about these probability distributions.

Spark to the rescue

Apache Spark is a free, open-source framework for cluster computing, specifically designed to process data in real time via distributed computing [ref 3]. Its primary applications include:
  • Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
  • Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
  • Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
  • Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.

Computing the Kullback-Leibler divergence on a sizable dataset with many random distributions can lead to delays and considerable use of computational power. One approach to tackle this is by leveraging the Apache Spark framework for parallel processing of the divergence.

To do this:
  • Segment the primary dataset.
  • Distribute the initial sequence of probability density functions.
  • Apply the Kullback-Leibler formula to each segment using mapPartitions.
  • Retrieve and combine the divergence value from every segment.

final val NUM_DATA_POINTS = 10000000    // #1
val numTasks: Int = 128                 // #2
  
val conf = new SparkConf()              // #3
     .setAppName("Kullback-Liebler")
     .setMaster(s"local[$numTasks]")

val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

Once we define the number of input data points (#1) and the number of concurrent tasks (#2), the Apache Spark context sc is configured with an instance of SparkConf (#3).
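
The construction of the input RDD master_rdd of (x, y) pairs used below is not shown in the original post. A minimal sketch, assuming the data points are noisy samples of the Gaussian density gauss over the interval [-1, 2], could be:

import org.apache.spark.rdd.RDD
import scala.util.Random

  // Hypothetical construction of the input data set as an RDD of (x, y) pairs
val rand = new Random(42L)
val xySamples: Seq[(Double, Double)] = Seq.fill(NUM_DATA_POINTS.toInt) {
  val x = 3.0*rand.nextDouble - 1.0                     // x drawn uniformly in [-1, 2]
  (x, gauss(x)*(1.0 + 0.05*(rand.nextDouble - 0.5)))    // noisy Gaussian density value
}
val master_rdd: RDD[(Double, Double)] = sc.makeRDD(xySamples, numTasks)
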
Next let's implement the broadcasting mechanism for the batch or sequence of probability density functions.

lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 
  2 -> logNormal, 
  3 -> gamma1, 
  4 -> gamma2, 
  5 -> gamma4, 
  6 -> logGamma12, 
  7 -> logGamma22, 
  8 -> beta22, 
  9 -> beta25, 
  10 -> chiSquare2, 
  11 -> chiSquare4
)

val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.map(_._1))

The pdfs_broadcast variable broadcasts the keys of the map of probability density functions, pdfs, across all partitions. 
Such value broadcasting is extensively employed in the machine learning library MLlib to send model coefficients (or weights) to the data nodes for the computation of gradients and loss functions.

The mapPartitions transformation converts each partition of value pairs {x, y} into an array of length pdfs.size. This new array holds the KL divergence values, one for each distribution being evaluated.

val kl_rdd = master_rdd.mapPartitions(
  (it: DATASET) => {
     val pdfsList = pdfs_broadcast.value.map(n => pdfs(n))
     execute(it, pdfsList).iterator
  }
)

In the end, the divergence values from each partition are collected onto the Spark master and then aggregated using the Scala fold operator foldLeft.

val kl_master = kl_rdd.collect

  // Sum the divergence of each distribution across the partition results,
  // then average over the number of partition results
val numGroups = kl_master.length/pdfs.size
val divergences = (0 until kl_master.length by pdfs.size)
  .foldLeft(Array.fill(pdfs.size)(0.0))( (s, n) => {
     (0 until pdfs.size).foreach(j => s.update(j, s(j) + kl_master(n + j)))
     s
  }).map(_ / numGroups)
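
For readability, each averaged divergence can be paired with the key of its probability density function; the pairing below relies on the same iteration order as the broadcast keys.

val keys = pdfs_broadcast.value.toSeq
keys.zip(divergences).foreach { 
  case (k, kl) => println(f"pdf #$k%-3d KL = $kl%.6f") 
}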

Conclusion

We conducted the test to gauge the capability of the Kullback-Leibler divergence in assessing the differences among multiple probability distributions. We chose the Gaussian distribution as our reference point against which other distributions were contrasted.
The comparative findings are illustrated in Fig. 1.
Fig. 1: KL divergence between various distributions and the normal distribution


As anticipated, the Gaussian and Normal distributions show great similarity. Yet, there is a notable divergence between the Gaussian and the higher-order Gamma and Beta distributions. This distinction becomes even more apparent when examining the graphs of their respective probability density functions.

References

[1] C. Bishop, Pattern Recognition and Machine Learning, section 1.6.1 "Relative entropy and mutual information", Springer Science, 2006


-------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3