Showing posts with label MLlib. Show all posts
Showing posts with label MLlib. Show all posts

Friday, October 9, 2015

Optimizers for Logistic Regression in Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

If you're finding it challenging to choose an optimizer for logistic regression, you're not alone. This article delves into a comparison of various optimizers used in logistic regression within Apache Spark MLlib, providing insights and guidance.

Table of contents
Follow me on LinkedIn

What you will learn: How to select the most appropriate optimizer for the logistic regression in Spark.

Notes:
  • The original article was written using Spark 1.5.1 and reworked with Spark 3.4.0
  • Environments: Spark 3.4.0,  Scala 2.13.1, JDK 12

Overview

This article presents a comparison between the stochastic gradient descent (SGD) and the quasi-Newton Limited memory BFGS (L-BFGS) optimizer for binomial classification using logistic regression in Apache Spark MLlib [ref 1]. The MLlib library in Apache Spark 3.x offers two prominent optimizers for binomial classification through logistic regression:
  • Stochastic Gradient Descent (SGD) [ref 2]
  • The Limited Memory version of the Broyden-Fletcher-Goldfarb-Shanno algorithm (L-BFGS) [ref 3].
Both of these optimizers are explored in detail to understand their application and effectiveness in this context.

SGD

Gradient descent is a repetitive process that begins at a random position on a function f and progressively moves down its slope in increments until it arrives at the function's minimum point.
Given a set of data po ints (vectors) {xi} \[x_{i+1}=x_{i}-\alpha \triangledown f(x_{i}) \ \ \ \alpha :learning\ rate \]
The efficiency of the gradient descent algorithm can be enhanced by incorporating randomness into the selection of data points. Stochastic Gradient Descent (SGD) selects a new data point from the training set at each iteration, significantly reducing computational demands.


L-BFGS

The Broyden-Fletcher-Goldfarb-Shanno (BFGS) algorithm is a process used iteratively to address optimization problems that are nonlinear and unconstrained. This method calculates the direction for descent by using curvature data to adjust the gradient. It operates based on estimating the Hessian matrix, which is associated with the loss function [ref 4].
Given the value at iteration k, xk and the gradient gk \[g_{k}=\triangledown f(x_{k})\] Let's define the following intermediate values:\[s_{k}=x_{k+1}-x_{k} \ \ \ y_{k}=g_{k+1}=g_{k} \ \ \ \rho _{k}=\frac{1}{y_{k}^{b}.s_{k}}\], then he Hessian matrix is updated for the next iteration as \[H_{k+1}=\left ( I-\rho_{k} s_{k}y_{k}^{T}\right ).H_{k}.\left ( I-\rho_{k} y_{k}s_{k}^{T}\right ) +\rho_{k} s_{k}s_{k}^{T}\]The Limited-Memory BFGS is a variant of the Broyden-Fletcher-Goldfarb-Shanno algorithm that streamlines the process by keeping only a small number of vectors. These vectors implicitly stand for the approximation, thereby minimizing the requirement for large memory allocation during the iterative optimization steps.


Logistic regression

Logistic regression is a classification technique designed to categorize observations into distinct classes. There are two varieties of logistic regression models:
  • Binary Classifier: This type categorizes into two exclusive classes.
  • Multi-class Classifier: This type deals with multiple exclusive classes.
Logistic regression is widely favored in discriminative supervised learning due to its simplicity and intuitiveness. Its functioning is based on the logistic function [ref 5].

In the case of the generic classification problem, the probability that on observation x belong to a class C is computed as \[p(C|x)=\frac{1}{1+e^{-w_{0}-w^{T}x}}\] where w are the weights or model coefficients.

Apache Spark MLlib has two implementations of the logistic regression as a binary classifier 
  • org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS using the L-BFGS optimizer
  • org.apache.spark.mllib.classification.LogisticRegressionWithSGD using the SGD optimizer

Implementation

Data generation

The Apache Spark API documentation (scaladoc) can be found on the Apache Spark API website [ref 6]. To assess and contrast the two implementations of logistic regression, we'll generate a synthetic training set. 
This training set for binomial classification includes:
  • Two datasets of observations, each having 3 features. These follow data distributions with the same standard deviation but different means.
  • Two labels (or expected outcomes) {0, 1}, one corresponding to each Gaussian distribution.
The diagram below displays the training set for a singular feature.


Fig.1 Illustration of two distributions of data for logistic regression

The margin of separation between the two groups of observations of 3 dimension is computed as mean(first group) - mean (second group). As the margin increases the accuracy of the binomial classification is expected to increase. 

final val SIGMA = 2.0

class DataGenerator(numTasks: Int)(implicit ss: SparkSession) {
  def f(mean: Double): Double = mean + SIGMA*(Random.nextDouble - 0.5)

  def apply(half: Int, mu: Double): Array[LabeledPoint] = {
       // 1.  Generates data with 1.0 and mu mean
     val trainObs =ArrayBuffer.fill(half)(Array[Double](f(1.0),f(1.0),f(1.0))) ++
                 ArrayBuffer.fill(half)(Array[Double](f(mu),f(mu),f(mu)))

       // 2. Generate the labels for the two cases
     val labels = ArrayBuffer.fill(half)(0.0) ++ ArrayBuffer.fill(half)(1.0)

       // 3. Generated the labeled data points for training
     labels.zip(trainObs).map { 
         case (y, ar) =>  LabeledPoint(y, new DenseVector(ar))
     }.toArray
  }
}

The method apply generates the two groups of half observations following normal distribution of mean 1.0 and 1.0 + mu. (# 1).
Next we create two sets of labels 0 and 1 (# 2) that are used to generated the Apache Spark labeled points (# 3). 
Apache Spark LogisticRegression classes process LabeledPoint instances which are generated in this particular case from DenseVector wrappers of the observations.

Training

The first step consists of initializing the Apache spark environment, using SparkConf and SparkContext classes. 

val numTasks: Int = 64

val conf = new SparkConf().setAppName("LogitRegr").setMaster(s"local[$numTasks]")

// Instantiate a Spark session so it can be passed as 
// an implicit argument to classes and methods
implicit val sparkSession = SparkSession.builder.config(conf).getOrCreate()
sparkSession.setLogLevel("ERROR")

  // Training and validation code here .....
sparkSession.stop


The next step is to generate the training and validation set. The validation data, validationSet, is used at a later stage for comparing the accuracy of the respective model. 

val halfTrainSet = 32000
val dataGenerator = new DataGenerator(numTasks)(sparkSession)
    
// Split data into training and validation set
val trainSet = dataGenerator(halfTrainSet, mean)
val validationSet = dataGenerator(halfTrainSet, mean)


It is now time to instantiate the two logistic regression classifiers and generate two distinct models. You need to make sure that the parameters (tolerance, number of iterations) are identical for both models.
This implementation uses the Logistic regression from MLlib that uses a pre-canned stochastic gradient descent. A customized gradient descent can be defined by using the standalone SGD class from MLlib.In this example, the optimization parameters are purely arbitrary. MLlib uses RDD as input for training and validation set while the logistic regression in ML uses instances of DataFrame.

val logRegrSGD = new LogisticRegressionWithSGD 
logRegrSGD.optimizer.setNumIterations(1000) 
logRegrSGD.optimizer.setConvergenceTol(0.02) 

// Generate the RDD
val inputRDD = sc.makeRDD(trainingSet, numTasks) 
logisticRegression.setIntercept(true) 
val model = logisticRegression.run(inputRDD)


Validation

Now it is time to use the validation set to compute the mean sum of square error and the accuracy of each predictor for different values of margin.
We need to define and implement a validation framework or class, simple but relevant enough for our evaluation. The first step is to specify the quality metrics as follows
  • metrics produced by the Spark logistic regression
  • muse Mean sum of square errors
  • accuracy accuracy of the classification
The quality metrics are defined in the Quality class as described in the following code snippet.


case class Quality(
   metrics: Array[(Double, Double)], 
   msse: Double, 
   accuracy: Double) {

 override def toString: String =
    s"Metrics: ${metrics.mkString(",")}\n
    |msse = ${Math.sqrt(msse)} accuracy = $accuracy"
}



Let's implement our validation class, BinomialValidation for the binomial classification. The validation is created using the spark context sc, the logistic regression model generated through training and the number of partitions or tasks used in the data nodes.

final class BinomialValidation(
   ss: SparkSession, 
   model: LogisticRegressionModel, 
   numTasks: Int) {

 def metrics(validationSet: Array[LabeledPoint]): Quality = {
   val featuresLabels = validationSet.map( lbPt => 
       (lbPt.label, lbPt.features)).unzip
   val predicted_rdd = model.predict(    
         sc.makeRDD(featuresLabels._2, numTasks)
   )

   // Zip features with labels 
   val scoreAndLabels = sc.makeRDD(featuresLabels._1, numTasks).zip(predicted_rdd)
  
   val successes = scoreAndLabels
               .map{ case(e,p) => Math.abs(e-p) }
               .filter( _ < 0.1)

    // Compute the mean sum of square error s
   val msse = scoreAndLabels
           .map{ case (e,p) => (e-p)*(e-p)}
           .sum

     // Leverage the default Spark classification metrics for binary classifiers
   val metrics = new BinaryClassificationMetrics(scoreAndLabels)

   Quality(metrics.fMeasureByThreshold().collect, 
               msse, 
               successes.count.toDouble/validationSet.length)
  }
}

The method metrics converts the validation set, validationSet into a RDD after segregating the expected values from the observations (unzip). The results of the prediction, prediction_rdd is then zipped with the labeled values into the evaluation set, scoreAndLabels from which the different quality metrics such as successes and muse are extracted.
The computation of metrics is actually performed by the BinaryClassificationMetrics MLlib class. Finally, the validation is applied on the logistic model with a convergence tolerance 0.1

model.setThreshold(0.1)
val validator = new BinomialValidation(sc, model, numTasks)
val quality = validator.metrics(validationSet)



Results
Several studies comparing SGD and BFGS optimizers have been done [ref 7]. The fact that the L-BFGS optimizer provides a significant more accurate result (or lower mean sum of square errors) that the stochastic gradient descent is not a surprise. However, the lack of convergence of the SGD version merit further investigation.

Note: This post is a brief comparison of the two optimizer in terms of accuracy on a simple synthetic data set. It is important to keep in mind that the stochastic gradient descent has better performance overall than L-BFGS or any quasi-Newton method for that matter, because it does not require the estimation of the hessian metric (second order derivative).


Fig 2.  SGD vs. L-BFGS  Mean Sum of Square Errors function of 
margin between mean of Gaussian Distribution




Fig 3.  SGD vs. L-BFGS  Accuracy function of 
margin between mean of Gaussian Distribution


References

[7] Comparing Stochastic Gradient Descent And Batch Gradient Descent
[8
Machine Learning: A probabilistic perspective Chapter 8 Logistic Regression" K. Murphy - MIT Press 2012



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3







Saturday, November 29, 2014

Apache Spark/MLlib for K-means

Target audience: Intermediate
Estimated reading time: 5'


This post illustrates the Apache Spark MLlib library with the plain-vanilla K-means clustering (unsupervised) algorithm.


Table of contents


Overview

Apache Spark attempts to address the limitation of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In case of iterative algorithms, the time per iteration can be reduced by a ratio of 1:10 or more.
The core element of Spark is Resilient Distributed Datasets (RDD), which is a collection of elements partitioned across the nodes of a cluster and/or CPU cores of servers. An RDD can be created from local data structures such as list, array or hash tables, from the local file system or the Hadoop distributed file system (HDFS).

Note: The code presented in this post uses Apache Spark version 1.3.1. There is no guarantee that the implementation of the K-means in this post will be compatible with future version of Apache Spark.

Apache Spark RDDs

The operations on an RDD in Spark are very similar to the Scala higher order methods. These operations are performed concurrently over each partition. Operations on RDD can be classified as:
* Transformation: convert, manipulate and filter the elements of an RDD on each partition
* Action: aggregate, collect or reduce the elements of the RDD from all partitions

An RDD can persist, be serialized and cached for future computation. Spark provides a large array of pre-built transforms and actions which go well beyond the basic map-reduce paradigm. Those methods on RDDs are a natural extension of the Scala collections making code migration seamless for Scala developers.

Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the file systems. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes and replication schemes of HDFS.
Spark is initialized through its context. For instance, a local Spark deployment on 8 cores, with 2 Gbytes allocated for data processing (RDDs) in memory only storage level and 512 Mbytes for the master process is defined by creating a spark configuration instance of type SparkConf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
 
val sparkConf = new SparkConf()
            .setMaster("local[8]")
            .setAppName("SparkKMeans")
            .set("spark.executor.memory", "2048m")
            .set("spark.storageLevel", "MEMORY_ONLY")
            .set("spark.driver.memory", "512M")
            .set("spark.default.parallelism", "16")
 
implicit val sc = new SparkContext(sparkConf))



Apache Spark MLlib

MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress. The main components of the library are:
  • Classification algorithms, including logistic regression, Naïve Bayes and support vector machines
  • Clustering limited to K-means in version 1.0
  • L1 & L1 Regularization
  • Optimization techniques such as gradient descent, logistic gradient and stochastic gradient descent and L-BFGS
  • Linear algebra such as Singular Value Decomposition
  • Data generator for K-means, logistic regression and support vector machines.
The machine learning byte code is conveniently included in the spark assembly jar file built with the simple build tool, sbt.
Let's consider the K-means clustering components bundled with Apache Spark MLlib. The K-means configuration parameters are:
  • K Number of clusters (line 4)
  • maxNumIters Maximum number of iterations for the minimizing the reconstruction error< (line 5)/li>
  • numRuns Number of runs or episode used for training the clusters (line 6)
  • caching Specify whether the resulting RDD has to be cached in memory (line 7)
  • xt The array of data points (type Array[Double]) (line 8)
  • sc Implicit Spark context
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
 
class SparkKMeans(
    K: Int, 
    maxNumIters: Int, 
    numRuns: Int,
    caching: Boolean,
    xt: Array[Array[Double]]) (implicit sc: SparkContext) {
   
 
  def train: Try[KMeansModel] = {
    val kmeans = new KMeans
    kmeans.setK(K)
    kmeans.setMaxIterations(maxNumIters)
    kmeans.setRuns(numRuns)
   
    val rdd = sc.parallelize(xt.map(new DenseVector(_)))
    rdd.persist(StorageLevel.MEMORY_ONLY)
    if( caching )
       rdd.cache
    kmeans.run(rdd)
  }
}

The clustering model is created by the train method (line 11). Once the Spark/MLlib K-means is instantiated and initialized (lined 12 -15), the ipnt data set xt is converted into a DenseVector then converted into a RDD (line 17). Finally the input RDD is fed to the Kmeans (kmeans.run)