
Sunday, February 23, 2020

Implement Kernel Density Estimator in Spark

Target audience: Intermediate
Estimated reading time: 4'

This article introduces a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset: multivariate Kernel Density Estimation (KDE), also known as the Parzen window method. At its core, KDE is a smooth approximation of a histogram [ref 1].


Notes
  • This article requires a basic knowledge of the Apache Spark MLlib framework and an understanding of statistics and/or machine learning.
  • This implementation relies on Spark 2.3.1 and Scala 2.12.4

Introduction

KDE is one of the most well-known approaches to estimating the underlying probability density function of a dataset. KDE learns the shape of the density from the data automatically. This flexibility, arising from its non-parametric nature, makes KDE a very popular approach for data drawn from a complicated distribution.

The KDE algorithm takes a parameter, the bandwidth, that controls how “smooth” the resulting curve is. For a set of observations y, a kernel function K and a bandwidth h, the estimate of the density function f can be expressed as \[\widehat{f}_{h}(x)=\frac{1}{n\,h}\sum_{i=1}^{n}K\left ( \frac{x-y_{i}}{h} \right )\]

This post addresses the limitations of the current implementation of KDE in Apache Spark for multivariate features.

Spark implementation

Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
The Apache Spark ecosystem includes a machine learning library, MLlib.

The implementation of the kernel density estimation in the current version of the Apache Spark MLlib library, 2.3.1 [ref 2], org.apache.spark.mllib.stat.KernelDensity, has two important limitations:
  • It is a univariate estimation
  • The estimation is performed on a sequence of observations, not an RDD or dataset, putting the computational load on the Spark driver.
An example of the application of KDE using Apache Spark MLlib 2.3.1:

val data = Seq[Double](1.0, 5.6)
val sample = sparkSession.sparkContext.parallelize(data)
 
val kd = new KernelDensity().setSample(sample).setBandwidth(3.0)
 
val densities = kd.estimate(Array(-2.0, 5.0))

The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver. 

Multivariate KDE

The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allowing developers to apply the estimation to a dataset. This implementation is restricted to the Normal (Gaussian) kernel, although it can easily be extended to other kernel functions. 
We assume that:
  • The reference to the current Spark session is implicit (line 1)
  • The encoding of a row for serialization of the task is provided (line 2)
The method estimate has 3 arguments: 
  • trainingDS: the training dataset (line 9)
  • validationDS: the validation dataset (line 10)
  • bandwidth: the size of the Parzen window
The validation set has to be broadcast to each worker node (line 14). This should not be a problem as the validation set is expected to be of reasonable size. 
The training set is passed to each partition as an iterator through mapPartitions (line 17). The probability densities and count are computed through a Scala aggregate method with a zero value of type (Array[Double], Long) (line 23). The sequence operator invokes the multivariate normal distribution (line 29). 
  
The combiner (3rd argument of the aggregate) relies on the BLAS level 1 vectorized routine daxpy, z <- a.x + y (line 38). The BLAS library [ref 3] has 3 levels (vector, matrix-vector and matrix-matrix operations).
The vector of densities is rescaled with invCount using the BLAS level 1 routine dscal (line 45).
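For reference, here is a minimal, self-contained illustration of the two BLAS level 1 routines used below, assuming the netlib-java binding (com.github.fommil.netlib) bundled with Spark MLlib:

import com.github.fommil.netlib.BLAS.{getInstance => blas}

val y = Array(1.0, 2.0, 3.0)
val x = Array(0.5, 0.5, 0.5)

// daxpy: y <- a.x + y  (accumulate one density vector into another)
blas.daxpy(y.length, 2.0, x, 1, y, 1)   // y is now [2.0, 3.0, 4.0]

// dscal: x <- a.x  (rescale the densities, e.g. by 1/count)
blas.dscal(y.length, 0.25, y, 1)        // y is now [0.5, 0.75, 1.0]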

final class KDE(implicit sparkSession: SparkSession, 
      encoder: Encoder[Row]) {
 
  /**
    * Apply the trained KDE to a set of validation data
    * @param trainingDS  Training dataset
    * @param validationDS Validation dataset
    * @return Datasets of probability densities
    */
  def estimate(
    trainingDS: Dataset[Obs], 
    validationDS: Dataset[Obs], 
    bandwidth: Double = 1.0): Dataset[Double] = {
    import math._, sparkSession.implicits._
    val validation_brdcast = sparkSession.sparkContext
            .broadcast[Array[Obs]](validationDS.collect)

    trainingDS.mapPartitions((iter: Iterator[Obs]) => {
       val seqObs = iter.toArray
       // Log-normalization term of the Gaussian kernel (dim: feature dimension)
       val dim = if (seqObs.nonEmpty) seqObs.head.length else 0
       val scale = 0.5 * dim * log(2 * Pi)
       val validation = validation_brdcast.value

       val (densities, count) = seqObs.aggregate(
         (new Array[Double](validation.length), 0L) ) (
           {        // seqOp (U, T) => U
            
             case ((x, z), y) => {
                var i = 0
                while (i < validation.length) {   
                 // Call the pdf function for the normal distribution
                    x(i) += multiNorm(y, bandwidth, scale, validation(i))
                    i += 1
                }
                (x, z + 1)  // Update the accumulated densities & count
             }
          },
          {         // combOp: (U, U) => U
             case ((u, z), (v, t)) => { 
                // Combiner calls vectorization z <- a.x + y
                blas.daxpy(validation.length, 1.0, v, 1, u, 1)
                (u, z + t)
             }
          }
      )

      val invCount: Double = 1.0 / count
      // Rescale the densities using the BLAS level 1 routine dscal: x <- a.x
      blas.dscal(validation.length, invCount, densities, 1)
      densities.iterator
    })
  }
}

The companion object defines the multivariate normal (Gaussian) kernel, multiNorm (line 5). The type of an observation (feature vector), Obs, is Array[Double].
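In other words, for a bandwidth h and the pre-computed normalization term scale, each call to multiNorm evaluates the Gaussian kernel \[K_{h}(\mathbf{y},\mathbf{x})=\exp\left ( -scale-\frac{1}{2}\sum_{n}\left ( \frac{y_{n}-x_{n}}{h} \right )^{2} \right )\]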

final object KDE {
    import math._
    type Obs = Array[Double]

    @throws(classOf[IllegalArgumentException])
    def multiNorm(
         means: Obs, 
         bandWidth: Double, 
         scale: Double, 
         x: Obs): Double = {
      require(x.length == means.length, 
           "Dimension of means and observations differs")

       // exp( -scale - 0.5 * sum( ((means - x)/h)^2 ) )
       exp(
          -scale - (0 until means.length).map(
             n => {
                val sx = (means(n) - x(n)) / bandWidth
                0.5 * sx * sx
             }
          ).sum
       )
  }
}

Application

This simple application requires the Spark context (SparkSession) to be defined, as well as an explicit encoding of Row using the Kryo serializer. The implicit conversions are made available by importing sparkSession.implicits.
The training set is a sequence of key-value pairs (lines 3-14). The validation set is synthetically generated by multiplying the values of the training set by 2.0 (line 17).

implicit val sparkSession: SparkSession =    
       confToSessionFromFile(LocalSparkConf)
implicit val encoder = Encoders.kryo[Row]
import sparkSession.implicits._

val trainingData = Seq[(String, Array[Double])](
     ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
     ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
     ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
     ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
     ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
  ).toDS

  val validationData = trainingData
      .map { case (_, values) => values.map(_ * 2.0) }

  val kde = new KDE
  val result = kde.estimate(trainingData.map(_._2), validationData)

  println(s"result: ${result.collect.mkString(", ")}")

  sparkSession.close




Note: There are excellent research papers highlighting the statistical foundation behind KDE as well as recent advances [ref 4].

Thank you for reading this article. For more information ...

References

[3] BLAS
Environment: Scala 2.12.4, Java JDK 1.8, Apache Spark 2.3.1, OpenBLAS 0.3.4


---------------------------

Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Wednesday, February 7, 2018

Implementing AdaGrad Optimizer in Spark

Target audience: Intermediate
Estimated reading time: 5'

Have you ever desired a Gradient Descent approach with greater flexibility? If that's the case, Adaptive Gradient Descent (AdaGrad) could be a suitable alternative.


What you will learn: The workings of the Adaptive Gradient algorithm, how it improves upon Stochastic Gradient Descent, and its implementation using Apache Spark.



Notes
  • This post assumes that the reader has rudimentary knowledge of the Scala API of Apache Spark and a basic understanding of machine learning.
  • The code associated with this article is written using Scala 2.12.11 and Spark 3.1.0

Background

The optimization algorithm known as Stochastic Gradient Descent (SGD) is frequently employed to minimize the loss function during the training of various machine learning models, including support vector machines, logistic regression, and back-propagation neural networks. Typically, in its most basic form, SGD calculates the gradient using a single learning rate.

It's quite usual to find that the features of a model exhibit significant variance across different observations. Under such circumstances, an adaptive gradient (AdaGrad) algorithm that assigns a unique learning rate to each feature could be an effective solution. There exist numerous methods to develop an algorithm that assigns a distinct learning rate to every feature. 
Incidentally, the Spark machine learning library, MLlib, does not include an implementation of AdaGrad.
This article discusses the AdaGrad algorithm and an implementation for Apache Spark.

Stochastic Gradient Descent (SGD)

Apache Spark stands out as a rapid and versatile cluster computing system, offering advanced APIs in Java, Scala, Python, and R, along with an enhanced engine capable of supporting general execution graphs.

Within the Apache Spark ecosystem lies MLlib, a machine learning library. The stochastic gradient descent optimizer within this library serves as a randomized variant of the (batched) gradient descent algorithm, which is employed for minimizing a continuous differentiable objective function. In the realm of supervised machine learning, this objective function typically manifests as a loss function, such as the logistic loss, the sum of least squares, and others. \[L(w)=\frac{1}{n}\sum_{i=1}^{n}(y_{i}-f(x_{i}|w))^{2}\]

The objective function L is expressed as the summation of differentiable functions. In supervised learning, the loss related to a specific observation is defined as a continuous, differentiable, convex function. \[L(w)=\sum_{i=1}^{n}L_{i}(w)\]

In supervised learning, the vector w represents the vector of weights (or model parameters). At each iteration of the stochastic gradient descent, the weights are updated using the formula \[w_{t+1}=w_{t}-\eta \triangledown L(w_{t})=w_{t}-\eta \sum_{i=1}^{n}\triangledown L_{i}(w_{t})\]

Stochastic Gradient Descent (SGD) aims to reduce the loss function, which represents the difference between the model's predicted values and the expected values. In each iteration, SGD picks a small group of observations, referred to as a mini-batch, for the model's training. This repetitive procedure is designed to progressively approach the true global minimum of the loss function.
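To make the update rule concrete, here is a toy, single-machine sketch of one SGD step on a mini-batch for a least-squares loss. It is purely illustrative (not Spark code); names such as sgdStep and eta are ours.

// One SGD step on a mini-batch for a linear model f(x|w) = w.x with squared loss
def sgdStep(
    w: Array[Double],
    miniBatch: Seq[(Array[Double], Double)],  // (features x_i, target y_i)
    eta: Double): Array[Double] = {

  val grad = new Array[Double](w.length)
  miniBatch.foreach { case (x, y) =>
    // Residual f(x|w) - y
    val residual = x.zip(w).map { case (xi, wi) => xi * wi }.sum - y
    for (i <- w.indices) grad(i) += 2.0 * residual * x(i) / miniBatch.size
  }
  // w <- w - eta * gradient of the mini-batch loss
  w.zip(grad).map { case (wi, gi) => wi - eta * gi }
}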

Adaptive Gradient Descent

The core concept of AdaGrad revolves around the strategy of boosting the learning rate for sparse features (or model parameters) while reducing it for denser features. As a result, AdaGrad enhances the efficiency of converging towards the minimum loss in models with sparse features, particularly when these sparse features hold significant information. For each weight i, the accumulated squared gradients rescale the learning rate η: \[w_{t+1,i}=w_{t,i} -\frac{\eta }{\sqrt{\sum_{\tau =1}^{t}g_{\tau ,i}^{2} + \varepsilon }}\;g_{t,i}\;\;\;\;\;\;\;g_{t,i}=\frac{\partial L}{\partial w_{t,i}}\]
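Before diving into the Spark implementation, the per-feature bookkeeping can be sketched in a few lines of plain Scala. This is a simplified illustration of the update rule above, not the MLlib Updater discussed below; names such as adaGradStep and history are ours.

// Minimal AdaGrad update: 'history' holds the running sum of squared gradients
// and is updated in place before rescaling the learning rate per feature.
def adaGradStep(
    w: Array[Double],
    g: Array[Double],          // gradient of the loss at w
    history: Array[Double],    // accumulated squared gradients
    eta: Double,
    eps: Double = 1e-8): Array[Double] = {

  for (i <- w.indices) history(i) += g(i) * g(i)
  // Per-feature learning rate: eta / sqrt(accumulated squared gradient)
  w.indices.map(i => w(i) - eta * g(i) / math.sqrt(history(i) + eps)).toArray
}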

SGD in Apache Spark

The Apache Spark MLlib library has two implementations of SGD:
  • Generic Gradient Descent and related classes in the mllib.optimization package
  • SGD bundled with classifier or regression algorithms such as LogisticRegressionWithSGD, LassoWithSGD, SVMWithSGD or RidgeRegressionWithSGD

We plan to utilize the optimization package to modify the stochastic gradient descent. Our goal is to use the mllib.optimization.GradientDescent template class from MLlib and create a custom Updater that implements an adaptive gradient with per-feature learning rates.

This Updater is responsible for "updating the model's weights" (in models like Logistic Regression or SVM) by multiplying the current learning rate with the partial derivative of the loss with respect to each weight, as previously described. We'll name our custom Updater as AdaGradUpdater, which will handle the model weight updates using the adaptive gradient approach. Following this, we'll initialize the SGD in the following manner.
   val adaSGD = new GradientDescent(...)
                    .setUpdater(new AdaGradUpdater)
                    .setStepSize(0.01)
                    // ...
The class AdaGradUpdater has to implement the generic compute method:
  Updater.compute(
      weightsOld: Vector, 
      gradient: Vector, 
      stepSize: Double, 
      iter: Int, 
      regParam: Double
  ): (Vector, Double)
The method returns the tuple (vector of new weights, loss). Let's implement the AdaGrad algorithm


Implementation of AdaGrad

As mentioned earlier, the implementation of AdaGrad consists of overriding the method Updater.compute.

The computation of the learning rate requires recording the past squared values of the gradient (previous steps) for each weight in the array gradientsHistory (line 2). First we define the method += to update the gradient history (lines 26-36); the first call to the method creates the gradient history (line 30).
The next step consists of converting the existing (old) weights into a Breeze dense vector brzWeights (line 13). The array of per-feature learning rates is computed as the inverseVelocity coefficients (line 40).

The learning rates are zipped with the old weights (line 14) to compute the updated weights newWeights as a new dense vector (lines 15-17). The linear algebra on the Spark data nodes is actually performed by the BLAS library under the cover through calls to brzAxpy (line 20) and brzNorm (line 21).

 final class AdaGradL2Updater(dimension: Int) extends Updater {
  // Running sum of squared gradients, one entry per model weight
  var gradientsHistory: Array[Double] = _

  override def compute(
       weightsOld: Vector,
       gradient: Vector,
       stepSize: Double,
       iter: Int,
       regParam: Double
  ): (Vector, Double) = {

    this += gradient   // Accumulate the squared gradients in gradientsHistory
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    val sumSquareDerivative = inverseVelocity.zip(brzWeights.toArray)
    val newWeights: BV[Double] = 
       new DenseVector[Double](sumSquareDerivative.map {
          case (coef, weight) => weight * (1.0 - regParam * coef)
       })

    // Update the weights: w <- w - gradient
    brzAxpy(-1.0, gradient.toBreeze, newWeights)
    val norm = brzNorm(newWeights, 2.0)

    (Vectors.fromBreeze(newWeights), 0.5 * regParam * norm * norm)
  }


  private def +=(gradient: Vector): Unit = {
    val grad = gradient.toArray
    
    grad.view.zipWithIndex.foreach {
      case (g, index) => {
        if (gradientsHistory == null)
          gradientsHistory = Array.fill(grad.length)(0.0)

        val existingGradient = gradientsHistory(index)
        gradientsHistory.update(index, existingGradient + g*g)
      }
    }
  }


  // Per-feature coefficient: 1 / sqrt(sum of the squared gradients)
  def inverseVelocity = gradientsHistory.map(1.0/Math.sqrt(_))
}

Analysis

Given a logistic regression model used to classify time-series patterns, we compute and compare the training binary cross-entropy loss (without regularization) using the default MLlib SGD and our implementation of AdaGrad.

The AdaGrad algorithm has its limitations: the accumulated squared gradients can grow very large over time, causing the learning rate to shrink and become infinitesimally small, which effectively stops the parameters from updating.

AdaGrad is not the only alternative to traditional SGD; RMSProp and Adam are worth investigating. 

Thank you for reading this article. For more information ...


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Monday, March 7, 2016

Weighted Logistic Loss for Imbalanced Dataset in Spark

Target audience: Intermediate
Estimated reading time: 6'

How to apply corrective weight for the training of a logistic regression with imbalanced data set using Apache Spark MLlib?



Some applications, such as spam detection or online targeting, have an imbalanced dataset: the number of observations associated with one label (the minority class) is very small compared to the number of observations associated with the other labels. 

Overview

Let's consider the case of an intrusion detection system that identifies a cyber attack through a Virtual Private Network (VPN) in an enterprise. The objective is to classify every session, based on one or several TCP connections, as either a potentially harmful intrusion or a normal transaction, knowing that a very small fraction of attempts to connect and authenticate are actual deliberate attacks (fewer than 1 per million connections).
There are few points to consider:
  • The number of reported attacks is very likely very small (< 10). Therefore, the very small set of labeled security breaches is plagued with high variance
  • A very large and expensive labeling operation is required, if even feasible, to generate a fairly stable minority class (security breaches)
  • The predictor can appear extremely accurate by always classifying any new attempt to connect to the VPN as harmless: with 1 reported attack per 1 million VPN sessions, the prediction would be correct 99.999% of the time. This is why metrics such as the F1-score or the area under the ROC or PR curves matter more than raw accuracy for imbalanced data.

There are several approaches to address the problem of imbalanced training and validation sets. The most commonly known techniques include:
  • Sub-sampling the majority class (i.e. harmless VPN sessions): It reduces the number of labels for the normal sessions while preserving the labeled reported attacks.
  • Over-sampling the minority class: This technique generates synthetic samples through bootstrapping, based on the k-Nearest Neighbors algorithm
  • Application of weights differential to the logistic loss used in the training of the model
This post focuses on the third option, the re-balancing (weighting) of the logistic loss. The implementation of this technique is specific to Apache Spark, and more specifically to the ML implementation of the logistic regression (not MLlib) that leverages data frames.
It is assumed the reader is somewhat familiar with Apache Spark, data frames and its machine learning module, MLlib.

Weighting the logistic loss

Let's consider the logistic loss commonly used in training a binary model f with features x and labels y: \[logLoss = - \sum_{i=0}^{n-1}[y_i .log(f(x_i)) + (1 - y_i) .log(1 - f(x_i))]\] The first component of the loss function is related to the minority observations (security breaches through a VPN: label = 1), while the second component represents the loss related to the majority observations (harmless VPN sessions: label = 0).
Let's consider the ratio r of the number of observations associated with the majority label over the total number of observations: \[r = \frac{\left |\{ i: y_i = 0\} \right |}{\left |\{ i: y_i\} \right |}\] The logistic loss can then be rebalanced as \[logloss = -\sum_{i=0}^{n-1} [r.y_i.log(f(x_i)) + (1-r).(1-y_i).log(1 - f(x_i))]\] The next step is to implement this weighting of the binomial logistic regression classifier using Apache Spark.
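As a quick, concrete illustration of the rebalanced loss, here is a minimal single-machine sketch; the function name weightedLogLoss and its arguments are ours, labels are 0.0 or 1.0, preds holds the model outputs f(x_i) in (0, 1), and r is the majority-class ratio defined above.

// Rebalanced logistic loss: the minority term is weighted by r, the majority term by (1 - r)
def weightedLogLoss(labels: Seq[Double], preds: Seq[Double], r: Double): Double =
  -labels.zip(preds).map { case (y, p) =>
    r * y * math.log(p) + (1.0 - r) * (1.0 - y) * math.log(1.0 - p)
  }.sum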

Spark implementation

Apache Spark is an open source framework for in-memory processing of large datasets (think of it as Hadoop on steroids). The Apache Spark framework contains a machine learning module known as MLlib. The objective is to modify/override the train method of LogisticRegression.
One simple option is to sub-class the LogisticRegression class in the ml package. However, the logistic loss is actually computed in the private class LogisticAggregator, which cannot be overridden.
If you browse through the ml.classification.LogisticRegression.train Scala code, however, you will notice that the class Instance, which encapsulates labeled data for the computation of the loss and its gradient, has three parameters:
  • label: Double
  • feature: linalg.Vector
  • weight: Double
The plan is to use this 3rd parameter, weight, as the balancing weight ratio r. This is simply accomplished by adding an extra column weightCol to the input data frame dataset and defining its value (see the sketch after this list) as
  • balancingRatio r for label = 1
  • 1 - balancingRatio (1 - r) for label = 0
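A minimal sketch of that step using the DataFrame API, assuming a data frame df with a label column and a pre-computed balancingRatio (both names are illustrative):

import org.apache.spark.sql.functions.{col, lit, when}

// Assign the per-class weight to each row of the input data frame
val weightedDF = df.withColumn(
  "weightCol",
  when(col("label") === 1.0, lit(balancingRatio))
    .otherwise(lit(1.0 - balancingRatio))
)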
Here is a simple implementation of the weighted logistic regression that leverages the Apache Spark ML implementation of the logistic regression:

final val BalancingWeightColumnName: String = "weightCol"
final val BalanceRatioBounds = (0.001, 0.999)
final val ImportanceWeightNormalization = 2.0

class WeightedLogisticRegression(balanceRatio: Double = 0.5) 
   extends LogisticRegression(UUID.randomUUID.toString) 
      with Logging {

  this.setWeightCol(BalancingWeightColumnName)

  private[this] val balancingRatio = 
    if (balanceRatio < BalanceRatioBounds._1) 
       BalanceRatioBounds._1
    else if (balanceRatio > BalanceRatioBounds._2) 
       BalanceRatioBounds._2
    else
       balanceRatio

  override protected def train(dataset: DataFrame): 
      LogisticRegressionModel = {
    val newDataset = dataset.withColumn("weightCol", lit(0.0))

    val weightedDataset: RDD[Row] = dataset.map(row => {
      val w = if (row(0) == 1.0) balancingRatio 
         else 1.0 - balancingRatio
      Row(row.getAs[Double](0), row.getAs[Vector](1), 
           ImportanceWeightNormalization * w)
    })

    val weightedDataFrame = dataset.sqlContext
        .createDataFrame(weightedDataset, newDataset.schema)
    super.train(weightedDataFrame)
  }
}

Notes
  • The balancing ratio has to be normalized by a factor ImportanceWeightNormalization = 2: this factor is required to produce a weight of 1 for both the majority and minority classes from a fully balanced ratio of 0.5 (2 x 0.5 = 1).
  • The actual balancing ratio needs to be constrained within an acceptable range, BalanceRatioBounds, to prevent the minority class from having an outsized influence on the weights of the logistic regression model. In the extreme case, there may not even be a single observation in the minority class (security breach through the VPN). These minimum and maximum values are highly dependent on the type of application.
Here is an example of application of the WeightedLogisticRegression on a training data frame, trainingDF. The number of data points (or observations) associated with label = 1 is extracted through a simple filter.

val numPositives = trainingDF.filter("label == 1.0").count
val datasetSize = trainingDF.count
val balanceRatio = (datasetSize - numPositives).toDouble/datasetSize

val lr = new WeightedLogisticRegression(balanceRatio)
          .setMaxIter(20)
          .setElasticNetParam(0)
          .setFitIntercept(true)
          .setTol(1E-6)
          .setRegParam(1E-2)

val model = lr.fit(trainingDF)

One simple validation of this implementation of the weighted logistic regression on Apache Spark is to verify that the weights of the model generated by WeightedLogisticRegression with a balancing ratio of 0.5 are very similar to the weights generated by the standard Apache Spark MLlib logistic regression model (ml.classification.LogisticRegressionModel), as sketched below.
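Here is a possible way to run that sanity check; the snippet is a sketch that assumes trainingDF exposes the standard label and features columns and follows the Spark ml API:

val baseline = new LogisticRegression()
    .setMaxIter(20).setRegParam(1E-2).setFitIntercept(true)
    .fit(trainingDF)

val balanced = new WeightedLogisticRegression(0.5)
    .setMaxIter(20).setRegParam(1E-2).setFitIntercept(true)
    .fit(trainingDF)

// For a fully balanced ratio of 0.5 the two coefficient vectors should be very close
val maxDelta = baseline.coefficients.toArray
    .zip(balanced.coefficients.toArray)
    .map { case (a, b) => math.abs(a - b) }
    .max
println(s"Largest coefficient difference: $maxDelta")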

Note: This implementation relies on Apache Spark 1.6.0. The latest implementation of the logistic regression in the beta release of Apache Spark 2.0 does not allow overriding LogisticRegression outside the scope of Apache Spark MLlib.
