Friday, October 9, 2015

Optimizers for Logistic Regression in Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

If you're finding it challenging to choose an optimizer for logistic regression, you're not alone. This article delves into a comparison of various optimizers used in logistic regression within Apache Spark MLlib, providing insights and guidance.


What you will learn: How to select the most appropriate optimizer for logistic regression in Spark.

Notes:
  • The original article was written using Spark 1.5.1 and reworked with Spark 3.4.0
  • Environments: Spark 3.4.0, Scala 2.13.1, JDK 12

Overview

This article presents a comparison between the stochastic gradient descent (SGD) and the quasi-Newton Limited memory BFGS (L-BFGS) optimizer for binomial classification using logistic regression in Apache Spark MLlib [ref 1]. The MLlib library in Apache Spark 3.x offers two prominent optimizers for binomial classification through logistic regression:
  • Stochastic Gradient Descent (SGD) [ref 2]
  • The Limited Memory version of the Broyden-Fletcher-Goldfarb-Shanno algorithm (L-BFGS) [ref 3].
Both of these optimizers are explored in detail to understand their application and effectiveness in this context.

SGD

Gradient descent is an iterative process that begins at a random position on a function f and progressively moves down its slope in increments until it arrives at the function's minimum point.
Given a set of data points (vectors) {xi}, the update at each iteration is \[x_{i+1}=x_{i}-\alpha \triangledown f(x_{i}) \ \ \ \alpha :learning\ rate \]
The efficiency of the gradient descent algorithm can be enhanced by incorporating randomness into the selection of data points. Stochastic Gradient Descent (SGD) selects a new data point from the training set at each iteration, significantly reducing computational demands.
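As a minimal illustration (not the MLlib implementation), a single SGD step on the weights could be sketched as follows, where gradientAt is a hypothetical function computing the gradient of the loss at one randomly sampled data point:

import scala.util.Random

// Sketch of a single stochastic gradient descent update (illustrative only).
// 'gradientAt' is a hypothetical function returning the gradient of the loss
// at one data point, given the current weights.
def sgdStep(
    weights: Array[Double],
    data: Array[Array[Double]],
    gradientAt: (Array[Double], Array[Double]) => Array[Double],
    alpha: Double): Array[Double] = {
  val sample = data(Random.nextInt(data.length))   // one randomly selected point
  val grad = gradientAt(weights, sample)
  weights.zip(grad).map{ case (w, g) => w - alpha*g }
}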


L-BFGS

The Broyden-Fletcher-Goldfarb-Shanno (BFGS) algorithm is an iterative process used to solve nonlinear, unconstrained optimization problems. This method computes the descent direction by preconditioning the gradient with curvature information. It operates on an estimate of the Hessian matrix associated with the loss function [ref 4].
Given the value at iteration k, xk and the gradient gk \[g_{k}=\triangledown f(x_{k})\] Let's define the following intermediate values:\[s_{k}=x_{k+1}-x_{k} \ \ \ y_{k}=g_{k+1}-g_{k} \ \ \ \rho _{k}=\frac{1}{y_{k}^{T}.s_{k}}\], then the Hessian matrix is updated for the next iteration as \[H_{k+1}=\left ( I-\rho_{k} s_{k}y_{k}^{T}\right ).H_{k}.\left ( I-\rho_{k} y_{k}s_{k}^{T}\right ) +\rho_{k} s_{k}s_{k}^{T}\]The Limited-Memory BFGS is a variant of the Broyden-Fletcher-Goldfarb-Shanno algorithm that streamlines the process by keeping only a small number of vectors. These vectors implicitly stand for the approximation, thereby minimizing the requirement for large memory allocation during the iterative optimization steps.
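To make the memory savings concrete, here is a sketch of the classic two-loop recursion L-BFGS uses to apply the approximate inverse Hessian to the gradient, keeping only the m most recent (s, y) pairs. This is an illustration, not MLlib's implementation, and it omits the customary initial scaling of the Hessian (H0 = I):

// Sketch of the L-BFGS two-loop recursion: computes H.g, the product of the
// approximate inverse Hessian with the gradient g, from the m most recent
// pairs s(i) = x(i+1) - x(i) and y(i) = g(i+1) - g(i).
// Illustrative only; the initial Hessian scaling is omitted (H0 = I).
def twoLoopRecursion(
    g: Array[Double],
    s: IndexedSeq[Array[Double]],
    y: IndexedSeq[Array[Double]]): Array[Double] = {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map{ case (u, v) => u*v }.sum

  val rho = s.indices.map(i => 1.0/dot(y(i), s(i)))
  val alpha = new Array[Double](s.length)

    // First loop: walk backward through the stored pairs
  var q = g.clone
  for (i <- s.indices.reverse) {
    alpha(i) = rho(i)*dot(s(i), q)
    q = q.zip(y(i)).map{ case (qj, yj) => qj - alpha(i)*yj }
  }
    // Second loop: walk forward, applying the correction terms
  for (i <- s.indices) {
    val beta = rho(i)*dot(y(i), q)
    q = q.zip(s(i)).map{ case (qj, sj) => qj + (alpha(i) - beta)*sj }
  }
  q    // the search direction is -q
}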


Logistic regression

Logistic regression is a classification technique designed to categorize observations into distinct classes. There are two varieties of logistic regression models:
  • Binary Classifier: This type categorizes into two exclusive classes.
  • Multi-class Classifier: This type deals with multiple exclusive classes.
Logistic regression is widely favored in discriminative supervised learning due to its simplicity and intuitiveness. Its functioning is based on the logistic function [ref 5].

In the case of the generic classification problem, the probability that an observation x belongs to a class C is computed as \[p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}\] where w are the weights or model coefficients.
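A direct transcription of this formula (a hypothetical helper, not part of MLlib):

// Computes p(C|x) = 1/(1 + exp(-w0 - w.x)) for weights w and observation x
def logisticProbability(w0: Double, w: Array[Double], x: Array[Double]): Double = {
  val margin = w.zip(x).map{ case (wi, xi) => wi*xi }.sum
  1.0/(1.0 + Math.exp(-w0 - margin))
}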

Apache Spark MLlib has two implementations of logistic regression as a binary classifier:
  • org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
  • org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer

Implementation

Data generation

The Apache Spark API documentation (scaladoc) can be found on the Apache Spark API website [ref 6]. To assess and contrast the two implementations of logistic regression, we'll generate a synthetic training set. 
This training set for binomial classification includes:
  • Two datasets of observations, each having 3 features. These follow data distributions with the same standard deviation but different means.
  • Two labels (or expected outcomes) {0, 1}, one corresponding to each Gaussian distribution.
The diagram below displays the training set for a single feature.


Fig.1 Illustration of two distributions of data for logistic regression

The margin of separation between the two groups of 3-dimensional observations is computed as mean(second group) - mean(first group). As the margin increases, the accuracy of the binomial classification is expected to improve.

import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SparkSession

final val SIGMA = 2.0

class DataGenerator(numTasks: Int)(implicit ss: SparkSession) {
    // Random value centered at 'mean' with a +/- SIGMA/2 spread
  def f(mean: Double): Double = mean + SIGMA*(Random.nextDouble - 0.5)

  def apply(half: Int, mu: Double): Array[LabeledPoint] = {
       // 1. Generate the two groups of observations with means 1.0 and mu
     val trainObs = ArrayBuffer.fill(half)(Array[Double](f(1.0), f(1.0), f(1.0))) ++
                 ArrayBuffer.fill(half)(Array[Double](f(mu), f(mu), f(mu)))

       // 2. Generate the labels for the two cases
     val labels = ArrayBuffer.fill(half)(0.0) ++ ArrayBuffer.fill(half)(1.0)

       // 3. Generate the labeled data points for training
     labels.zip(trainObs).map { 
         case (y, ar) => LabeledPoint(y, new DenseVector(ar))
     }.toArray
  }
}

The method apply generates the two groups of half observations each, drawn from distributions with means 1.0 and mu respectively (# 1).
Next we create two sets of labels, 0 and 1 (# 2), that are used to generate the Apache Spark labeled points (# 3).
The Apache Spark LogisticRegression classes process LabeledPoint instances, which are generated in this particular case from DenseVector wrappers of the observations.

Training

The first step consists of initializing the Apache Spark environment, using the SparkConf and SparkSession classes.

val numTasks: Int = 64

val conf = new SparkConf().setAppName("LogitRegr").setMaster(s"local[$numTasks]")

// Instantiate a Spark session so it can be passed as 
// an implicit argument to classes and methods
implicit val sparkSession = SparkSession.builder.config(conf).getOrCreate()
sparkSession.sparkContext.setLogLevel("ERROR")

  // Training and validation code here .....
sparkSession.stop


The next step is to generate the training and validation sets. The validation data, validationSet, is used at a later stage to compare the accuracy of the respective models.

val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sparkSession)
    
// Generate the training and validation sets ('mean' is the mean
// of the second group of observations, which defines the margin)
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)


It is now time to instantiate the two logistic regression classifiers and generate two distinct models. You need to make sure that the parameters (tolerance, number of iterations) are identical for both models.
This implementation uses the logistic regression from MLlib that relies on a pre-canned stochastic gradient descent. A customized gradient descent can be defined by using the standalone SGD class from MLlib. In this example, the optimization parameters are purely arbitrary. MLlib uses RDDs as input for the training and validation sets, while the logistic regression in ML uses instances of DataFrame.

val logRegrSGD = new LogisticRegressionWithSGD
logRegrSGD.optimizer.setNumIterations(1000)
logRegrSGD.optimizer.setConvergenceTol(0.02)

// Generate the RDD from the training set
val inputRDD = sparkSession.sparkContext.makeRDD(trainSet, numTasks)
logRegrSGD.setIntercept(true)
val model = logRegrSGD.run(inputRDD)
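The L-BFGS counterpart is trained the same way. Here is a sketch using the LogisticRegressionWithLBFGS class with identical, equally arbitrary parameters (the variable names are ours):

// Train the L-BFGS-based model with the same optimization parameters
val logRegrLBFGS = new LogisticRegressionWithLBFGS().setNumClasses(2)
logRegrLBFGS.optimizer.setNumIterations(1000)
logRegrLBFGS.optimizer.setConvergenceTol(0.02)
logRegrLBFGS.setIntercept(true)

val modelLBFGS = logRegrLBFGS.run(inputRDD)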


Validation

Now it is time to use the validation set to compute the mean sum of square errors and the accuracy of each predictor for different values of the margin.
We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows:
  • metrics: metrics produced by the Spark logistic regression
  • msse: mean sum of square errors
  • accuracy: accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.


case class Quality(
   metrics: Array[(Double, Double)], 
   msse: Double, 
   accuracy: Double) {

 override def toString: String =
    s"""Metrics: ${metrics.mkString(",")}
       |sqrt(msse) = ${Math.sqrt(msse)} accuracy = $accuracy""".stripMargin
}



Let's implement our validation class, BinomialValidation, for the binomial classification. The validator is created using the Spark session ss, the logistic regression model generated through training, and the number of partitions or tasks used by the data nodes.

final class BinomialValidation(
   ss: SparkSession, 
   model: LogisticRegressionModel, 
   numTasks: Int) {

 private val sc = ss.sparkContext

 def metrics(validationSet: Array[LabeledPoint]): Quality = {
   val featuresLabels = validationSet.map( lbPt => 
       (lbPt.label, lbPt.features)).unzip
   val predicted_rdd = model.predict(    
         sc.makeRDD(featuresLabels._2, numTasks)
   )

   // Zip the labels with the predicted values
   val scoreAndLabels = sc.makeRDD(featuresLabels._1, numTasks).zip(predicted_rdd)
  
   // Count the predictions that match the expected labels
   val successes = scoreAndLabels
               .map{ case(e, p) => Math.abs(e - p) }
               .filter( _ < 0.1)

    // Compute the mean sum of square errors
   val msse = scoreAndLabels
           .map{ case (e, p) => (e - p)*(e - p)}
           .sum / validationSet.length

     // Leverage the default Spark classification metrics for binary classifiers
   val metrics = new BinaryClassificationMetrics(scoreAndLabels)

   Quality(metrics.fMeasureByThreshold().collect, 
               msse, 
               successes.count.toDouble/validationSet.length)
  }
}

The method metrics converts the validation set, validationSet, into RDDs after separating the expected values from the observations (unzip). The result of the prediction, predicted_rdd, is then zipped with the labeled values into the evaluation set, scoreAndLabels, from which the different quality metrics such as successes and msse are extracted.
The computation of the metrics is actually performed by the MLlib class BinaryClassificationMetrics. Finally, the validation is applied to the logistic model with a classification threshold of 0.1.

model.setThreshold(0.1)
val validator = new BinomialValidation(sparkSession, model, numTasks)
val quality = validator.metrics(validationSet)



Results
Several studies comparing SGD and BFGS optimizers have been published [ref 7]. The fact that the L-BFGS optimizer produces significantly more accurate results (lower mean sum of square errors) than the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merits further investigation.

Note: This post is a brief comparison of the two optimizers in terms of accuracy on a simple synthetic data set. It is important to keep in mind that stochastic gradient descent has better overall performance than L-BFGS, or any quasi-Newton method for that matter, because it does not require the estimation of the Hessian matrix (second order derivative).


Fig 2. SGD vs. L-BFGS: mean sum of square errors as a function of the margin between the means of the Gaussian distributions




Fig 3. SGD vs. L-BFGS: accuracy as a function of the margin between the means of the Gaussian distributions


References

[7] Comparing Stochastic Gradient Descent and Batch Gradient Descent
[8] "Machine Learning: A Probabilistic Perspective" Chapter 8, Logistic Regression, K. Murphy - MIT Press, 2012



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3







Thursday, September 10, 2015

Histogram-Based Approximation in Scala

Target audience: Beginner
Estimated reading time: 4'


Introduction to a simple function approximation algorithm using a dynamically resizable histogram in Scala.




Overview

A typical function approximation consists of finding a model that fits a given data set. Let's consider the following data set {x, y} for which a simple model f: y = f(x) has to be approximated.


The black dots represent the data set and the red line the model y = f(x)
There are multiple approaches to approximate a model or a function to fit a given data set. The list includes

  • Splines
  • Least square regression
  • Levenberg-Marquardt
  • K-nearest neighbors
  • Histogram
  • Polynomial interpolation
  • Logistic regression
  • Neural Networks
  • Tensor products
  • ... and many more
Interpolation using dynamically resizable histograms is a very simple and effective method to approximate a data set.

Histogram class

A histogram consists of an array or sequence of bins. A bin is defined by three parameters:
  • _x Center of the bin
  • _y Average value for the bin
  • count Number of data points in the bin
The distribution of histogram bins is illustrated in the following chart

Let's look at an implementation of the Bin class. The constructor takes two values:
  • _x mid point of the bin (or bucket)
  • _y current frequency for this bin
class Bin(
   var _x: Double,           // Center of the bin
   var _y: Double = 0.0      // Average value for the bin
  ) extends Serializable {
  var count: Int = 0         // Number of data points in the bin

    // Add a new value y to this bin and update the running average
  def += (y: Double): Unit = {
    val newCount = count + 1
    _y = if(count == 0) y else (_y*count + y)/newCount
    count = newCount
  }
 
    // Merge the content of another bin into this bin
  def + (next: Bin): this.type = {
    val newCount = count + next.count
    if(newCount > 0) {
      _y = (_y*count + next._y*next.count)/newCount
      count = newCount    // fix: the count of the merged bin was not updated
    }
    this
  }

    // Merge an entire array of bins into this bin
  def + (next: Array[Bin]): this.type = {
    val newCount = next.aggregate(count)(
      (s, nxt) => s + nxt.count, _ + _
    )
    if( newCount > 0) {
      _y = next./:(_y*count)(
         (s, nxt) => s + nxt._y*nxt.count)/newCount
      count = newCount
    }
    this
  }
}

The method += (y: Double) adds a new value y to this bin and recomputes the running average _y. The method + (next: Bin) folds the content of another bin, next, into this bin. Finally, the method + (next: Array[Bin]) merges an entire array of bins into this bin.
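A short, hypothetical usage sketch of the Bin class:

// Hypothetical usage of the Bin class
val bin = new Bin(0.5)           // bin centered at x = 0.5
bin += 1.0                       // _y = 1.0, count = 1
bin += 2.0                       // _y = 1.5, count = 2

val other = new Bin(0.5)
other += 3.0                     // _y = 3.0, count = 1
val merged = bin + other         // _y = (1.5*2 + 3.0)/3 = 2.0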
Next, let's create a class, Histogram, to manage the array of bins. The constructor for the histogram has four parameters:
  • maxNumBins: maximum number of bins
  • min: expected minimum value in the data set
  • max: expected maximum value in the data set
  • weights: optional smoothing weights

  // RADIX (the number of allowed halvings) is assumed to be defined
  // elsewhere, e.g. final val RADIX = 2
final class Histogram(
    maxNumBins: Long, 
    var min: Double = -1.0, 
    var max: Double = 1.0, 
    weights: Array[Float] = Array.empty[Float]) {
 
    // Initial number of bins, a fraction of the maximum number of bins
  val initNumBins: Int = (maxNumBins>>RADIX).toInt
  private var step = (max-min)/initNumBins
  
    // Initialize the bins with centers evenly spaced across [min, max]
  private[this] var values = Range(0, initNumBins-1)
        .scanLeft(min + 0.5*step)((s, n) => s + step)
        ./:(new ArrayBuffer[Bin])(
          (vals, x) => vals += (new Bin(x)) 
        )
 
    // Update the model with a new data point: expand the histogram if x
    // falls outside [min, max], consolidate the bins once the maximum
    // number of bins is reached
  def train(x: Double, y: Double): Unit = {
    <<(x)
    values(index(x)) += y
    if( values.size >= maxNumBins) >>
   }
 
    // Predict the value y for a new observation x
   final def predict(x: Double): Double = {
      if( x < min || x > max) Double.NaN 
      else if(weights.length > 0) smoothPredict(x)
      else values(index(x))._y
   }
    // ... ancillary methods
 }

Implementation

The two main methods are:
  • train, which updates the model (histogram) with each new data point from the training set. The histogram expands whenever a new data point falls outside the current boundary (min, max) of the histogram.
  • predict, which predicts the value y of a new observation x. The prediction relies on an interpolation (weighted moving average) when the user specifies an array of weights in the histogram constructor. A usage sketch follows this list.
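Below is a minimal, hypothetical usage sketch, assuming a training set sampled from a noisy sine function:

import scala.util.Random

// Hypothetical usage: approximate y = sin(x) from noisy samples
val hist = new Histogram(maxNumBins = 256L, min = -3.2, max = 3.2)

Range(0, 100000).foreach(_ => {
  val x = 6.4*(Random.nextDouble - 0.5)
  hist.train(x, Math.sin(x) + 0.1*(Random.nextDouble - 0.5))
})
val yHat = hist.predict(1.57)   // expected to be close to sin(1.57) ~ 1.0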
The method Histogram.index extracts the index of the bin containing a data point (x, y).

private def index(x: Double): Int = {
    val idx = ((x - min)/step).floor.toInt
    if( idx < 1) 0 
    else if (idx >= values.size) values.size -1 
    else idx
}

In this implementation of the dynamically resizable histogram, the array of bins expands by adding new bins whenever a new data point from the training set has an x value either greater than the current maximum value or lower than the current minimum value. The width of the bins, step, does not change, only the current number of bins. The number of bins grows until it reaches the maximum, maxNumBins. The method Histogram.<< implements the expansion of the histogram for a constant bin width.

  // Expand the histogram with new bins of constant width until x is covered
  // (renamed from >> to << to match the call site in train and the text above)
private def <<(x: Double): Unit = 
  if(x > max) {
    values.appendAll(generate(max, x))
    max = values.last._x + step*0.5
  }
  else if(x < min) {
    values.prependAll(generate(min, x))
    min = values.head._x - step*0.5
  }
 
  // Generate the new bins between the current limit and the new value x
final def generate(limit: Double, x: Double): Array[Bin] = 
  if( x > limit) {
     val nBins = ((x-limit)/step).ceil.toInt
     var z = limit - step*0.5
     Array.tabulate(nBins)(_ => {z += step; new Bin(z) })
  }
  else {
     val nBins = ((limit-x)/step).ceil.toInt
     var z = limit + step*0.5
     Array.tabulate(nBins)(_ => {z -= step; new Bin(z) })
  }

Once the maximum number of bins, maxNumBins, is reached, the histogram is consolidated by merging pairs of adjacent bins, which halves the number of bins and doubles the bin width, step. The consolidation of the histogram bins is implemented by the method Histogram.>>

  // Merge pairs of adjacent bins, halving the number of bins
  // (renamed from -- to >> to match the call site in train and the text above)
private def >> : Unit = {
   values = (0 until values.size-1 by 2)./:(new ArrayBuffer[Bin])( (ab, n) => 
             ab += (values(n) + values(n+1))
   )
   step *= 2.0   // assumption: the bin width doubles to keep index consistent
}

Testing

Finally, the predict method is used to compute the accuracy of the model through the validate method. The Histogram class is tested by loading a data set from a file.

import scala.io.Source
import scala.util.Try

  // DELIM is the field delimiter, assumed to be defined elsewhere (e.g. ",")
def validate(
    hist: Histogram, 
    fileName: String, 
    eps: Double): Try[Double] = Try {
  val src = Source.fromFile(fileName)
  val fields = src.getLines.map( _.split(DELIM))
  
    // Count the predictions that fall within eps of the expected value
  val counts = fields./:((0L, 0L))((s, xy) => 
    if( Math.abs(hist.predict(xy(0).trim.toFloat) - xy(1).toFloat) < eps) 
       (s._1 + 1L, s._2 + 1L) 
    else 
       (s._1, s._2 + 1L)
  )
  
  val accuracy = counts._1.toDouble/counts._2
  src.close
  accuracy
}
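A possible invocation, with a hypothetical file name and tolerance:

// Hypothetical invocation of the validation method
validate(hist, "resources/eval.csv", eps = 0.05) match {
  case scala.util.Success(accuracy) => println(s"Accuracy: $accuracy")
  case scala.util.Failure(e) => println(s"Validation failed: ${e.getMessage}")
}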


References

  • Introduction to Machine Learning Chap 8 Nonparametric methods / Nonparametric density estimation E. Alpaydin- MIT press 2004
  • Scala Cookbook A. Alexander O'Reilly 2013
  • Histogram-based approximation using Apache Spark
  • github.com/patnicolas

Friday, June 19, 2015

Scala View Bound vs. Context Bound

Target audience: Intermediate
Estimated reading time: 15'


This post illustrates the conversion of classes using view bound parameter types into classes using context bound types.

Overview

There has been some discussion about deprecating view bounds in Scala 2.12. Binding a parameterized type T to a predefined, known type using a view (i.e. an implicit conversion function) is quite convenient. I have used (or even abused) this construct in writing machine learning-based applications.
There are a few options for replacing view bound class parameter types with other ad-hoc polymorphism techniques. I chose to convert my legacy code by binding the class parameter type to a context instead of a view.

Converting class parameter type binding

First, let's look at binding the parameterized type T of a class Classifier to the primitive type Double. The binding is actually implemented by defining an implicit conversion function f: T => Double, as described in the following code snippet.

def sqr(x: Double): Double = x*x

class Classifier[T <: AnyVal](
   xt: Vector[Array[T]], 
   yt: Vector[Double])
  (implicit f: T => Double) {
 
    // Sum of the squared residuals y - w.x over the observations
  def loss(weights: Array[T]): Double = 
    xt.zip(yt).map{ case(x, y) => 
      y - x.zip(weights).map{ case(_x, w) => _x*w }.sum 
    }.map(sqr(_)).sum
}

In this example, the observed features and the model parameters, weights, have the same type: an array of elements of type T. The input time series xt is a vector of observations, and the labels (or expected outcomes) yt is a vector of single-variable observations of type Double. The implicit conversion applies to both the features and the weights. Scala provides developers with the shorter, alternative notation T <% U as follows:

class Classifier[T <% Double](
   xt: Vector[Array[T]], 
   yt: Vector[Double]) { }

Replacing the binding of the class parameters for many classes would be tedious and error prone. One solution is to create a template or pattern that can be applied to several sections of the code base.
The process of binding the type T of a class parameter to another known parameter type consists of defining a value (or context) Feature for the type T. The context bound is resolved through an implicit value of type Feature[T].

trait Feature[T]{ def apply(x: T): Array[Double] }
 
class Classifier[T : Feature](
   xt: Vector[T],
   yt: Vector[Double]) {
  
    // Sum of the squared residuals, using the Feature context to
    // convert both observations and weights into Array[Double]
  def loss(weights: T): Double = 
    xt.zip(yt).map{ case(x, y) => 
      val mapper: Feature[T] = implicitly[Feature[T]]
      y - mapper(x).zip(mapper(weights)).map{ case(_x, w) => _x*w }.sum
    }.map(sqr(_)).sum
}

The Feature trait defines the context for the conversion, which is actually implemented by its apply method. The view bound implicit function T => Double defined in the first implementation of the Classifier class is replaced by the mapping (or conversion) method Feature.apply: T => Array[Double].
The mapping function mapper is retrieved using the implicitly construct.

The notation T : Feature specifies the implicit value for the context Feature[T]. It is equivalent to the more descriptive declaration of the Classifier class:

class Classifier[T](
   xt: Vector[T],
   yt: Vector[Double])(implicit feature: Feature[T])

Important note
A class parameterized type T cannot be bound to a primitive type P using a context: the developer has to define either an implicit view T => P or a context (or wrapper) W for the type, i.e. T => W[P], as illustrated below.
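Here is a minimal sketch of such a wrapper context binding a type T to Double; all names (DoubleLike, Score) are hypothetical:

// Hypothetical wrapper context standing in for a direct T => Double view
trait DoubleLike[T] { def apply(x: T): Double }

class Score[T : DoubleLike](values: Vector[T]) {
  def total: Double = {
    val conv = implicitly[DoubleLike[T]]
    values.map(conv(_)).sum
  }
}

// Context instance for Int, supplied by the developer
implicit val intDoubleLike: DoubleLike[Int] = 
  new DoubleLike[Int] { def apply(x: Int): Double = x.toDouble }

val total = new Score(Vector(1, 2, 3)).total   // 6.0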
