Monday, March 7, 2016

Weighted Logistic Loss for Imbalanced Dataset in Spark

Target audience: Intermediate
Estimated reading time: 6'

How do you apply a corrective weight to the logistic loss when training a logistic regression on an imbalanced data set with Apache Spark MLlib?



Some applications, such as spam detection or online targeting, rely on an imbalanced data set: the number of observations associated with one label (the minority class) is very small compared to the number of observations associated with the other labels.

Overview

Let's consider the case of an intrusion detection system that identifies a cyber attack through a Virtual Private Network (VPN) in an enterprise. The objective is to classify every session, based on one or several TCP connections, as either a potentially harmful intrusion or a normal transaction, knowing that only a very small fraction of connection and authentication attempts are actual deliberate attacks (fewer than 1 per million connections).
There are a few points to consider:
  • The number of reported attacks is very likely very small (< 10). Therefore, the very small set of labeled security breaches is plagued with high variance.
  • A very large and expensive labeling operation is required, if even feasible, to generate a fairly stable minority class (security breaches).
  • A predictor can achieve very high accuracy simply by classifying every new attempt to connect to the VPN as harmless. With 1 reported attack per million VPN sessions, such a prediction would be correct 99.9999% of the time, which is why metrics such as the F1-score or the area under the ROC or precision-recall curves are needed instead of raw accuracy.

There are several approaches to address the problem of imbalanced training and validation sets. The most commonly used techniques include:
  • Sub-sampling the majority class (i.e. harmless VPN sessions): this reduces the number of labeled normal sessions while preserving the labeled reported attacks (see the sketch after this list).
  • Over-sampling the minority class: this technique generates synthetic samples bootstrapped from the k nearest neighbors of the minority observations.
  • Applying a weight differential to the logistic loss used in the training of the model.
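For context, here is a minimal, hypothetical sketch of the first option, sub-sampling of the majority class, assuming a data frame sessions with a label column (1.0 for attacks, 0.0 for harmless sessions); the names and the sampling fraction are illustrative only.

  // Sub-sampling sketch (assumed data frame 'sessions' with a 'label' column)
val attacks = sessions.filter("label == 1.0")
val harmless = sessions.filter("label == 0.0")

  // Keep every minority observation and only a small random fraction of the majority class
val samplingFraction = 0.01
val subSampled = attacks.unionAll(harmless.sample(withReplacement = false, samplingFraction))
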
This post focuses on the third option: re-balancing (weighting) of the logistic loss. The implementation of this technique is specific to Apache Spark, and more precisely to the ML implementation of the logistic regression (not MLlib) that leverages data frames.
It is assumed the reader is somewhat familiar with Apache Spark, data frames and the machine learning module MLlib.

Weighting the logistic loss

Let's consider the logistic loss commonly used in training a binary model f with features x and labels y: \[logLoss = - \sum_{i=0}^{n-1}\left[y_i\,log(f(x_i)) + (1 - y_i)\,log(1 - f(x_i))\right]\] The first component of the loss function is related to the minority observations (security breach through a VPN: label = 1) while the second component represents the loss related to the majority observations (harmless VPN sessions: label = 0).
Let's consider the ratio r of the number of observations with the majority label over the total number of observations: \[r = \frac{card\left \{ i: y_i = 0 \right \}}{n}\] The logistic loss can then be rebalanced as \[logLoss = -\sum_{i=0}^{n-1}\left[r\,y_i\,log(f(x_i)) + (1-r)\,(1-y_i)\,log(1 - f(x_i))\right]\] The next step is to implement this weighting of the binomial logistic regression classifier using Apache Spark.
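As a quick numeric illustration, using the 1-attack-per-million figure from the overview: \[r = \frac{999,999}{1,000,000}=0.999999\] so the single minority observation (label = 1) is weighted by r ≈ 0.999999 while each of the 999,999 majority observations (label = 0) is weighted by 1 - r = 0.000001. The aggregate contribution of each class to the loss is then comparable (roughly 1 on each side), instead of being dominated by the majority class.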

Spark implementation

Apache Spark is an open source framework for in-memory processing of large datasets (think of it as Hadoop on steroids). The framework contains a machine learning module known as MLlib. The objective is to modify/override the train method of the LogisticRegression class.
One simple option is to sub-class the LogisticRegression class in the mllib/ml package. However, the logistic loss is actually computed in the private class LogisticAggregator, which cannot be overridden.
If you browse through the ml.classification.LogisticRegression.train Scala code, you will notice that the class Instance, which encapsulates labeled data for the computation of the loss and its gradient, has three parameters:
  • label: Double
  • feature: linalg.Vector
  • weight: Double
The plan is to use the third parameter, weight, as the balancing weight ratio r. This is accomplished by adding an extra column weightCol to the input data frame dataset and defining its value as:
  • balancingRatio r for label = 1
  • 1 - balancingRatio (1 - r) for label = 0
Here is a simple implementation of the weighted logistic regression built on the Apache Spark ML implementation of the logistic regression:

import java.util.UUID

import org.apache.spark.Logging
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit

final val BalancingWeightColumnName: String = "weightCol"
final val BalanceRatioBounds = (0.001, 0.999)
final val ImportanceWeightNormalization = 2.0

class WeightedLogisticRegression(balanceRatio: Double = 0.5) 
   extends LogisticRegression(UUID.randomUUID.toString) 
      with Logging {

  this.setWeightCol(BalancingWeightColumnName)

    // Constrain the balancing ratio to the predefined bounds
  private[this] val balancingRatio = 
    if (balanceRatio < BalanceRatioBounds._1) 
       BalanceRatioBounds._1
    else if (balanceRatio > BalanceRatioBounds._2) 
       BalanceRatioBounds._2
    else
       balanceRatio

  override protected def train(dataset: DataFrame): 
      LogisticRegressionModel = {
      // Template data frame, used only for its schema (label, features, weightCol)
    val newDataset = dataset.withColumn(BalancingWeightColumnName, lit(0.0))

      // Assign the normalized balancing weight to each observation:
      // r for the minority class (label = 1), 1 - r for the majority class (label = 0)
    val weightedDataset: RDD[Row] = dataset.map(row => {
      val w = if (row(0) == 1.0) balancingRatio 
         else 1.0 - balancingRatio
      Row(row.getAs[Double](0), row.getAs[Vector](1), 
           ImportanceWeightNormalization * w)
    })

    val weightedDataFrame = dataset.sqlContext
        .createDataFrame(weightedDataset, newDataset.schema)
    super.train(weightedDataFrame)
  }
}

Notes
  • The balancing ratio has to be normalized by the factor ImportanceWeightNormalization = 2: this factor is required to produce a weight of 1 for both the majority and minority classes when the training set is fully balanced (ratio of 0.5), as illustrated below.
  • The actual balancing ratio needs to be constrained within an acceptable range, BalanceRatioBounds, to prevent the minority class from having an outsized influence on the weights of the logistic regression model. In the extreme case, there may not even be a single observation in the minority class (security breach through the VPN). These minimum and maximum bounds are highly dependent on the type of application.
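To illustrate the first note with assumed values, the weight applied to each class is \[w_{1} = 2\,r \qquad w_{0} = 2\,(1 - r)\] For a fully balanced training set, r = 0.5, both weights are equal to 1 and the standard logistic loss is recovered; at the upper bound r = 0.999, the minority class is weighted by 1.998 and the majority class by 0.002.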
Here is an example of application of the WeightedLogisticRegression to a training data frame, trainingDF. The number of observations associated with label = 1 is extracted through a simple filter.

val numPositives = trainingDF.filter("label == 1.0").count
val datasetSize = trainingDF.count
val balanceRatio = (datasetSize - numPositives).toDouble/datasetSize

val lr = new WeightedLogisticRegression(balanceRatio)
          .setMaxIter(20)
          .setElasticNetParam(0)
          .setFitIntercept(true)
          .setTol(1E-6)
          .setRegParam(1E-2)

val model = lr.fit(trainingDF)

One simple validation of this implementation is to verify that the model coefficients generated by the WeightedLogisticRegression with a balancing ratio of 0.5 are very close to the coefficients generated by the standard Apache Spark logistic regression model (ml.classification.LogisticRegressionModel).
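A hedged sketch of such a sanity check follows; the tolerance value and the schema of trainingDF are assumptions, not part of the original implementation.

  // Sanity check (sketch): with a balancing ratio of 0.5 the per-class weights are both 1,
  // so the weighted model should closely match the standard logistic regression
val weighted = new WeightedLogisticRegression(0.5).setMaxIter(20).fit(trainingDF)
val reference = new LogisticRegression().setMaxIter(20).fit(trainingDF)

val maxDelta = weighted.coefficients.toArray
   .zip(reference.coefficients.toArray)
   .map { case (w, r) => Math.abs(w - r) }
   .max
assert(maxDelta < 1e-3, s"Weighted and unweighted coefficients differ by $maxDelta")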

Note: This implementation relies on Apache Spark 1.6.0. The implementation of the logistic regression in the beta release of Apache Spark 2.0 does not allow overriding LogisticRegression outside the scope of Apache Spark MLlib.


Thursday, February 4, 2016

Revisit Newton-Raphson

Target audience: Intermediate
Estimated reading time: 3'

The Newton-Raphson method stands out for its straightforwardness, both conceptually and in execution. We'll delve into two distinct Scala implementations and wrap up the discussion by weighing the pros and cons of the Newton-Raphson approach.



The Newton-Raphson method is a well-known technique to compute the root of a function, f(x) = 0, or the minimum/maximum of a function through the root of its derivative, f'(x) = 0.

First implementation

Let's dive into the computation of the root x of a function f. The function is defined by its Taylor series as follows:
\[f(s)=\sum_{n=0}^{\infty }\frac{f^{(n)}(x)}{n!}(s-x)^{n}\] At a root s, f(s) = 0, and the expansion truncated at the first order becomes
\[0=f(x)+f'(x)(s-x)+O\left((s-x)^{2}f''(x)\right)\] resulting in the iteration
\[x_{n+1}=x_{n}- \frac{f(x_{n})}{f'(x_{n})}\]

final class NewtonRaphson(
   f:   Double => Double,
   df:  Double => Double,
   dff: Option[Double => Double] = None
) {
   final val EPS = 0.01

     // Simplest form: iterate x -> x - f(x)/df(x) until the step is smaller than EPS
   @scala.annotation.tailrec
   def root(x: Double): Double = {
      val nextX = x - f(x) / df(x)
      if (Math.abs(nextX - x) < EPS) nextX 
      else root(nextX)
   }
}

The constructor takes 3 arguments:
  • The function whose root is to be computed
  • Its first order derivative
  • An optional second order derivative, not used in this first implementation
The formula is implemented by the tail-recursive method root. Let's apply the root method to a concrete example, with a function f and its derivative ff:

val f = (x: Double) => Math.pow(x, 5.0) - 2.0*x
val ff = (x: Double) => 5.0*Math.pow(x,4.0) -2.0
 
val nr1 = new NewtonRaphson(f, ff)
val z1 = nr1.root(7.9)

So far so good. However, it is assumed that the function has a single root or, in the case of a maximum/minimum, that its derivative has a single root, at least in the vicinity of the initial data point. What about computing the root of a function that may have no root, or several?
Let's look at the following function g and its derivative gg:


val g = (x: Double) => Math.exp(0.5 * x)
val gg = (x: Double) => 0.5 * g(x)

val nr3 = new NewtonRaphson(g, gg)

The call to the Newton-Raphson method nr3 will diverge: the iteration never converges and, once the exponential underflows, the update degenerates to 0.0/0.0, i.e. Double.NaN.
There are several options to guarantee that the method exits properly when no root exists. One simple option, a cap on the number of iterations, is sketched below; the solution explored in the next section leverages the second derivative.
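For reference, here is what such a cap could look like inside the NewtonRaphson class defined above; the method name, the cap value and the NaN sentinel are arbitrary choices, not part of the original implementation.

  // Hypothetical variant of root that gives up after a fixed number of iterations
@scala.annotation.tailrec
def rootWithCap(x: Double, maxIters: Int = 100): Double =
   if (maxIters <= 0) Double.NaN            // no convergence within the iteration budget
   else {
      val nextX = x - f(x) / df(x)
      if (Math.abs(nextX - x) < EPS) nextX
      else rootWithCap(nextX, maxIters - 1)
   }
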

Second derivative to the rescue

We need to go back to basics: in its simplest form, Newton-Raphson does not take into account derivatives of order higher than 1. The second order derivative f'' can be used as a rough approximation of the error on the estimate of the root. The error can be evaluated at each iteration against a convergence criterion EPS.
\[\Delta \varepsilon = \varepsilon _{n+1}- \varepsilon _{n} \sim \frac{f'(x_{n})}{f''(x_{n})} < EPS\] \[x_{n+1}=x_{n}-\frac{f(x_{n})}{f'(x_{n})}\left(1 + \Delta \varepsilon\right)\] The approximation of the error is also used to validate whether the algorithm converges toward a solution (root) or starts to diverge. Let's implement this variation of the Newton-Raphson method using the second order derivative as a convergence criterion.

  // Sentinel returned when the algorithm diverges or no second derivative is provided
final val DIVERGE = -1.0

def root(z: Double): Double = dff.map(_dff => {

   @scala.annotation.tailrec
   def nr2ndOrder(xe: (Double, Double)): (Double, Double) = {
      val (x, eps) = xe

      val nextEps = Math.abs(df(x) / _dff(x))
      val nextX = x - f(x)*(1.0 + nextEps)/df(x)

        // Rules to exit the recursion
      if (eps > 0.0 && nextEps >= eps)          // the error increases: diverge
         (DIVERGE, DIVERGE)
      else if (Math.abs(nextEps - eps) < EPS)   // the error stabilized: converged
         (nextX, nextEps)
      else                                      // keep recursing
         nr2ndOrder((nextX, nextEps))
   }

   nr2ndOrder((z, 0.0))._1
}).getOrElse(DIVERGE)

Once again, the computation of the root is implemented as an efficient tail recursion, here in the inner method nr2ndOrder.
The convergence criterion is implemented as follows:
  • if the error eps increases, exit the recursion => Diverge (first exit rule, with the getOrElse fallback when no second derivative is supplied)
  • if eps decreases to a value within the convergence criterion => Converged (second exit rule)
  • if eps decreases but remains above the convergence criterion => Recurse
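As an illustration, here is a hypothetical invocation on the divergent function g of the previous section; ggg, nr4 and z4 are names introduced here, not part of the original post.

  // Second order derivative of g(x) = exp(0.5*x)
val ggg = (x: Double) => 0.25 * g(x)

val nr4 = new NewtonRaphson(g, gg, Some(ggg))
val z4 = nr4.root(7.9)   // returns the DIVERGE sentinel (-1.0) instead of iterating indefinitely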

Summary

Let's compare the convergence of the simplest Newton-Raphson method with the convergence of its variant using the 2nd order derivative, on the function f defined in the first paragraph.

Clearly, the adaptive step size using the second order derivative speeds up the convergence toward the root of the function f. The selection of the appropriate method comes down to the trade-off between the overhead of the computation of the second derivative and the larger number of iterations or recursions required to find the root within a predefined convergence criteria.


Wednesday, December 2, 2015

Fast Kullback-Leibler Divergence Using Spark

Target audience: Intermediate
Estimated reading time: 5'

In my role as a data engineer, I've often encountered the challenge of measuring the discrepancy between two probability distributions over identical variables, such as when calculating the loss function for variational auto-encoders or generative adversarial networks. 

This article delves into the Kullback-Leibler divergence and how to implement it using Apache Spark for distributed computing.

Note: The source code relies on Spark 3.3.1 and Scala 2.12.4

What you will learn: How to implement the Kullback-Leibler divergence on Apache Spark with Scala


Overview

The Kullback-Leibler divergence, originating from information theory and often linked to Information Gain, is also referred to as the relative entropy between two distributions [ref 1]. It quantifies the difference in the distribution of random values, specifically in their probability density functions.

Let's consider two distributions P and Q with probability density functions p and q:
\[D_{KL}(P||Q)= - \int_{-\infty }^{+\infty }p(x)\,log\frac{q(x)}{p(x)}\,dx\] The formula can be interpreted as the information lost when the distribution Q is used to approximate the distribution P.
The computation is clearly not symmetric: \[D_{KL}(P||Q)\neq D_{KL}(Q||P)\] The Kullback-Leibler divergence plays a role in computing Mutual Information, a method for feature extraction, and is a member of the family of F-divergences.
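As a quick illustration of the asymmetry, here is a toy discrete example (the two distributions are chosen arbitrarily and are not part of the Spark implementation that follows):

import scala.math.log

  // Toy discrete distributions over the same three outcomes
val p = Array(0.7, 0.2, 0.1)
val q = Array(0.4, 0.4, 0.2)

def klDiscrete(p: Array[Double], q: Array[Double]): Double =
   p.zip(q).map { case (pi, qi) => pi * log(pi / qi) }.sum

println(klDiscrete(p, q))   // D(P||Q)
println(klDiscrete(q, p))   // D(Q||P): a different value, illustrating the asymmetry
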
In this article, we aim to:
  1. Craft an implementation of the Kullback-Leibler divergence in Scala, leveraging the Apache Spark framework for distributed computation.
  2. Showcase how the Kullback-Leibler divergence can be utilized to contrast various continuous probability density functions.

Scala implementation

First, we'll establish the Kullback-Leibler formula to assess the difference between a dataset {xi, yi} (most likely produced by some random variable or distribution) and a given continuous distribution with probability density function f.
We rely on the Scala foldLeft method to compute the summation of the cross-entropy terms for each data point [ref 2].

import scala.math.{abs, log}

object KullbackLiebler {
   final val EPS = 1e-10
   type DATASET = Iterator[(Double, Double)]

     // #1 Divergence between a dataset and a single probability density function f
   def execute(xy: DATASET, f: Double => Double): Double = {
      val z = xy.filter { case (_, y) => abs(y) > EPS }
      -z.foldLeft(0.0) { case (s, (x, y)) =>
         val px = f(x)
         s + px*log(px/y)
      }
   }

     // #2 Divergence between a dataset and a sequence of probability density functions.
     // The iterator is materialized first, as it can only be traversed once.
   def execute(xy: DATASET, fs: Iterable[Double => Double]): Iterable[Double] = {
      val data = xy.toSeq
      fs.map(f => execute(data.iterator, f))
   }
}

The divergence methods execute are defined as static methods on the object KullbackLiebler.
The first execute method (#1) computes the dissimilarity between a dataset xy and the distribution with probability density f.
The second execute method (#2) computes the KL divergence between a dataset xy and a sequence fs of probability density functions.

Next, let's define some of the continuous random distributions used to evaluate the distribution represented by the dataset xy: Normal, Beta, logNormal, Gamma, logGamma and Chi-Square.

import scala.math.{exp, log, pow}

  // Normalization constant as named in the original code
  // (the exact Gaussian factor would be 1/sqrt(2*Pi))
final val INV_PI = 1.0/Math.PI

  // One Gaussian distribution
val gauss = (x: Double) => INV_PI*exp(-x*x/2.0)

   // Uniform distribution
val uniform = (x: Double) => x

  // Log normal distribution
val logNormal = (x: Double) => { 
   val lx = log(x)
   INV_PI/x*exp(-lx*lx) 
}
  // Gamma distribution
val gamma = (x: Double, n: Int) => 
    exp(-x)*pow(x, n)/fact(n)

  // Log Gamma distribution
val logGamma = (x: Double, alpha: Int, beta: Int) =>
   exp(beta*x)*exp(-exp(x)/alpha)/(pow(alpha, beta)*fact(beta-1))

  // Simple computation of m! (for beta)
def fact(m: Int): Int = if(m < 2) 1 else m*fact(m-1)

  // Normalization factor for Beta
val cBeta = (n: Int, m: Int) => {
    val f = if(n < 2) 1 else fact(n-1)
    val g = if(m < 2) 1 else fact(m-1)
    f*g/fact(n+m -1).toDouble
}

  // Beta distribution: x^(alpha-1).(1-x)^(beta-1)/B(alpha, beta)
val beta = (x: Double, alpha: Int, beta: Int) =>
   pow(x, alpha-1)*pow(1.0 - x, beta-1)/cBeta(alpha, beta)

  // Chi-Square distribution (for an even number of degrees of freedom k)
val chiSquare = (x: Double, k: Int) => {
   val k_2 = k >> 1
   pow(x, k_2-1)*exp(-0.5*x) /((1 << k_2)*fact(k_2-1))
}

Some of these probability density functions are parameterized (e.g. the log gamma has alpha and beta parameters). The probability density functions are implemented as Scala partially applied functions with predefined parameter values, as illustrated in the following code snippet.

val gamma2 = gamma(_: Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)

Notes:
  • The implementation of the probability density functions in the code snippet is not optimized for performance. 
  • Please refer to your favorite Statistics handbook to learn more about these probability distributions.
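As a usage sketch, the two execute methods can be invoked as follows; the synthetic dataset below is purely illustrative and not part of the original post.

  // Hypothetical synthetic dataset of (x, observed density) pairs
val rand = new scala.util.Random(42L)
val xy: Seq[(Double, Double)] = Seq.fill(1000) {
   val x = rand.nextDouble + 1e-3      // strictly positive sample points
   (x, logNormal(x))
}

  // Divergence against a single pdf, then against a small batch of pdfs
val klGauss = KullbackLiebler.execute(xy.iterator, gauss)
val klBatch = KullbackLiebler.execute(xy.iterator, Seq(gauss, uniform, logNormal))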

Spark to the rescue

Apache Spark is a free, open-source framework for cluster computing, specifically designed to process data in real time via distributed computing [ref 3]. Its primary applications include:
  • Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
  • Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
  • Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
  • Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.

Computing the Kullback-Leibler divergence on a sizable dataset with many random distributions can lead to delays and considerable use of computational power. One approach to tackle this is by leveraging the Apache Spark framework for parallel processing of the divergence.

To do this:
  • Segment the primary dataset.
  • Distribute the initial sequence of probability density functions.
  • Apply the Kullback-Leibler formula to each segment using mapPartitions.
  • Retrieve and combine the divergence value from every segment.

import org.apache.spark.{SparkConf, SparkContext}

final val NUM_DATA_POINTS = 10000000   // #1
val numTasks: Int = 128                // #2
  
val conf = new SparkConf()             // #3
     .setAppName("Kullback-Liebler")
     .setMaster(s"local[$numTasks]")

val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

Once we define the number of input data points (# 1) and the number of concurrent tasks (# 2), the Apache Spark context sc is configured with an instance of SparkConf (# 3).
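The master_rdd referenced in the next snippets is not defined in the original post; here is a minimal, hypothetical way it could be built from the configuration above (the data generation itself is purely illustrative).

import org.apache.spark.rdd.RDD

  // Partitioned dataset of (x, y) pairs; y is generated from the Gaussian pdf for illustration
val master_rdd: RDD[(Double, Double)] = sc
   .parallelize(0 until NUM_DATA_POINTS, numTasks)
   .map(n => {
      val x = n.toDouble/NUM_DATA_POINTS
      (x, gauss(x))
   })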
Next let's implement the broadcasting mechanism for the batch or sequence of probability density functions.

  // Partially applied pdfs: gamma1, gamma4, logGamma12, logGamma22, beta22 and chiSquare2
  // are assumed to be defined in the same way as gamma2, beta25 and chiSquare4 above
lazy val pdfs = Map[Int, Double => Double](
  1 -> uniform, 
  2 -> logNormal, 
  3 -> gamma1, 
  4 -> gamma2, 
  5 -> gamma4, 
  6 -> logGamma12, 
  7 -> logGamma22, 
  8 -> beta22, 
  9 -> beta25, 
  10 -> chiSquare2, 
  11 -> chiSquare4
)

val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.map(_._1))

The pdfs_broadcast variable broadcasts the sequence of probability density functions, pdfs, across all partitions. 
Such value broadcasting is extensively employed in the machine learning library MLlib to send model coefficients (or weights) to the data node for gradient and loss function computations.

The mapPartitions method converts each partition (an iterator of {x, y} pairs) into a collection of pdfs.size values, holding the KL divergence of that partition against each distribution being assessed.

val kl_rdd  = master_rdd.mapPartitions(
  (it:DATASET) => {
     val pdfsList = pdfs_broadcast.value.map( n => pdfs.get(n).get)
     execute(it, pdfsList).iterator
  }
)

In the end, the divergence values from each partition are collected onto the Spark master, then aggregated using the Scala foldLeft operator.

val kl_master = kl_rdd.collect

  // kl_master is a flat array: one block of pdfs.size divergence values per partition.
  // Sum the contribution of each block per pdf, then average over the number of blocks.
val numBlocks = kl_master.length/pdfs.size
val divergences = (0 until kl_master.length by pdfs.size)
  .foldLeft(Array.fill(pdfs.size)(0.0))( (s, n) => {
     (0 until pdfs.size).foreach(j => s.update(j, s(j) + kl_master(n + j)))
     s
  })
  .map(_ / numBlocks)

Conclusion

We conducted the test to gauge the capability of the Kullback-Leibler divergence in assessing the differences among multiple probability distributions. We chose the Gaussian distribution as our reference point against which other distributions were contrasted.
The comparative findings can be seen as follows:
fig 1. KL divergence between various distributions and a Normal distribution


As anticipated, the Gaussian and Normal distributions show great similarity. Yet, there is a notable divergence when comparing the Gaussian with the higher-order parameterizations of the Gamma and Beta distributions. This distinction becomes even more apparent when examining the graphs of their respective probability density functions.

References

[1] C. M. Bishop, Pattern Recognition and Machine Learning, §1.6.1 Relative entropy and mutual information, Springer Science, 2006


-------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3