Sunday, February 23, 2020

Implement Kernel Density Estimator in Spark

Target audience: Intermediate
Estimated reading time: 4'

This article introduces a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset: Multivariate Kernel Density Estimation (KDE), also known as the Parzen’s windowAt its core the KDE is a smooth approximation of an histogram [ref 1] 


Table of contents

Follow me on LinkedIn
Notes
  • This article requires a basic knowledge in Apache spark MLlib framework and understanding of statistics and/or machine learning.
  • This implementation relies on Spark 2.3.1 and Scala 2.12.4

Introduction

KDE is one of the most well-known approaches to estimate the underlying probability density function of a dataset. KDE will learn the shape of the density from the data automatically. This flexibility arising from its non- parametric nature makes KDE a very popular approach for data drawn from a complicated distribution.

The KDE algorithm takes a parameter, bandwidth, that affects how “smooth” the resulting curve is. For a set of observations y, and given a kernel function K and a bandwidth, the estimation of the density function f, can be expressed as.


This post addresses the limitations of the current implementation of KDE in Apache Spark for the multi-variate features.

Spark implementation

Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
The Apache Spark ecosystems includes a machine learning library, MLlib.

The implementation of the kernel density estimation in the current version of Apache Spark MLlib library, 2.3.1 
[ref 2]org.apache.spark.mllib.stats.KernelDensity has two important limitations:
  • It is a univariate estimation
  • The estimation is performed on a sequence of observations, not an RDD or data set, putting computation load on the Spark driver.
An example of application of KDE using Apache Spark MLlib 2.3.1 ... 

val sample = sparkSession.sparkContext.parallelize(data) 
 
val kd = new KernelDensity().setSample(sample).setBandwidth(3.0)
 
val densities = kd.estimate(Array(-2.0, 5.0))

The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver. 

Multivariate KDE

The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allows the developers to apply the estimation to a dataset. This implementation is restricted to the Normal distribution although it can easily be extended to other kernel functions. 
We assume 
  • The reference to the current Spark session is implicit (line 1)
  • The encoding of a row for serialization of the task is provided (line 1)
The method estimate has 3 arguments 
  • TrainingDS training dataset (line 9)
  • Validation validation set (line 10)
  • bandwidth size of the Parzen window
The validation set has to be broadcast to each worker nodes (line 14). This should not be a problem as the size of the validation set is expected of reasonable size. 
The training set is passed to each partitions as iterator through a mapPartitions (line 17). The probability densities and count are computed through a Scala aggregate method with a zero function of type, (Array[Double], Long) (line 23). The sequence operator invokes the multinomial normal distribution (line 29). 
  
The combiner (3rd argument of the aggregate) relies on the BLAS vectorization z = <- a.x+y dxapy (line 38). BLAS library [ref 3has 3 levels (1D, 2D and 3D arrays). Blas library
The vector of densities is scaled with invCount using the decal BLAS level 1 method (line 45).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final class KDE(implicit sparkSession: SparkSession, 
      encoder: Encoder[Row]) {
 
  /**
    * Applied the trained KDE to a set of validation data
    * @param trainingDS  Training data sets
    * @param validationRdd Validation data sets
    * @return Datasets of probability densities
    */
  def estimate(
    trainingDS: Dataset[Obs], 
    validationDS: Dataset[Obs], 
    bandwidth: Double = 1.0): Dataset[Double] = {
    import math._, sparkSession.implicits._
    val validation_brdcast = sparkSession.sparkContext
            .broadcast[Array[Obs]](validationDS.collect)

    trainingDS.mapPartitions((iter: Iterator[Obs]) => {
       val seqObs = iter.toArray
       val scale = 0.5 * seqObs.size* log(2 * Pi)
       val validation = validation_brdcast.value

       val (densities, count) = seqObs.aggregate(
         (new Array[Double](validation.length), 0L) ) (
           {        // seqOp (U, T) => U
            
             case ((x, z), y) => {
                var i = 0
                while (i < validation.length) {   
                 // Call the pdf function for the normal distribution
                    x(i) += multiNorm(y, bandwidth, scale, validation(i))
                    i += 1
                }
                (x, z + 1)  // Update  count & validation values
             }
          },
          {         // combOp: (U, U) => U
             case ((u, z), (v, t)) => { 
                // Combiner calls vectorization z <- a.x + y
                blas.daxpy(validation.length, 1.0, v, 1, u, 1)
                (u, z + t)
             }
          }
      )

      val invCount: Double = 1.0 / count
      blas.dscal(validation.length, invCount, densities, 1)  
          // Rescale the density using LINPACK z <- a.x
      densities.iterator
    })
  }
}

The companion singleton is used to define the multinomial normal distribution (line 5). The type of observations (feature) is Array[Double].

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
final object KDE {
    import math._
    type Obs = Array[Double]

    @throws(classOf[IllegalArgumentException])
    def multiNorm(
         means: Obs, 
         bandWidth: Double, 
         scale: Double, 
         x: Obs): Double = {
      require(x.length == means.length, 
           "Dimension of means and observations differs")

       exp(
          -scale - (0 until means.length).map( 
             n => {
                val sx = (means(n) - x(n)) / bandWidth
                -0.5 * sx * sx
             }
       ).sum
    )
  }
}

Application

This simple application requires that the spark context (SparkSession) to be defined as well as an explicit encoding of Row using Kryo serializer. The implicit conversion are made available by importing sparkSession.implicits.
The training set is a sequence of key-value pairs (lines 3-14). The validation set is synthetically generated by multiplying the data in the training value with 2.0 (line 17).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
implicit val sparkSession: SparkSession =    
       confToSessionFromFile(LocalSparkConf)
implicit val encoder = Encoders.kryo[Row]
import sparkSession.implicits._

val trainingData = Seq[(String, Array[Double])](
     ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
     ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
     ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
     ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
     ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
  ).toDS

  val validationData = trainingData
      .map { case (key, values) => values.map(_ *2.0) }

  val kde = new KDE
  val result = kde.estimate(trainingData.map(_._2),validationData)

  println(s"result: ${result.collect.mkString(", ")}")

  sparkSession.close


  val data = Seq[Double](1.0, 5.6)
  val sample = sparkSession.sparkContext.parallelize(data)
  val kd = new KernelDensity().setSample(sample) .setBandwidth(3.0)
  val densities = kd.estimate(Array(-2.0, 5.0))


Note: There are excellent research papers highlighting the statistical foundation behind KDE as well as recent advances [ref 4].

Thank you for reading this article. For more information ...

References

[3BLAS
Environment Scala: 2.12.4,  Java JDK 1.8, Apache Spark 2.3.1, OpenBLAS 0.3.4


---------------------------

Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Wednesday, December 11, 2019

Variance Annotation in Scala 2.x

Target audience: Intermediate
Estimated reading time: 3'

The purpose of this post, is to introduce the basics of parameterized classes, upper and lower bounds and variance sub-typing in Scala.
Table of contents
Overview
Follow me on LinkedIn

Note: The code associated with this article is written using Scala 2.12.4

Overview

Scala is a statically typed language similar to Java and C++. Most of those concepts have been already introduced and commonly in Java with generics and type parameters wildcard. However Scala added some useful innovation regarding sub-typing covariance and contra-variance. API designers and library developers are facing the dilemma of choosing between parameterized types and abstract types and which parameters should be invariant, covariant or contra-variant.


Sub-classing

Like in Java, generic or parameterized type have been introduced to overcome the limitation of polymorphism. Moreover, understanding of concept behind the Scala type system help developers decipher and fix the occasional compilation failure related to typing.
Let's consider a simple collection class ,Container that adds and retrieves an array of items of type Item as follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class Item
case class SubItem() extends Item
 
class Container {
  def add(items: Array[Item]) : Unit =  items.foreach( add(_) )

  def retrieve(items: Array[Item]) : Unit =
    Range(0, items.size).foreach(items.update(_, retrieve))

  def add(item: Item) : Unit = { }
  def retrieve : Item = new SubItem
}

The client code can invoke the add (lines 5 & 10) and retrieve (lines 7 & 11) methods passing an array of elements of type Item. However passing an array of type SubItem inherited from Item (line 2) will generate a compiling error!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
val subItemsContainer = new Container
val array = Array[SubItem](new SubItem())
 
  // Compile error:
  // "type mismatch;  found   : Array[SubItem]  
  // required: Array[Item] 
  // Note: SubItem <: Item, but class Array is 
  // invariant in type T..."
 subItemsContainer.add(array)
 subItemsContainer.retrieve(Array[SubItem](new SubItem))


The Scala compiler throws a compiling error because the class Container does not have the adequate information about the sub type Item into SubItem in the add method (line 9). The invocation of the retrieve method, passing an argument of type SubItem generates also a compiler error. 
Scala provides developers with upper bounds and lower bounds sub-typing which is quite similar to the type variance available in Java.

Variance to the Rescue

The type T upper bounded by the type Item as defined by the following notation (line 1). T <: Item can be processed as a 'producer' of type Item. The notation provides the Scala compiler with any type inheriting Item. It should be allowed to be passed as argument of the add method (lines 2 & 7).
Similarly, the type U with a lower bound type SubItem
   U >: SubItem
can be processed as a 'consumer' operation of type SubItem, in the retrieve method (lines 4 & 8).

1
2
3
4
5
6
7
8
9
class Container[T <: Item] {
   def add(items: Array[T]) : Unit = items.foreach( add(_) )
      
   def retrieve[U >: SubItem](items: Array[U]) : Unit = 
       Range(0, items.size)foreach(items.update(_, retrieve))
  
   def add(item: T): Unit = {}
   def retrieve: U = new SubItem
}

Container[T] is a covariant class for the type Item as argument of the method add. The method retrieve[U] is contra-variant for the type SubItem.
This time around the compiler has the all the necessary information to subtype Item and super type SubItem, therefore the following code snippet won't generate a compiling error

val itemsContainer = new ContainerT[SubItem]
itemsContainer.add(Array[SubItem](new SubItem)))
itemsContainer.retrieve[Item](Array[Item](new SubItem))


Scala has a dedicated annotation for unvariance 'T', covariance '+T' and contra-variance '-T'. Using this annotation the container class can be expressed as follow.

1
2
3
4
5
6
7
8
9
class Container[+T] {
   def add(items: Array[T]) : Unit = items.foreach( add(_) )
 
   def retrieve[-U](items: Array[U]) : Unit = 
          Range(0, items.size).foreach(items.update(_, retrieve))
  
   def add(item: T) : Unit = { }
  
   def retrieve : U = new SubItem
}

In a nutshell, variances can be represented as functors:
Covariance: if (B inherit A) then (Container[B] inherit Container[A])
Contra-variance: if( B inherit A) then (Container[A] super class of Container[B])
Univariance: No rule applies.
 

The sub-typing rules above can be represented graphically 


Thank you for reading this article. For more information ...

References



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Sunday, October 20, 2019

Contextual Thompson Sampling

Target audience: Advanced
Estimated reading time: 5'

In this article, we delve into the notion of the multi-armed bandit, also known as the K-armed bandit, with a particular focus on Thompson sampling as applied to the contextual bandit.

Table of contents
       Epsilon-greedy
Follow me on LinkedIn

What you will learn: How to add contextual information as prior to the Thompson sampling algorithm.

Multi-arm bandit problem

The multi-armed bandit problem is a well-known reinforcement learning technique,  widely used for its simplicity. Let's consider a  slot machine with n arms (bandits) with each arm having its own biased probability distribution of success. In its simplest form pulling any one of the arms gives you a  reward of either 1 for success, or 0 for failure. 

Our objective is to pull the arms one-by-one in sequence such that we maximize our total reward collected in the long run. 
Data scientists have developed several solutions to tackle the multi-armed bandit problem for which the 3 most common algorithms are:
  • Epsilon-Greedy
  • Upper confidence bounds
  • Thompson sampling

Epsilon-greedy

The Epsilon-greedy algorithm stands as the most straightforward method for navigating the exploration-exploitation dilemma. In the exploitation phase, it consistently chooses the lever with the highest recorded payoff. Nevertheless, occasionally (with a frequency of ε, where ε is less than 0.1), it opts for a random lever to 'explore'—this allows for the investigation of levers whose payouts are not well-known. Levers with established or confirmed high payouts are selected the majority of the time, specifically (1-ε) of the instances.

Upper Confidence Bound (UCB)

The Upper Confidence Bound (UCB) algorithm is arguably the most popular approach to solving the multi-armed bandit problem. It is occasionally described by the principle of 'Optimism in the Face of Uncertainty': It operates under the assumption that the uncertain average rewards of each option will be on the higher end, according to past outcomes.

Thompson sampling

The Thompson sampling algorithm is essentially a Bayesian optimization method. Its central tenet, known as the probability matching strategy, can be distilled to the concept of 'selecting a lever based on its likelihood of being the optimal choice.' This means the frequency of choosing a specific lever should correspond to its actual chances of being the best lever available.

The following diagram illustrates the difference between the upper confidence bounds and the Thompson sampling.


Fig. 1 Confidence Bounds for Multi-Armed bandit problem

Context anyone?

The methods discussed previously do not presume any specifics about the environment or the agent that is choosing the options. There are two scenarios:
  • Context-Free Bandit: In this scenario, the choice of the option with the highest reward is based entirely on historical data, which includes prior outcomes and the record of rewards (both successes and failures). This data can be represented using a probability distribution like the Bernoulli distribution. For example, the chance of rolling a '6' on a dice is not influenced by who is rolling the dice.
  • Contextual Bandit: Here, the current state of the environment, also known as the context, plays a role in the decision-making process for selecting the option with the highest expected reward. The algorithm takes into account a new context (state), makes a choice (selects an arm), and then observes the result (reward). For instance, in online advertising, the choice of which ad banner to display on a website is influenced by the historical reward data linked to the user's demographic information.
All of the strategies suited to the multi-armed bandit problem can be adapted for use with or without context. The following sections will concentrate on the problem of the contextual multi-armed bandit.

Contextual Thompson sampling (CTS)

Let's dive into the key characteristics of the Thompson sampling
  • We assume the prior distribution on the parameters of the distribution (unknown) of the reward for each arm.
  • At each step, t, the arm is selected according to the posterior probability to be the optimal arm. 
The components of the contextual Thompson sampling are
1. Model of parameters w
2. A prior distribution p(w) of the model
3. History H consisting of a context x and reward r
4. Likelihood or probability p(r|x, w) of a reward given a context x and parameters w
5. Posterior distribution computed using naïve Bayes \[p\left( \tilde{w} | H\right ) = \frac{p\left( H | \tilde{w} \right ) p(\tilde{w})}{p(H)}\]

But how can we model a context?

Actually, a process to select the most rewarding arm is actually a predictor or a learner. Each predictor takes the context, defines as a vector of features and predicts which arm will generate the highest reward.
The predictor is a model that can be defined as
- Linear model
- Generalized linear model (i.e. logistic)
- Neural network

The algorithm is implemented as a linear model (weights w) for estimating the reward from a context x  as \[w^{T}.x\]
The ultimate objective for any reinforcement learning model is to extract a policy which quantifies the behavior of the agent. Given a set X of context xi and a set A of arms, the policy Ï€  is defined by the selection of an arm given a context x
\[\pi : X\rightarrow A\]
Variables initialization\[V_{0} = \lambda I_{d}, \hat{w}=0,b_{0}\]
Iteration
FOR t = 1, ... T \[\begin{matrix} 1: \tilde{w_{t}} \leftarrow \mathbb{N}(\tilde{w_{t}} , \frac{v^{2}}{V_{t}} ) \\ \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] 2: a_{t}^{*} = \arg \max _{j} x_{j,t}^{T} \widehat{w_{t}} \\ \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] 3: a_{t} = \arg \max _{j} x_{j,t}^{T} \tilde{w_{t}} \\ \phantom] \phantom] \phantom] 4: reward \rightarrow r_{t} \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \\ \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] 5: V_{t+1} = V_{t} + x_{a_{t},t}.x_{a_{t},t}^T \\ \phantom] \phantom] \phantom] 6: b_{t+1}= b_{t} + x_{a_{t},t}.r_{t} \\ \phantom] 7: \tilde{w} = V_{t+1}^{T}.b \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \\ \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] \phantom] 8: \mathbb{R}_{t}=x_{a^{*},t}^T.\tilde{w_{t}} - x_{a,t}^T, \tilde{w_{t}} \end{matrix}\] 
The sampling of the normal distribution (line 1) is described in details in the post Multivariate Normal Sampling with Scala. The algorithm computes the maximum reward estimation through sampling the normal distribution (line 2) and play the arm a* associated to it (line 3).
The parameters V and b are updated with the estimated value (line 5 and 6) and the actual reward Rt is computed (line 8) after the weights of the context w are updated (line 7).

Thank you for reading this article. For more information ...


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3