Monday, August 8, 2016

Encode Spark ML Pipeline Features

Target audience: Intermediate
Estimated reading time: 6'




Apache Spark introduced machine learning (ML) pipelines in version 1.4.0. A pipeline is a workflow, or sequence of tasks, that cleanses, filters, trains, classifies, predicts and validates a data set. These tasks are defined as the stages of the pipeline.
Spark 2.0 extends ML pipelines to support persistence, while the RDD-based MLlib package is slowly being deprecated in favor of the data frame and data set based ML library.
ML pipelines allow data scientists to weave transformers and estimators into a single classification or predictive model.

This first post on ML pipelines implements a feature encoding pipeline.

ML pipelines 101

This section is a very brief overview of ML pipelines. Further information is available in the Apache Spark ML documentation.
The key ingredients of an ML pipeline are:
  • Transformers: algorithms that transform one DataFrame into another DataFrame. Transformers are stateless.
  • Estimators: algorithms that can be fit on a DataFrame to produce a Transformer (i.e. Estimator.fit).
  • Pipelines: estimators that weave or chain multiple Transformers and Estimators together to specify an ML workflow.
  • Pipeline stages: the basic elements of the ML workflow. Transformers, estimators and pipelines are all pipeline stages.
  • Parameters: the tuning and configuration parameters required by each Transformer and Estimator.
Note: The following implementation has been tested using Apache Spark 2.0

Features encoding pipeline

Categorical features have multiple values or instances which are represented as strings. Numerical values are categorized or bucketed through a conversion to a string.
Once converted to strings, the categorical values are converted into category indices using the StringIndexer transformer. The resulting sequence of indices is ranked by decreasing order of frequency of the value in the training or validation set.
Next, the indices associated with a particular feature are encoded into a vector of binary values (0, 1) through the OneHotEncoder transformer. A feature instance is encoded as 1 if it is defined in the data point, 0 otherwise.

Finally, the vectors of binary values of all the features are aggregated or assembled through the VectorAssembler transformer.
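As an illustration, here is a minimal sketch of indexing and encoding a single categorical column. It assumes an existing SparkSession named sparkSession (introduced in the next section); the column name and values are illustrative.

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Toy data frame with a single categorical column
val df = sparkSession.createDataFrame(
  Seq(Tuple1("17"), Tuple1("21"), Tuple1("17"))
).toDF("region")

// StringIndexer is an estimator: fit produces the model mapping values to indices
val indexed = new StringIndexer()
  .setInputCol("region")
  .setOutputCol("regionIndex")
  .fit(df)
  .transform(df)

// OneHotEncoder (Spark 2.0) is a transformer: it maps indices to binary vectors
val encoded = new OneHotEncoder()
  .setInputCol("regionIndex")
  .setOutputCol("regionVector")
  .transform(indexed)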
Let's consider the following simple use case: a sales report lists the date an item is sold, its id, the sales region and the name of the sales person or agent.
   date         id         region    agent
   ---------------------------------------
   07/10/2014   23c9a89d   17        aa4
The encoding pipeline is illustrated in the following diagram:


The data source is a CSV sales report file. Each column or feature is converted to a string index, then encoded as a vector of binary values. Finally, the 4 vectors are assembled into a single features vector.

Scala implementation

The first step is to create a Spark 2.x session. Let's wrap the Spark session configuration into a single, reusable trait, SparkSessionManager:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

trait SparkSessionManager {
  // Spark session configured for local execution on 4 cores
  protected[this] val sparkSession = SparkSession.builder()
    .appName("LR-Pipeline")
    .config(new SparkConf()
      .set("spark.default.parallelism", "4")
      .set("spark.rdd.compress", "true")
      .set("spark.executor.memory", "8g")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.io.compression.codec", "lzf")
      .setMaster("local[4]"))
    .getOrCreate()

  // Load the content of a CSV file into a data frame, inferring the schema from the header
  protected def csv2DF(dataFile: String): DataFrame =
    sparkSession.read.option("header", true)
      .option("inferSchema", true)
      .csv(dataFile)

  def stop: Unit = sparkSession.stop
}

The SparkSession is instantiated with the same SparkConf configuration as the SparkContext used in the Spark 0.x and 1.x versions.
The method csv2DF loads the content of a CSV file and generates a data frame (or data set).
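A hypothetical client of the trait (the object name and the sales.csv path are illustrative):

// Client object mixing in the session manager; csv2DF is accessible to subtypes
object SalesReport extends SparkSessionManager {
  def load: DataFrame = csv2DF("sales.csv")
}

val salesDF = SalesReport.load
salesDF.printSchema()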

The next step consists of creating the encoding workflow or pipeline, wrapped into the DataEncoding trait.

import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

trait DataEncoding {
  protected[this] val colNames: Array[String]
  lazy val vecColNames = colNames.map(vector(_))

  // Indexing and encoding stages for each column, followed by a single vector assembler
  lazy val pipelineStages: Array[PipelineStage] =
    colNames.map(colName => new StringIndexer()
       .setInputCol(colName)
       .setOutputCol(index(colName))) ++
    colNames.map(colName => new OneHotEncoder()
       .setInputCol(index(colName))
       .setOutputCol(vector(colName))) ++
    Array[PipelineStage](new VectorAssembler()
       .setInputCols(vecColNames).setOutputCol("features"))

  // Naming conventions for the intermediate index and vector columns
  def index(colName: String): String = s"${colName}Index"
  def vector(colName: String): String = s"${colName}Vector"
}

The sequence of column (feature) names, colNames, is an abstract value that needs to be defined for the specific training set. The first two groups of stages, string indexing (StringIndexer) and one-hot encoding (OneHotEncoder), are applied to each of the 4 features. The last stage, which assembles the features vector (VectorAssembler), is appended to the array of previous stages to complete the pipeline. Each stage is defined by its input column, setInputCol, and its output column/feature, setOutputCol.

Let's leverage the Spark session and the data encoding to generate a classification model, ModelFactory. Classification and predictive models built with ML pipelines will be described in a future post. For now, let's create a pipeline model using the data encoding stages.

import org.apache.spark.ml.{Pipeline, PipelineModel}

val dataEncoder = new DataEncoding {
  override protected[this] val colNames: Array[String] =
    Array[String]("date", "id", "region", "agent")

  // Create the pipeline from the encoding stages and fit it to the training data frame
  def pipelineModel(df: DataFrame): PipelineModel =
    new Pipeline().setStages(pipelineStages).fit(df)
}
  

The pipeline model is generated in two steps, illustrated in the sketch below:
  • Instantiate the pipeline from the data encoding stages.
  • Generate the model from an input or training data frame, df, by invoking the fit method.
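As a sketch only, a hypothetical end-to-end run would load the sales report, fit the encoding pipeline and append the features column to the data frame. It assumes the dataEncoder value defined above is in scope; the object name and CSV path are illustrative.

object FeaturesEncodingApp extends SparkSessionManager {
  def encode(path: String): DataFrame = {
    val df = csv2DF(path)                        // load the raw sales report
    val model = dataEncoder.pipelineModel(df)    // fit the encoding stages
    model.transform(df)                          // append the "features" column
  }
}

val encodedDF = FeaturesEncodingApp.encode("sales.csv")
encodedDF.select("features").show(5)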
The next post extends the data encoding pipeline with two estimators: a classifier and a cross validator.

Friday, July 1, 2016

Monte Carlo Integration in Scala

Target audience: Intermediate
Estimated reading time: 5'


This post introduces an overlooked numerical integration method leveraging the ubiquitous Monte Carlo simulation.



What you will learn: how to apply the Monte Carlo method to compute intractable integrals.

Notes:
  • This article assumes the reader has some basic knowledge of the Scala programming language
  • Scala version: 2.12.4

Introduction

Some functions do not have a closed-form solution for calculating a definite integral, a process called symbolic integration. For such cases, numerous numerical integration techniques are available for continuous functions. Examples include Simpson's formula, the Newton-Cotes quadrature rule, and Gaussian quadrature. These approaches are deterministic in their execution.

On the other hand, the Monte Carlo method of numerical integration adopts a stochastic approach. It uses randomly chosen values to approximate the area under a curve, across a surface, or within any multidimensional space [ref 1]. Demonstrations of the Monte Carlo integration technique often involve applying a uniform random distribution of data points to estimate the integral of a single-variable function.

Basic concept

Let's consider the single variable function illustrated in the following line plot.
\[f(x)=\frac{1}{x}-0.5\]
Definition of outer area for Monte Carlo random simulation

Our goal is to calculate the integral of a function across the range [1, 3]. The Monte Carlo method for numerical integration involves three key steps [ref 2]:
  1. Establish the bounding area for the function based on the minimum and maximum values of x and y. In this instance, the x-values range over [1, 3], and the y-values span approximately [-0.17, 0.5].
  2. Produce random data points distributed within this outer area.
  3. Determine the proportion of these random data points that fall within the area defined by the function, relative to the total number of generated points.
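In other words, the definite integral over [a, b] is approximated by the signed fraction of random points that fall inside the area delimited by the function, scaled by the area of the bounding rectangle: \[\int_{a}^{b}f(x)\,dx\approx \frac{n_{inside}}{N}\,A_{outer}\] where N is the total number of generated points, the numerator is the signed count of points inside the area (points located between a negative portion of the curve and the x-axis count as -1), and A outer is the area of the bounding rectangle.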

Scala implementation

This implementation relies on Scala features available in version 2.11 and later [ref 3].
Let's define a generic class,
MonteCarloIntegrator, to encapsulate these 3 steps.
The class has two arguments:
  • The function f to sum
  • The number of random data points used in the methodology
import scala.util.Random

class MonteCarloIntegrator(f: Double => Double, numPoints: Int) {

  def integrate(from: Double, to: Double): Double = {
    val (min, max) = getBounds(from, to)
    val width = to - from
    val height = if (min >= 0.0) max else max - min
    val outerArea = width * height
    val randomx = new Random(System.currentTimeMillis)
    val randomy = new Random(System.currentTimeMillis + 42L)

    def randomSquare(randomx: Random, randomy: Random): Double = {
      val numInsideArea = Range(0, numPoints).foldLeft(0)(
        (s, n) => {
          val ptx = randomx.nextDouble * width + from
          // Shift the ordinate so random points cover [min, max) when f dips below 0
          val pty = randomy.nextDouble * height + (if (min < 0.0) min else 0.0)
          randomx.setSeed(randomy.nextLong)
          randomy.setSeed(randomx.nextLong)

          // Points between the x-axis and the curve count +1 above the axis, -1 below
          s + (if (pty > 0.0 && pty < f(ptx)) 1
              else if (pty < 0.0 && pty > f(ptx)) -1
              else 0)
        }
      )
      numInsideArea.toDouble * outerArea / numPoints
    }
    randomSquare(randomx, randomy)
  }
}

The method integrate implements the summation of the function over the interval [from, to]. The first step is to extract the bounds of the outer area with getBounds, whose area is computed as width * height. Each coordinate is assigned its own random generator, randomx and randomy.
The nested method randomSquare counts the number of data points, numInsideArea, that fall within the area delimited by the function. The sum is then computed as the fraction of data points inside that area, multiplied by the outer area.
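A minimal usage sketch, applying the class to the function introduced earlier, f(x) = 1/x - 0.5, over the interval [1, 3] (the exact value of the integral is ln(3) - 1 ≈ 0.0986):

val f = (x: Double) => 1.0/x - 0.5
val integrator = new MonteCarloIntegrator(f, 10000)

// Estimate the integral over [1, 3] and compare with the exact value ln(3) - 1
val estimate = integrator.integrate(1.0, 3.0)
println(s"Monte Carlo estimate: $estimate, exact value: ${Math.log(3.0) - 1.0}")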


The method getBounds is described in the following code snippet. It is a simple, although not particularly efficient, approach to extracting the bounds of the integration. It breaks the interval down into a number of steps and collects the minimum and maximum values of the function.

def getBounds(from: Double, to: Double): (Double, Double) = {
  val numSteps = Math.sqrt(numPoints).floor.toInt
  val stepSize = (to - from) / numSteps

  // Walk the interval and keep track of the (min, max) values of the function
  (0 to numSteps).foldLeft((Double.MaxValue, -Double.MaxValue))(
    (minMax, n) => {
      val y = f(n * stepSize + from)
      updateBounds(y, minMax) match {
        case 0x01 => (y, minMax._2)   // new minimum
        case 0x02 => (minMax._1, y)   // new maximum
        case 0x03 => (y, y)           // new minimum and maximum
        case _ => minMax
      }
    }
  )
}


// Flag the lower bit if y is a new minimum, the upper bit if y is a new maximum
def updateBounds(y: Double, minMax: (Double, Double)): Int = {
  var flag = 0x00

  if (y < minMax._1) flag += 0x01
  if (y > minMax._2) flag += 0x02
  flag
}


Precision

You may wonder about the accuracy of the Monte Carlo method and how many randomly generated data points are needed for a decent accuracy. Let's consider the same function \[f(x)=\frac{1}{x}-0.5\] and its indefinite integral, used to generate the expected value of the sum: \[\int f(x)dx=\ln(x)-\frac{x}{2}+C\] The simple test consists of computing the error between the value produced by the definite integral and the sum computed by the Monte Carlo method, as implemented in the following code snippet.

val fct = (x: Double) => 1.0/x - 0.5
val integral = (x: Double, c: Double) => Math.log(x) - 0.5*x + c

final val numPoints = 10000
val integrator = new MonteCarloIntegrator(fct, numPoints)

val predicted = integrator.integrate(1.0, 2.0)
val expected = integral(2.0, 0.0) - integral(1.0, 0.0)

Let's run the test 100 times with the number of random data points varying between 100 and 50,000.
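A sketch of such a test harness, assuming the fct, integral and MonteCarloIntegrator definitions above (the sample sizes are illustrative):

val expected = integral(2.0, 0.0) - integral(1.0, 0.0)

// Average absolute error over 100 runs for each number of random data points
Seq(100, 1000, 10000, 50000).foreach(n => {
  val errors = (0 until 100).map(_ =>
    Math.abs(new MonteCarloIntegrator(fct, n).integrate(1.0, 2.0) - expected)
  )
  println(s"$n points: average error ${errors.sum / errors.size}")
})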


The Monte Carlo method can achieve a reasonable level of accuracy even with a limited number of data points. In this scenario, the marginal gain in precision doesn't warrant generating a large quantity of random data points beyond 1000.

A more efficient strategy would be to use a termination condition that concludes the summation process as quickly as possible, eliminating the need to guess the ideal number of random data points. In each iteration, a new batch of random data points, totaling 'numPoints', is introduced. One straightforward method for determining convergence is to compare the difference in the sum between two successive iterations:
  sum(existing data points + new data points) - sum(existing data points) < eps
// Convergence-driven variant of randomSquare, nested within integrate so that
// f, width, height, min, from, outerArea, numPoints and eps are all in scope.
def randomSquare(randomx: Random, randomy: Random): Double = {
  var oldValue = 0.0

  Range(0, numPoints).takeWhile(
    _ => {
      // Count the points of a fresh batch of numPoints random data points that
      // fall within the area delimited by the function (same logic as above)
      val numInsideArea = Range(0, numPoints).foldLeft(0)((s, n) => {
        val ptx = randomx.nextDouble * width + from
        val pty = randomy.nextDouble * height + (if (min < 0.0) min else 0.0)
        s + (if (pty > 0.0 && pty < f(ptx)) 1
            else if (pty < 0.0 && pty > f(ptx)) -1
            else 0)
      })

      val newValue = numInsideArea.toDouble * outerArea / numPoints
      val diff = Math.abs(newValue - oldValue)

      oldValue = newValue
      diff >= eps   // keep iterating while successive estimates differ by more than eps
    }
  )
  oldValue
}
In each iteration, we randomly generate a fresh batch of numPoints data points to improve the accuracy of the overall summation. The termination of the process is governed by takeWhile, a higher-order method of Scala collections.
It's important to note that this implementation of Monte Carlo integration is straightforward and serves to demonstrate the application of stochastic methods to calculus. For functions with significant inflection points (characterized by extreme second-order derivatives), recursive stratified sampling has proven more precise in calculating definite integrals.

References

[1] Monte Carlo Integration, Dartmouth College - C. Robert, G. Casella
[2] The Clever Machine: Monte Carlo Approximations - Dustin Stansbury
[3] Programming in Scala, 3rd edition - M. Odersky, L. Spoon, B. Venners
github.com/patnicolas



-------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3

Thursday, June 16, 2016

Analyze Scala Performance Using Javap

Target audience: Beginner
Estimated reading time: 4'


Overview

As mentioned in a previous post, iteration constructs in Scala, such as the foreach, for and while loops, have different performance characteristics. A recurrent question among developers is the relative performance of Scala and Java regarding for and while.

The "for comprehension" in Scala is indeed syntactic sugar that wraps a sequence of flatMap and map invocations (see Monad in Practice). It is expected to be significantly slower and should not be used as an iterator.
We select the higher-order method foreach to implement the for iterator in Scala, as sketched below.
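For reference, a minimal sketch of the desugaring: a for loop without a yield clause is rewritten by the compiler into a call to foreach.

var sum = 0L

// The for comprehension ...
for (i <- 0 until 1000) sum += i

// ... is desugared by the compiler into a call to the higher-order method foreach
(0 until 1000).foreach(i => sum += i)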


Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Evaluation

The simple evaluation consists of processing a large number of iterations of a very simple statement whose execution does not interfere with the actual performance of the iterator. The code is also easy to understand. The code for the while loop is described below.

def testRun(numIterations: Int): Unit = {
  val startTime = System.currentTimeMillis
  var sum: Long = 0
  var i: Int = 0

  while(i < numIterations) {
    var j = 0
    while(j < 1000) {
      sum += 1
      j += 1
    }
    i += 1
  }
  Console.printf(s"${System.currentTimeMillis - startTime}")
}

The test is executed on a 4-core Intel i7 CPU with Java 1.7.02 64-bit and Scala 2.10.2. The chart below compares the duration of the execution of the while loop for Java and Scala.


The performance of Scala and Java when executing the while loop is very similar.

The second test consists of comparing the relative performance of Java for loop and Scala foreach higher order method.

def testRun(numIterations: Int): Unit = {
  var sum: Long = 0
  (0 until numIterations).foreach(sum += _)
}
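For a like-for-like comparison with the while loop above, the foreach version can be wrapped in the same timing harness; a sketch (testRunForeach is a hypothetical name mirroring the while-loop benchmark):

def testRunForeach(numIterations: Int): Unit = {
  val startTime = System.currentTimeMillis
  var sum: Long = 0

  // Nested iteration mirroring the while-loop benchmark
  (0 until numIterations).foreach(_ =>
    (0 until 1000).foreach(_ => sum += 1)
  )
  Console.printf(s"${System.currentTimeMillis - startTime}")
}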




Analysis using Javap

The first step is to analyze the number of byte-code instructions generated by the Scala compiler. We use the Java Class File Disassembler,  javap, to print out the actual instructions processed by the Java virtual machine.
            javap -c -verbose xxx.class
The sequence of instructions generated for the execution of the while loops are displayed below.

0:   lconst_0
1:   lstore_1
2:   iconst_0
3:   istore_3
4:   iload_3
5:   sipush  1000
8:   if_icmpge       42
11:  iconst_0
12:  istore  4
14:  iload   4
16:  sipush  1000
19:  if_icmpge       35
22:  lload_1
23:  lconst_1
24:  ladd
25:  lstore_1
26:  iload   4
28:  iconst_1
29:  iadd
30:  istore  4
32:  goto    14
35:  iload_3
36:  iconst_1
37:  iadd
38:  istore_3
39:  goto    4
42:  return

Disassembling the Java class for the code with the for iterators produces the following print out:

0:   new     #12; //class scala/runtime/LongRef
3:   dup
4:   lconst_0
5:   invokespecial   #16; //Method scala/runtime/LongRef."<init>":(J)V
8:   astore_1
9:   getstatic       #22; //Field scala/runtime/RichInt$.MODULE$:Lscala/runtime/RichInt$;
12:  getstatic       #27; //Field scala/Predef$.MODULE$:Lscala/Predef$;
15:  iconst_0
16:  invokevirtual   #31; //Method scala/Predef$.intWrapper:(I)I
19:  sipush  1000
22:  invokevirtual   #35; //Method scala/runtime/RichInt$.until$extension0:
                            (II Lscala/collection/immutable/Range;
25:  new             #37; //class JavaScala$$anonfun$testRun$1
28:  dup
29:  aload_0
30:  aload_1
31:  invokespecial   #40; //Method JavaScala$$anonfun$testRun$1."<init>
                          (LJavaScala;Lscala/runtime/LongRef;)V
34:  invokevirtual   #46; //Method scala/collection/immutable/Range.foreach$mVc$sp:
                         (Lscala/Function1;)
37:  return


Although the number of instructions generated for the for loop is smaller than the number generated for the while loop, most of those instructions are function calls:
- conversion of the counter to a long
- static conversion of Int to RichInt to wrap the Java integer:
            @inline implicit def intWrapper(x: Int) = new runtime.RichInt(x)
- ultimately, the foreach method is invoked to execute the loop.

Interestingly enough, the foreach method for collections is implemented using a while loop, not a for loop. Scala compiler plug-ins such as ScalaCL, which optimize the execution of iterations over Scala collections (arrays, lists, ...), have been introduced to get around this issue. The reader can also take comfort in using the Java Class File Disassembler to understand how a method or piece of code translates into efficient, or in our case inefficient, byte code. Quite a few methods in Scala, such as foldLeft and reduceLeft, use tail recursion to traverse collections. It would be interesting to compare the relative performance of those methods with alternatives using iterators... stay tuned.

References