Showing posts with label Stochastic gradient descent. Show all posts

Wednesday, February 7, 2018

Implementing AdaGrad Optimizer in Spark

Target audience: Intermediate
Estimated reading time: 5'

Have you ever desired a Gradient Descent approach with greater flexibility? If that's the case, Adaptive Gradient Descent (AdaGrad) could be a suitable alternative.


What you will learn: The workings of the Adaptive Gradient algorithm, how it improves upon Stochastic Gradient Descent, and its implementation using Apache Spark.



Notes
  • This post assumes that the reader has a rudimentary knowledge of the Scala API of Apache Spark and a basic understanding of machine learning.
  • The code associated with this article is written using Scala 2.12.11 and Spark 3.1.0

Background

The optimization algorithm known as Stochastic Gradient Descent (SGD) is frequently employed to minimize the loss function during the training of various machine learning models, including support vector machines, logistic regression, and back-propagation neural networks. In its most basic form, SGD updates all the weights of the model using a single learning rate.

It's quite usual to find that the features of a model exhibit significant variance across different observations. Under such circumstances, an adaptive gradient (AdaGrad) algorithm that assigns a unique learning rate to each feature could be an effective solution. There exist numerous methods to develop an algorithm that assigns a distinct learning rate to every feature. 
Incidentally, the Spark machine learning library, MLlib, does not include an implementation of AdaGrad.
This article discusses the AdaGrad algorithm and an implementation for Apache Spark.

Stochastic Gradient Descent (SGD)

Apache Spark stands out as a rapid and versatile cluster computing system, offering advanced APIs in Java, Scala, Python, and R, along with an enhanced engine capable of supporting general execution graphs.

Within the Apache Spark ecosystem lies MLlib, a machine learning library. The stochastic gradient descent optimizer within this library is a randomized variant of the (batched) gradient descent algorithm, which minimizes a continuous, differentiable objective function. In supervised machine learning, this objective function is typically a loss function, such as the logistic loss or the sum of least squares. \[L(w)=\frac{1}{n}\sum_{i=1}^{n}(y_{i}-f(x_{i}|w))^{2}\]

The objective function L is expressed as a sum of differentiable functions. In supervised learning, the loss Li related to a specific observation is defined as a continuous, differentiable, convex function. \[L(w)=\sum_{i=1}^{n}L_{i}(w)\]

The vector w represents the vector of weights (or model parameters). At each iteration of the stochastic gradient descent, the weights are updated using the formula \[w_{t+1}=w_{t}-\eta \sum_{i=1}^{n}\frac{\partial L_{i}}{\partial w}(w_{t})\] in which η is the learning rate.

Stochastic Gradient Descent (SGD) aims to reduce the loss function, which represents the difference between the model's predicted values and the expected values. In each iteration, SGD picks a small group of observations, referred to as a mini-batch, and restricts the sum above to these observations. This repetitive procedure is designed to progressively approach a minimum of the loss function, which is the global minimum when the loss is convex.
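To make the update rule concrete, here is a minimal, Spark-free sketch of mini-batch SGD applied to a least-squares loss. The names SgdSketch, predict, gradient and train are hypothetical and not part of MLlib; the linear model, the batch size and the seed are assumptions chosen only for illustration.

```scala
import scala.util.Random

object SgdSketch {
  // Prediction of a linear model: f(x|w) = w . x
  def predict(w: Array[Double], x: Array[Double]): Double =
    w.zip(x).map { case (wi, xi) => wi * xi }.sum

  // Gradient of the mean squared error over a batch of (x, y) pairs
  def gradient(w: Array[Double], batch: Seq[(Array[Double], Double)]): Array[Double] = {
    val grad = Array.fill(w.length)(0.0)
    batch.foreach { case (x, y) =>
      val err = predict(w, x) - y
      x.indices.foreach(j => grad(j) += 2.0 * err * x(j) / batch.size)
    }
    grad
  }

  // Mini-batch SGD: w(t+1) = w(t) - eta * gradient estimated on a random mini-batch
  def train(
      data: Seq[(Array[Double], Double)],
      eta: Double,
      batchSize: Int,
      numIters: Int,
      seed: Long): Array[Double] = {
    val rand = new Random(seed)
    var w = Array.fill(data.head._1.length)(0.0)
    (0 until numIters).foreach { _ =>
      val batch = rand.shuffle(data).take(batchSize)
      val g = gradient(w, batch)
      w = w.zip(g).map { case (wi, gi) => wi - eta * gi }
    }
    w
  }
}
```

Note that every weight shares the same learning rate eta in this sketch, which is the limitation the adaptive gradient addresses.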

Adaptive Gradient Descent

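The core concept of AdaGrad revolves around the strategy of boosting the learning rate for sparse features (or model parameters) while reducing it for more dense features. As a result, AdaGrad enhances the efficiency of converging towards the minimum loss in models with sparse features, particularly when these sparse features hold significant information. Each weight of index i is updated with its own learning rate, the base learning rate η divided by the square root of the sum of the squares of all its past gradients: \[w_{t+1,i}=w_{t,i} -\frac{\eta}{\sqrt{\sum_{\tau=1}^{t}g_{\tau,i}^{2} + \varepsilon }}\,g_{t,i} \ \ \ \ g_{t,i}=\frac{\partial L}{\partial w_{t,i}}\] The small constant ε prevents a division by zero.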
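The per-weight update can be sketched in a few lines of plain Scala. The names AdaGradSketch, State and step are hypothetical, and the default value of epsilon is an assumption; this is an illustration of the formula, not the Spark implementation described later.

```scala
object AdaGradSketch {
  // Current weights and, for each weight, the sum of its squared past gradients
  final case class State(weights: Array[Double], sumSquares: Array[Double])

  // One AdaGrad step: each weight gets its own learning rate
  // eta / sqrt(sum of squared past gradients + epsilon)
  def step(state: State, grad: Array[Double], eta: Double, epsilon: Double = 1e-8): State = {
    val sumSquares = state.sumSquares.zip(grad).map { case (s, g) => s + g * g }
    val weights = state.weights.indices.map { i =>
      state.weights(i) - eta / math.sqrt(sumSquares(i) + epsilon) * grad(i)
    }.toArray
    State(weights, sumSquares)
  }
}
```

A weight whose gradient is frequently non-zero accumulates a large sum and takes smaller and smaller steps, while a weight that is rarely updated keeps a step close to eta.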

SGD in Apache Spark

The Apache Spark MLlib library has two implementations of SGD:
  • Generic Gradient Descent and related classes in the mllib.optimization package
  • SGD bundled with classifier or regression algorithms such as LogisticRegressionWithSGD, LassoWithSGD, SVMWithSGD or RidgeRegressionWithSGD

We plan to utilize the optimization package to modify the stochastic gradient descent. Our goal is to use the mllib.optimization.GradientDescent template class from MLlib and create a custom Updater that implements an adaptive gradient with per-feature learning rates.

This Updater is responsible for "updating the model's weights" (in models like Logistic Regression or SVM) by multiplying the current learning rate with the partial derivative of the loss with respect to each weight, as previously described. We'll name our custom Updater as AdaGradUpdater, which will handle the model weight updates using the adaptive gradient approach. Following this, we'll initialize the SGD in the following manner.
   val adaSGD = new GradientDescent(...)
                    .setUpdater(new AdaGradUpdater)
                    .setStepSize(0.01)
                    ...
The class AdaGradUpdater has to implement the generic compute method
  Updater.compute(
      oldWeights: Vector, 
      gradient: Vector, 
      stepSize: Double, 
      iter: Int, 
      regCoefs: Double
  ): (Vector, Double)
The method returns the tuple (vector of new weights, regularization value computed from the new weights). Let's implement the AdaGrad algorithm.


Implementation of AdaGrad

As mentioned earlier, the implementation of AdaGrad consists of overriding the method Updater.compute.

The computation of the learning rates requires us to record, for each weight, the sum of the squared gradients of the previous steps in the array gradientsHistory. The method += updates this history, and its first invocation allocates the array.
The per-weight learning rates are then computed by scaling the inverseVelocity coefficients with the step size. The constant epsilon avoids a division by zero for the weights whose gradient has been zero so far.

The learning rates are zipped with the old weights to apply the L2 shrinkage and generate the new weights, newWeights, as a Breeze dense vector. The gradient step and the norm of the new weights are delegated to the Breeze linear algebra library through calls to brzAxpy and brzNorm. The listing names the class AdaGradL2Updater; it plays the role of the AdaGradUpdater mentioned earlier.

import breeze.linalg.{DenseVector => BDV, axpy => brzAxpy, norm => brzNorm}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization.Updater

final class AdaGradL2Updater(dimension: Int) extends Updater {
  // Avoids a division by zero for weights whose gradient has been zero so far
  private val epsilon = 1e-8
  // Per-weight sum of the squared gradients of the previous steps
  private var gradientsHistory: Array[Double] = _

  override def compute(
       weightsOld: Vector,
       gradient: Vector,
       stepSize: Double,
       iter: Int,
       regParam: Double
  ): (Vector, Double) = {
    val grad = gradient.toArray
    this += grad

    // Per-weight learning rates: stepSize / sqrt(history + epsilon)
    val rates = inverseVelocity.map(_ * stepSize)

    // L2 shrinkage of the old weights: w * (1 - rate * regParam)
    val newWeights = new BDV[Double](
       rates.zip(weightsOld.toArray).map {
          case (rate, weight) => weight * (1.0 - regParam * rate)
       }
    )
    // Gradient step scaled by the per-weight rate: w -= rate * gradient
    val scaledGradient = new BDV[Double](
       rates.zip(grad).map { case (rate, g) => rate * g }
    )
    brzAxpy(-1.0, scaledGradient, newWeights)
    val norm = brzNorm(newWeights, 2.0)

    // New weights and value of the L2 regularization term
    (Vectors.dense(newWeights.toArray), 0.5 * regParam * norm * norm)
  }

  // Adds the squared gradient of the current step to the history
  private def +=(grad: Array[Double]): Unit = {
    if (gradientsHistory == null)
      gradientsHistory = Array.fill(grad.length)(0.0)

    grad.indices.foreach { index =>
      gradientsHistory(index) += grad(index) * grad(index)
    }
  }

  private def inverseVelocity: Array[Double] =
    gradientsHistory.map(h => 1.0 / Math.sqrt(h + epsilon))
}

Analysis

Given a logistic regression model that classifies time series patterns, we compute and compare the training binary cross-entropy loss, without regularization, using the default MLlib SGD and our implementation of AdaGrad.

The AdaGrad algorithm has its limitations. The accumulated sum of squared gradients can grow very large over time, causing the learning rate to shrink and become infinitesimally small, which effectively stops the parameter from updating.
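This shrinking effect is easy to reproduce numerically. The sketch below, with the hypothetical names AdaGradDecay and effectiveRates, computes the effective learning rate of a single weight after each step, assuming a given sequence of gradients for that weight.

```scala
object AdaGradDecay {
  // Effective AdaGrad learning rate of one weight after each step,
  // given the sequence of gradients observed for that weight
  def effectiveRates(grads: Seq[Double], eta: Double, epsilon: Double = 1e-8): Seq[Double] =
    grads
      .scanLeft(0.0) { (sum, g) => sum + g * g } // running sum of squared gradients
      .tail
      .map(sum => eta / math.sqrt(sum + epsilon))
}
```

With a constant gradient of 1.0 and a base rate of 1.0, the effective rate after t steps is close to 1/sqrt(t), so it has already lost an order of magnitude after 100 steps.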

AdaGrad is not the only alternative to traditional SGD. RMSProp and Adam are worth investigating.

Thank you for reading this article. For more information ...



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Friday, October 9, 2015

Optimizers for Logistic Regression in Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

If you're finding it challenging to choose an optimizer for logistic regression, you're not alone. This article delves into a comparison of various optimizers used in logistic regression within Apache Spark MLlib, providing insights and guidance.


What you will learn: How to select the most appropriate optimizer for the logistic regression in Spark.

Notes:
  • The original article was written using Spark 1.5.1 and reworked with Spark 3.4.0
  • Environments: Spark 3.4.0,  Scala 2.13.1, JDK 12

Overview

This article presents a comparison between the stochastic gradient descent (SGD) and the quasi-Newton Limited memory BFGS (L-BFGS) optimizer for binomial classification using logistic regression in Apache Spark MLlib [ref 1]. The MLlib library in Apache Spark 3.x offers two prominent optimizers for binomial classification through logistic regression:
  • Stochastic Gradient Descent (SGD) [ref 2]
  • The Limited Memory version of the Broyden-Fletcher-Goldfarb-Shanno algorithm (L-BFGS) [ref 3].
Both of these optimizers are explored in detail to understand their application and effectiveness in this context.

SGD

Gradient descent is a repetitive process that begins at a random position on a function f and progressively moves down its slope in increments until it arrives at the function's minimum point.
Given a sequence of points (vectors) {xi}, each iteration moves against the gradient of f: \[x_{i+1}=x_{i}-\alpha \nabla f(x_{i}) \ \ \ \alpha :learning\ rate \]
The efficiency of the gradient descent algorithm can be enhanced by incorporating randomness into the selection of data points. Stochastic Gradient Descent (SGD) selects a new data point from the training set at each iteration, significantly reducing computational demands.
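As an illustration of the iteration above, the following sketch applies plain gradient descent to a function of a single variable. The names GradientDescentSketch and minimize are hypothetical, and the fixed number of iterations is an assumption made to keep the example short.

```scala
object GradientDescentSketch {
  // Iterates x(i+1) = x(i) - alpha * gradF(x(i)) for a fixed number of steps
  def minimize(gradF: Double => Double, x0: Double, alpha: Double, numIters: Int): Double =
    (0 until numIters).foldLeft(x0) { (x, _) => x - alpha * gradF(x) }
}
```

For f(x) = (x - 3)^2, whose gradient is 2(x - 3), calling GradientDescentSketch.minimize(x => 2.0 * (x - 3.0), 0.0, 0.1, 100) moves the starting point 0.0 towards the minimum at x = 3.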


L-BFGS

The Broyden-Fletcher-Goldfarb-Shanno (BFGS) algorithm is a process used iteratively to address optimization problems that are nonlinear and unconstrained. This method calculates the direction for descent by using curvature data to adjust the gradient. It operates based on estimating the Hessian matrix, which is associated with the loss function [ref 4].
Given the value at iteration k, xk, and the gradient gk \[g_{k}=\nabla f(x_{k})\] let's define the following intermediate values: \[s_{k}=x_{k+1}-x_{k} \ \ \ y_{k}=g_{k+1}-g_{k} \ \ \ \rho _{k}=\frac{1}{y_{k}^{T}s_{k}}\] Then the approximation Hk of the inverse of the Hessian matrix is updated for the next iteration as \[H_{k+1}=\left ( I-\rho_{k} s_{k}y_{k}^{T}\right )H_{k}\left ( I-\rho_{k} y_{k}s_{k}^{T}\right ) +\rho_{k} s_{k}s_{k}^{T}\]
The Limited-Memory BFGS is a variant of the Broyden-Fletcher-Goldfarb-Shanno algorithm that streamlines the process by keeping only a small number of vectors. These vectors implicitly represent the approximation, thereby minimizing the requirement for large memory allocation during the iterative optimization steps.
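The BFGS update can be checked on a small dense example. The sketch below uses plain arrays and hypothetical names (BfgsSketch, update, matVec, and so on); it stores the full matrix H, which is exactly what L-BFGS avoids, so it should be read as an illustration of the formula only. A useful sanity check is the secant condition: the updated matrix maps yk back to sk.

```scala
object BfgsSketch {
  type Matrix = Array[Array[Double]]

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Outer product a * b^T
  def outer(a: Array[Double], b: Array[Double]): Matrix =
    a.map(x => b.map(y => x * y))

  def matMul(a: Matrix, b: Matrix): Matrix =
    a.map(row => b.head.indices.map(j => row.indices.map(k => row(k) * b(k)(j)).sum).toArray)

  def matVec(a: Matrix, v: Array[Double]): Array[Double] = a.map(row => dot(row, v))

  // Element-wise a + c * b
  def addScaled(a: Matrix, b: Matrix, c: Double): Matrix =
    a.zip(b).map { case (ra, rb) => ra.zip(rb).map { case (x, y) => x + c * y } }

  // H(k+1) = (I - rho s y^T) H(k) (I - rho y s^T) + rho s s^T
  def update(h: Matrix, s: Array[Double], y: Array[Double]): Matrix = {
    val rho = 1.0 / dot(y, s)
    val identity: Matrix =
      Array.tabulate(s.length, s.length)((i, j) => if (i == j) 1.0 else 0.0)
    val left = addScaled(identity, outer(s, y), -rho)  // I - rho * s * y^T
    val right = addScaled(identity, outer(y, s), -rho) // I - rho * y * s^T
    addScaled(matMul(matMul(left, h), right), outer(s, s), rho)
  }
}
```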


Logistic regression

Logistic regression is a classification technique designed to categorize observations into distinct classes. There are two varieties of logistic regression models:
  • Binary Classifier: This type categorizes into two exclusive classes.
  • Multi-class Classifier: This type deals with multiple exclusive classes.
Logistic regression is widely favored in discriminative supervised learning due to its simplicity and intuitiveness. Its functioning is based on the logistic function [ref 5].

In the case of the generic classification problem, the probability that an observation x belongs to a class C is computed as \[p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}\] where w0 is the intercept and w the vector of weights (or model coefficients).
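The formula translates directly into code. The sketch below is independent of Spark; the names LogisticSketch, probability and classify are hypothetical, and the default threshold of 0.5 is an assumption.

```scala
object LogisticSketch {
  // p(C|x) = 1 / (1 + exp(-w0 - w.x))
  def probability(w0: Double, w: Array[Double], x: Array[Double]): Double = {
    val margin = w0 + w.zip(x).map { case (wi, xi) => wi * xi }.sum
    1.0 / (1.0 + math.exp(-margin))
  }

  // Assigns the observation to class 1.0 if the probability reaches the threshold
  def classify(p: Double, threshold: Double = 0.5): Double =
    if (p >= threshold) 1.0 else 0.0
}
```

An observation lying on the decision boundary (w0 + w.x = 0) gets a probability of exactly 0.5, and the probability tends to 1 as the observation moves deeper into the class.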

Apache Spark MLlib has two implementations of the logistic regression as a binary classifier 
  • org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
  • org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer

Implementation

Data generation

The Apache Spark API documentation (scaladoc) can be found on the Apache Spark API website [ref 6]. To assess and contrast the two implementations of logistic regression, we'll generate a synthetic training set. 
This training set for binomial classification includes:
  • Two datasets of observations, each having 3 features. These follow data distributions with the same standard deviation but different means.
  • Two labels (or expected outcomes) {0, 1}, one corresponding to each Gaussian distribution.
The diagram below displays the training set for a single feature.


Fig.1 Illustration of two distributions of data for logistic regression

The margin of separation between the two groups of 3-dimensional observations is computed as mean(first group) - mean(second group). As the margin increases, the accuracy of the binomial classification is expected to increase.

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

final val SIGMA = 2.0

class DataGenerator(numTasks: Int)(implicit ss: SparkSession) {
    // Draws a value from a Gaussian distribution of given mean
    // and standard deviation SIGMA
  def f(mean: Double): Double = mean + SIGMA*Random.nextGaussian()

  def apply(half: Int, mu: Double): Array[LabeledPoint] = {
       // 1.  Generate data with 1.0 and mu mean
     val trainObs =ArrayBuffer.fill(half)(Array[Double](f(1.0),f(1.0),f(1.0))) ++
                 ArrayBuffer.fill(half)(Array[Double](f(mu),f(mu),f(mu)))

       // 2. Generate the labels for the two cases
     val labels = ArrayBuffer.fill(half)(0.0) ++ ArrayBuffer.fill(half)(1.0)

       // 3. Generate the labeled data points for training
     labels.zip(trainObs).map { 
         case (y, ar) =>  LabeledPoint(y, new DenseVector(ar))
     }.toArray
  }
}

The method apply generates the two groups of half observations, each following a normal distribution, with respective means 1.0 and mu (# 1).
Next we create two sets of labels, 0 and 1 (# 2), that are used to generate the Apache Spark labeled points (# 3).
Apache Spark LogisticRegression classes process LabeledPoint instances, which are generated in this particular case from DenseVector wrappers of the observations.

Training

The first step consists of initializing the Apache Spark environment, using the SparkConf and SparkSession classes.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val numTasks: Int = 64

val conf = new SparkConf().setAppName("LogitRegr").setMaster(s"local[$numTasks]")

// Instantiate a Spark session so it can be passed as 
// an implicit argument to classes and methods
implicit val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
// The Spark context is used to set the log level and to create the RDDs
val sc = sparkSession.sparkContext
sc.setLogLevel("ERROR")

  // Training and validation code here .....
sparkSession.stop()


The next step is to generate the training and validation sets. The validation data, validationSet, is used at a later stage for comparing the accuracy of the respective models.

val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sparkSession)
    
// Generate the training and validation sets independently.
// mean, the mean of the second group of observations, is assumed to be defined beforehand
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)


It is now time to instantiate the two logistic regression classifiers and generate two distinct models. Make sure that the parameters (tolerance, number of iterations) are identical for both models.
This implementation uses the logistic regression from MLlib, which relies on a pre-canned stochastic gradient descent. A customized gradient descent can be defined by using the standalone SGD class from MLlib. In this example, the optimization parameters are purely arbitrary. MLlib uses RDDs as input for the training and validation sets, while the logistic regression in ML uses instances of DataFrame. Note that the SGD-based classifiers of MLlib have been deprecated since Spark 2.0, so their constructors may not be accessible from application code in recent versions.

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val logRegrSGD = new LogisticRegressionWithSGD 
logRegrSGD.optimizer.setNumIterations(1000) 
logRegrSGD.optimizer.setConvergenceTol(0.02) 
logRegrSGD.setIntercept(true) 

// Generate the RDD from the training set
val inputRDD = sc.makeRDD(trainSet.toSeq, numTasks) 
val model = logRegrSGD.run(inputRDD)


Validation

Now it is time to use the validation set to compute the mean sum of square errors and the accuracy of each predictor for different values of the margin.
We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows:
  • metrics: metrics produced by the Spark logistic regression
  • msse: mean sum of square errors
  • accuracy: accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.


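case class Quality(
   metrics: Array[(Double, Double)], 
   msse: Double, 
   accuracy: Double) {

   // Multi-line description: the metrics on the first line,
   // the square root of msse and the accuracy on the second
 override def toString: String =
    s"""Metrics: ${metrics.mkString(",")}
       |msse = ${Math.sqrt(msse)} accuracy = $accuracy""".stripMargin
}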



Let's implement our validation class, BinomialValidation, for the binomial classification. The validation is created using the Spark session ss, the logistic regression model generated through training and the number of partitions or tasks used in the data nodes.

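import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SparkSession

final class BinomialValidation(
   ss: SparkSession, 
   model: LogisticRegressionModel, 
   numTasks: Int) {
 private val sc = ss.sparkContext

 def metrics(validationSet: Array[LabeledPoint]): Quality = {
   val featuresLabels = validationSet.map( lbPt => 
       (lbPt.label, lbPt.features)).unzip
   val predicted_rdd = model.predict(    
         sc.makeRDD(featuresLabels._2.toSeq, numTasks)
   )

   // Zip the expected labels with the predicted values 
   val scoreAndLabels = sc.makeRDD(featuresLabels._1.toSeq, numTasks).zip(predicted_rdd)
  
   // Predictions that match the expected label
   val successes = scoreAndLabels
               .map{ case(e,p) => Math.abs(e-p) }
               .filter( _ < 0.1)

    // Compute the mean sum of square errors
   val msse = scoreAndLabels
           .map{ case (e,p) => (e-p)*(e-p)}
           .sum() / validationSet.length

     // Leverage the default Spark classification metrics for binary classifiers,
     // which expect (prediction, label) pairs, hence the swap
   val metrics = new BinaryClassificationMetrics(scoreAndLabels.map(_.swap))

   Quality(metrics.fMeasureByThreshold().collect, 
               msse, 
               successes.count.toDouble/validationSet.length)
  }
}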

The method metrics converts the validation set, validationSet, into an RDD after segregating the expected values from the observations (unzip). The results of the prediction, predicted_rdd, are then zipped with the labeled values into the evaluation set, scoreAndLabels, from which the different quality metrics such as successes and msse are extracted.
The computation of metrics is actually performed by the BinaryClassificationMetrics MLlib class. Finally, the validation is applied to the logistic model with a classification threshold of 0.1.

model.setThreshold(0.1)
val validator = new BinomialValidation(sparkSession, model, numTasks)
val quality = validator.metrics(validationSet)
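The two hand-computed quality metrics can be reproduced without Spark on a small collection of (expected label, prediction) pairs. The names ValidationSketch, msse and accuracy below are hypothetical, and the default tolerance of 0.1 is borrowed from the filter used in the validation class.

```scala
object ValidationSketch {
  // Mean of the squared differences between expected labels and predictions
  def msse(labelsAndPredictions: Seq[(Double, Double)]): Double =
    labelsAndPredictions.map { case (e, p) => (e - p) * (e - p) }.sum /
      labelsAndPredictions.size

  // Fraction of predictions that match the expected label within a tolerance
  def accuracy(labelsAndPredictions: Seq[(Double, Double)], tolerance: Double = 0.1): Double =
    labelsAndPredictions.count { case (e, p) => math.abs(e - p) < tolerance }.toDouble /
      labelsAndPredictions.size
}
```

With binary labels and binary predictions, each misclassified observation contributes 1.0 to the sum of squares, so the mean sum of square errors is simply the error rate, that is 1 - accuracy.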



Results
Several studies comparing SGD and BFGS optimizers have been done [ref 7]. The fact that the L-BFGS optimizer provides a significantly more accurate result (or lower mean sum of square errors) than the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merits further investigation.

Note: This post is a brief comparison of the two optimizers in terms of accuracy on a simple synthetic data set. It is important to keep in mind that the stochastic gradient descent is generally cheaper per iteration than L-BFGS, or any quasi-Newton method for that matter, because it does not require the estimation of the Hessian matrix (second order derivatives).


Fig 2.  SGD vs. L-BFGS  Mean Sum of Square Errors as a function of the margin between the means of the Gaussian distributions




Fig 3.  SGD vs. L-BFGS  Accuracy as a function of the margin between the means of the Gaussian distributions


References

[7] Comparing Stochastic Gradient Descent And Batch Gradient Descent
[8] "Machine Learning: A Probabilistic Perspective", Chapter 8 Logistic Regression, K. Murphy, MIT Press 2012


