Thursday, May 5, 2016

Bootstrapping With Replacement in Scala

Target audience: Intermediate
Estimated reading time: 6'

Bootstrapping is a method in statistics where random sampling of a dataset is done with replacement. It allows data scientists to determine the sampling distribution for a broad range of probability distributions through this technique.

Background

A primary goal of bootstrapping is to evaluate the precision of various statistical measures like mean, standard deviation, median, mode, or error. These measures, sf, often termed as estimators, serve to approximate a distribution. The most frequently used approximation is called the empirical distribution function. When the data points x are independent and identically distributed (iid), this empirical or approximate distribution can be assessed by employing resampling techniques.

The following diagram captures the essence of bootstrapping by resampling.

Generation of bootstrap replicates by resampling

Each of the B bootstrap samples has the same number of observations or data points as the original data set from which the samples are created. Once the samples are created, a statistical function sf such as mean, mode, median or standard deviation is computed for each sample.
The standard deviation for the B statistics should be similar to the standard deviation of the original data set.

Implementation in Scala

The purpose of this post is to illustrate some basic properties of bootstrapped sampling
  • Profile of the distribution of statistics sf for a given probability distribution
  • Comparison of the standard deviation of the statistics sf with the standard deviation of the original dataset
Let's implement a bootstrap by resampling in Scala, starting with a class Bootstrap.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class Bootstrap(
    numSamples: Int = 1000,
    sf: Vector[Double] => Double,
    inputDistribution: Vector[Double],
    randomizer: Random
) {

  lazy val bootstrappedReplicates: Array[Double] =
     (0 until numSamples)./:( new mutable.ArrayBuffer[Double] )(
        ( buf, _ ) => buf += createBootstrapSample
      ).toArray

  def createBootstrapSample: Double {}

  lazy val mean = bootstrappedReplicates.reduce( _ + _ )/numSamples

  def error: Double = {}
}

The class Bootstrap is instantiated with a pre-defined number of samples, numSamples (line 2), a statistic function sf (line 3), a data set generated by a given distribution inputDistribution (line 4) and a randomizer (line 5).
The computation of the bootstrap replicates, bootstrappedReplicates is central to resampling (lines 8 - 11). As described in the introduction, a replicate, s is computed from a sample of the original data set with the method createBootstrapSample (line 10).

Let's implement the method createBootstrapSample.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def createBootstrapSample: Double =
    sf(
       (0 until inputDistribution.size)./:( new mutable.ArrayBuffer[Double] )(
         ( buf, _ ) => {
            randomizer.setSeed( randomizer.nextLong )
            val randomValueIndex = randomizer.nextInt( inputDistribution.size )
            buf += inputDistribution( randomValueIndex )
         }
       ).toVector
    )

The method createBootstrapSample
- Samples the original dataset using a uniform random function (line 6)
- Applies the statistic function sf to this sample dataset (line 1 & 11)

The last step consists of computing the error (deviation) on the bootstrap replicates

1
2
3
4
5
6
7
8
  def error: Double = {
      val sumOfSquaredDiff = bootstrappedReplicates.reduce(
        (s1: Double, s2: Double) => (s1 - mean) (s1 - mean) +  (s2 - mean)*(s2 - mean)
      )

      Math.sqrt(sumOfSquaredDiff / (numSamples - 1))
  }


Evaluation

The first evaluation consists of comparing the distribution of replicates with the original distribution. To this purpose, we generate an input dataset using
  • Normal distribution
  • LogNormal distribution

Setup

Let's create a method, bootstrapEvaluation to compare the distribution of the bootstrap replicates with the dataset from which the bootstrap samples are generated.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def bootstrapEvaluation( 
     dist: RealDistribution, 
     random: Random, 
     gen: (Double, Double) 
): (Double, Double) = {

    val inputDistribution = (0 until 5000)./:(new ArrayBuffer[(Double, Double)])
     (
       ( buf, _ ) => {
          val x = gen._1 * random.nextDouble - gen._2
          buf += ( ( x, dist.density( x ) ) )
      }
    ).toVector

    val mean = (x: Vector[Double]) => x.sum/x.length
    val bootstrap = new Bootstrap(
        numReplicates,
        mean, 
        inputDistribution.map( _._2 ), 
        new Random( System.currentTimeMillis)
    )

    val meanS = bootstrap.bootstrappedReplicates.sum / 
          bootstrap.bootstrappedReplicates.size
    val sProb = bootstrap.bootstrappedReplicates.map(_ - meanS)
         // .. plotting histogram of distribution sProb
    (bootstrap.mean, bootstrap.error)
  }

We are using the normal and log normal probability density function defined in the Apache Commons Math Java library. These probability density functions are defined in the org.apache.commons.math3.distribution package.

The comparative method bootstrapEvaluation has the following argument:
  • dist: A probability density function used to generate the data set upon which sampling is performed (line 2).
  • random: A random number generator (line 3)
  • gen: A pair of parameters for the linear transform for the generation of random values (a.r + b) (line 4).
The input distribution inputDistribution { (x, pdf(x)} is generated for 5000 data points (lines 7 - 13).
Next the bootstrap is created with the appropriate number of replicates, numReplicates, the mean of the input data set as the statistical function s, the input distribution and the generic random number generator of Scala library, as arguments (lines 16 -20).
Let's plot the distribution the input data set generated from a normal density function.

val (meanNormal, errorNormal) = bootstrap(
    new NormalDistribution, 
    new scala.util.Random, 
    (5.0, 2.5)
)

Normally distributed dataset

The first graph plots the distribution of the input dataset using the Normal distribution.


The second graph illustrates the distribution (histogram) of the replicates s - mean.


The bootstrap replicates sf(x) are also normally distributed. The mean value for the bootstrap replicates is 0.1978 and the error is 0.001691

Dataset with a log normal distribution

We repeat the same process for the lognormal distribution. This time around the dataset to sample from follows a log-normal distribution.

val (meanLogNormal, errorLogNormal) = bootstrap(
    new LogNormalDistribution, 
    new scala.util.Random, 
    (2.0, 0.0)
)



Although the original dataset used for generated the bootstrap samples is normally distributed, the bootstrap replicates sf(x) are normally distributed. The mean for the bootstrap replicates is 0.3801 and the error is 0.002937

The error for the bootstrap resampling from a log normal distribution is twice as much as the error related to the normal distribution
The result is not surprising: The bootstrap replicates follow a normal distribution which matches closely the original dataset created using the same probability density function. 

References

  • Programming in Scala - 3rd edition M Odersky, L. Spoon, B. Venners - Artima - 2016
  • Elements of Statistics Learning: Data mining, Inference and Prediction - 7.11 Bootstrap method Springer - 2001
  • github.com/patnicolas

Friday, April 15, 2016

Manage Spark Context in ScalaTest

Target audience: Beginner
Estimated reading time: 4'

This post describes a methodology to manage the Spark context while testing you application using ScalaTest


Table of contents
Follow me on LinkedIn

Introduction

Debugging Apache Spark application using ScalaTest seems quite simple when dealing with a single test:
  • Specify your Spark context configuration SparkConf
  • Create the Spark context
  • Add test code related to your application
  • Clean up resources (Spark context, Akka context, File handles...)
However, there are cases when you may need to create and close the spark context used across multiple test or on a subset of the tests. The challenge is to make sure that the Spark context is actually close when it is no longer needed.
This post introduces two basic
ScalaTest methods beforeAll and afterAll of the trait BeforeAndAfterAll to manage the context life-cyle of your test application.

Wrapping the Spark context

The objective is to create a small framework that create or retrieve an existing Spark context before executing a test and closing it after the test is completed, independently of the status of the test.
The initialization of the Spark context consist of specifying the configuration for the sequence of tests. Some of these parameters can be dynamically defined through a simple parameterization. The context is created only if it is not already defined within the scope of the test using the getOrCreate method. 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName(s"Test-App")
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer", 
              "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")

    sc = SparkContext.getOrCreate(conf.setMaster("local[4]"))
    sc.setLogLevel("ERROR")
    sc
  }

  def close: Unit = {
    if (sc != null) sc.stop
  }
}

Scala test context

The solution is to let ScalaTest method manage the lifecycle of the Spark context for testing. 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
trait SparkContextForTesting 
     extends SparkContextWrapper with BeforeAndAfterAll { 
  self: Suite =>

  override def beforeAll: Unit = {
    getContext
    super.beforeAll
  }

  override def afterAll: Unit = {
    close
    super.afterAll
  }
}

Note: The BeforeAndAfterAll trait can only be sub-classed by a test suite inheriting the Suite trait. The method beforeAll (line: 5) is automatically called before the first test of your test suite and the method afterAll (line 10)is called after your last test is completed, whether those tests succeeded or not.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class MyTest 
  extends FlatSpec 
    with Matchers 
      with SparkContextForTesting {

 val x: Int = -4
 it should "My first test" in {
   val sqlContext = new SQLContext(sc) 
   // ....
 }
   // ... other tests

 it should "My last test" in {
   // clean up after it completes
 }
}

"That's all folks!

Monday, March 7, 2016

Weighted Logistic Loss for Imbalanced Dataset in Spark

Target audience: Intermediate
Estimated reading time: 6'

How to apply corrective weight for the training of a logistic regression with imbalanced data set using Apache Spark MLlib?


Table of contents
Follow me on LinkedIn

Some applications such as spam or online targeting have an imbalanced data set. The number of observations associated to one label is very small (minority class) compared to the number of observations associated to the other labels. 

Overview

Let's consider the case of intrusion detection system that identifies an cyber attack through a Virtual Private Network (VPN) in an enterprise. The objective is to classify every session based on one or several TCP connections as potentially harmful intrusion or normal transaction knowing that a very small fraction of attempt to connect and authenticate are actual deliberate attacks (less than 1 for each million of connections).
There are few points to consider:
  • The number of reported attack is very likely very small (< 10). Therefore, the very small set of labeled security breach is plagued with high variance
  • A very large and expensive labeling operation is required, if even available, to generate a fairly stable negative class (security breach)
  • The predictor can be extremely accurate (as measured by F1-score, Area under the ROC curve or PR curve) by always classifying any new attempt to connect to the VPN as harmless. In the case of 1 reported attack per 1 million VPN session, the prediction would be accurate 99.999% of the time.

There are several approaches to address the problem of imbalanced training and validation sets. These list of the most common known techniques onclude
  • Sub-sampling the majority class (i.e. harmless VPN sessions): It reduces the number of labels for the normal sessions while preserving the labeled reported attacks.
  • Over-sampling the minority class: This technique generates synthetic sampling using bootstrapped samples based on k Nearest Neighbors algorithm
  • Application of weights differential to the logistic loss used in the training of the model
This post focuses on the third option or re-balancing (weighting) of the logistic loss. The implementation of this technique is specific to Apache Spark and more specifically the ML implementation of the logistic regression (not MLlib) that leverages data frames.
It is assumed the reader is somewhat familiar the Apache Spark, data frame and its machine learning module MLlib.

Weighting the logistic loss

Let's consider the logistic loss commonly used in training a binary model f with feature x and label y: \[logLoss = - \sum_{i=0}^{n-1}[y_i .log(f(x_i)) + (1 - y_i) .log(1 - f(x_i))]\] The first component of the loss function is related to the minority observations (security breach through a VPN: label = 1) while the a second component represents the loss related to the majority observation (harmless VPN sessions: label = 0)
Let's consider the ratio, r of number of observations related to the majority label over the total number of observations: \[r = \frac{ i: (y_i = 1)}{i : y_i}\] The logistic loss can be then rebalanced as \[logloss = -\sum_{i=0}^{n-1} [r.y_i.log(f(x_i)) + (1-r).(1-y_i).log(f(x_i))]\] The next step is to implement the weighting of the binomial logistic regression classifier using Apache Spark.

Spark implementation

Apache Spark is a open source framework for in-memory processing of large datasets (think as Hadoop on steroids). Apache Spark framework contains a machine learning module known as MLlib. The objective is to modify/override the train method of the LogisticRegression
One simple option is to sub-class LogisticRegression class in the mllib/ml package. However the logistic loss is actually computed in the private class LogisticAggregator which cannot be overridden.
However if you browse through the ml.classification.LogisticRegression.train Scala code, you notice that the class Instance that encapsulates labeled data for the computation of the loss and the gradient of loss has three parameters
  • label: Double
  • feature: linalg.Vector
  • weight: Double
The plan is to use this 3rd parameter weight as the balancing weight ratio r. This is simply accomplished by adding an extra column weightCol to the input data frame dataset and define its value as
  • balancingRatio r for label = 1
  • 1 - balancingRatio (1 - r) for label = 0
A simple implementation of the Weighted Logistic Regression using Apache Spark MLlib implementation of the Logistic Regression :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final val BalancingWeightColumnName: String = "weightCol"
final val BalanceRatioBounds = (0.001, 0.999)
final val ImportanceWeightNormalization = 2.0

class WeightedLogisticRegression(balanceRatio: Double = 0.5) 
   extends LogisticRegression(UUID.randomUUID.toString) 
      with Logging {

  this.setWeightCol(BalancingWeightColumnName)

  private[this] val balancingRatio = 
    if (balanceRatio < BalanceRatioBounds._1) 
       BalanceRatioBounds._1
    else if (balanceRatio > BalanceRatioBounds._2) 
       BalanceRatioBounds._2
    else
       balanceRatio

  override protected def train(dataset: DataFrame): 
      LogisticRegressionModel = {
    val newDataset = dataset.withColumn("weightCol", lit(0.0))

    val weightedDataset: RDD[Row] = dataset.map(row => {
      val w = if (row(0) == 1.0) balancingRatio 
         else 1.0 - balancingRatio
      Row(row.getAs[Double](0), row.getAs[Vector](1), 
           ImportanceWeightNormalization * w)
    })

    val weightedDataFrame = dataset.sqlContext
        .createDataFrame(weightedDataset, newDataset.schema)
    super.train(weightedDataFrame)
  }
}

Notes
  • The balancing ratio has to be normalized by a factor ImportanceWeightNormalization = 2: The factor is require to produce a weight of 1 for both the majority and minority classes from a fully balanced ratio of 0.5.
  • The actual balancing ratio needs to be constrained within an acceptable range BalanceRatioBounds to prevent for the minority class to have an outsize influence of the weights of the logistic regression model. In the extreme case, there may not even be a single observation in the minority class (security breach through the VPN). These minimum and maximum values are highly dependent on the type of application.
Here is an example of application of the WeightedLogisticRegression on a training data frame labeledData. The number of data points (or observations) associated to label = 1 is extracted through a simple filter.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val numPositives = trainingDF.filter("label == 1.0").count
val datasetSize = labeledData.count
val balanceRatio = (datasetSize- numPositives).toDouble/datasetSize

val lr = new WeightedLogisticRegression(balanceRatio)
          .setMaxIter(20)
          .setElasticNetParam(0)
          .setFitIntercept(true)
          .setTol(1E-6)
          .setRegParam(1E-2)

val model = lr.fit(trainingDF)

One simple validation of this implementation of the weighted logistic regression on Apache Spark is to verify that the weights of the logistic regression model generated with the WeightedLogisticRegression with a balancing ratio of 0.5 are very similar to the weights generated by the logistic regression model of the Apache Spark MLlib (ml.classification.LogisticRegressionModel)

Note: This implementation relies on Apache Spark 1.6.0. The latest implementation of the Logistic Regression in beta release of Apache Spark 2.0 does not allow to override the LogisticRegression outside the scope of Apache Spark MLlib.

References