Thursday, June 16, 2016

Analyze Scala Performance Using Javap

Target audience: Beginner
Estimated reading time: 4'

Table of contents
Overview
Follow me on LinkedIn

Overview

As mentioned in a previous post, iterators in Scala, such as foreach, for & while loop have different performance characteristics. A recurrent question between developers is the relative performance of Scala and Java regarding for and while.

The "for comprehension" closure in Scala is indeed a syntactic sugar wraps around a sequence of flatMap and map methods, Monad in Practice. It is expected to be significantly slower and should not be used as an iterator.
We select the higher method foreach to implement the for iterator in Scala.


Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Evaluation

The simple evaluation consists of processing a large number of iterations of a very simple statement which execution does not interfere with the actual performance of the iterator. The code is also is easy to understand. The code for the while loop is described below.

def testRun(numIterations: Int) : Unit = {
  val startTime = System.currentTimeMillis
  var sum: Long = 0
  var i: Int = 0

  while(i < numIterations) {
    var j = 0
    while(j  < 1000) {
      sum += 1
      j += 1
    }
     i += 1
  }
  Console.printf(s"${(System.currentTimeMillis-startTime)}")
}

The test is executed on a 4 cores Intel i7 CPU with java 1.7.02 64-bit and Scala 2.10.2. The chart below, compares the duration of the execution of the for iterator for Java and Scala.


The performance of Scala and Java for executing the while loop are very similar.

The second test consists of comparing the relative performance of Java for loop and Scala foreach higher order method.

def testRun(numIterations: Int) : Unit = {
    var sum: Long = 0
    (0 until numIterations.foreach( sum += _)
}




Analysis using Javap

The first step is to analyze the number of byte-code instructions generated by the Scala compiler. We use the Java Class File Disassembler,  javap, to print out the actual instructions processed by the Java virtual machine.
            javap -c -verbose xxx.class
The sequence of instructions generated for the execution of the while loops are displayed below.

0:   lconst_0
1:   lstore_1
2:   iconst_0
3:   istore_3
4:   iload_3
5:   sipush  1000
8:   if_icmpge       42
11:  iconst_0
12:  istore  4
14:  iload   4
16:  sipush  1000
19:  if_icmpge       35
22:  lload_1
23:  lconst_1
24:  ladd
25:  lstore_1
26:  iload   4
28:  iconst_1
29:  iadd
30:  istore  4
32:  goto    14
35:  iload_3
36:  iconst_1
37:  iadd
38:  istore_3
39:  goto    4
42:  return

Disassembling the Java class for the code with the for iterators produces the following print out:

0:   new     #12; //class scala/runtime/LongRef
3:   dup
4:   lconst_0
5:   invokespecial   #16; //Method scala/runtime/LongRef."<init>":(J)V
8:   astore_1
9:   getstatic       #22; //Field scala/runtime/RichInt$.MODULE$:Lscala/runtime/RichInt$;
12:  getstatic       #27; //Field scala/Predef$.MODULE$:Lscala/Predef$;
15:  iconst_0
16:  invokevirtual   #31; //Method scala/Predef$.intWrapper:(I)I
19:  sipush  1000
22:  invokevirtual   #35; //Method scala/runtime/RichInt$.until$extension0:
                            (II Lscala/collection/immutable/Range;
25:  new             #37; //class JavaScala$$anonfun$testRun$1
28:  dup
29:  aload_0
30:  aload_1
31:  invokespecial   #40; //Method JavaScala$$anonfun$testRun$1."<init>
                          (LJavaScala;Lscala/runtime/LongRef;)V
34:  invokevirtual   #46; //Method scala/collection/immutable/Range.foreach$mVc$sp:
                         (Lscala/Function1;)
37:  return


Although the number of instructions for the for loop is smaller than the number of instructions for while, most of those instructions are function calls:
- conversion of counter to long
- static conversion of int to RichInt to wrap Java Integer:
            @inline implicit def intWrapper(x: Int)= new runtime.RichInt(x)
- ultimately the foreach method is invoked to execute the loop.

Interestingly enough, the foreach method for collection is implemented using the while loop not the for loop. Scala compiler plug-in such as ScalaCL to optimize the execution of iteration on Scala collections, Arrays, Lists,... have been introduced to get around this issue. The reader can also take comfort in using the Java Class File Dissambler to understand how a method or piece of code translate into efficient, or in our case, inefficient byte-code. Quite a few methods in Scala such as foldLeft, reduceLeft uses tail recursion to traverse collections. It would be interesting to compare the relative performance of those methods with alternatives using iterators.. stay tune.

References

Thursday, May 5, 2016

Bootstrapping With Replacement in Scala

Target audience: Intermediate
Estimated reading time: 6'

Bootstrapping is a method in statistics where random sampling of a dataset is done with replacement. It allows data scientists to determine the sampling distribution for a broad range of probability distributions through this technique.

Background

A primary goal of bootstrapping is to evaluate the precision of various statistical measures like mean, standard deviation, median, mode, or error. These measures, sf, often termed as estimators, serve to approximate a distribution. The most frequently used approximation is called the empirical distribution function. When the data points x are independent and identically distributed (iid), this empirical or approximate distribution can be assessed by employing resampling techniques.

The following diagram captures the essence of bootstrapping by resampling.

Generation of bootstrap replicates by resampling

Each of the B bootstrap samples has the same number of observations or data points as the original data set from which the samples are created. Once the samples are created, a statistical function sf such as mean, mode, median or standard deviation is computed for each sample.
The standard deviation for the B statistics should be similar to the standard deviation of the original data set.

Implementation in Scala

The purpose of this post is to illustrate some basic properties of bootstrapped sampling
  • Profile of the distribution of statistics sf for a given probability distribution
  • Comparison of the standard deviation of the statistics sf with the standard deviation of the original dataset
Let's implement a bootstrap by resampling in Scala, starting with a class Bootstrap.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class Bootstrap(
    numSamples: Int = 1000,
    sf: Vector[Double] => Double,
    inputDistribution: Vector[Double],
    randomizer: Random
) {

  lazy val bootstrappedReplicates: Array[Double] =
     (0 until numSamples)./:( new mutable.ArrayBuffer[Double] )(
        ( buf, _ ) => buf += createBootstrapSample
      ).toArray

  def createBootstrapSample: Double {}

  lazy val mean = bootstrappedReplicates.reduce( _ + _ )/numSamples

  def error: Double = {}
}

The class Bootstrap is instantiated with a pre-defined number of samples, numSamples (line 2), a statistic function sf (line 3), a data set generated by a given distribution inputDistribution (line 4) and a randomizer (line 5).
The computation of the bootstrap replicates, bootstrappedReplicates is central to resampling (lines 8 - 11). As described in the introduction, a replicate, s is computed from a sample of the original data set with the method createBootstrapSample (line 10).

Let's implement the method createBootstrapSample.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def createBootstrapSample: Double =
    sf(
       (0 until inputDistribution.size)./:( new mutable.ArrayBuffer[Double] )(
         ( buf, _ ) => {
            randomizer.setSeed( randomizer.nextLong )
            val randomValueIndex = randomizer.nextInt( inputDistribution.size )
            buf += inputDistribution( randomValueIndex )
         }
       ).toVector
    )

The method createBootstrapSample
- Samples the original dataset using a uniform random function (line 6)
- Applies the statistic function sf to this sample dataset (line 1 & 11)

The last step consists of computing the error (deviation) on the bootstrap replicates

1
2
3
4
5
6
7
8
  def error: Double = {
      val sumOfSquaredDiff = bootstrappedReplicates.reduce(
        (s1: Double, s2: Double) => (s1 - mean) (s1 - mean) +  (s2 - mean)*(s2 - mean)
      )

      Math.sqrt(sumOfSquaredDiff / (numSamples - 1))
  }


Evaluation

The first evaluation consists of comparing the distribution of replicates with the original distribution. To this purpose, we generate an input dataset using
  • Normal distribution
  • LogNormal distribution

Setup

Let's create a method, bootstrapEvaluation to compare the distribution of the bootstrap replicates with the dataset from which the bootstrap samples are generated.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def bootstrapEvaluation( 
     dist: RealDistribution, 
     random: Random, 
     gen: (Double, Double) 
): (Double, Double) = {

    val inputDistribution = (0 until 5000)./:(new ArrayBuffer[(Double, Double)])
     (
       ( buf, _ ) => {
          val x = gen._1 * random.nextDouble - gen._2
          buf += ( ( x, dist.density( x ) ) )
      }
    ).toVector

    val mean = (x: Vector[Double]) => x.sum/x.length
    val bootstrap = new Bootstrap(
        numReplicates,
        mean, 
        inputDistribution.map( _._2 ), 
        new Random( System.currentTimeMillis)
    )

    val meanS = bootstrap.bootstrappedReplicates.sum / 
          bootstrap.bootstrappedReplicates.size
    val sProb = bootstrap.bootstrappedReplicates.map(_ - meanS)
         // .. plotting histogram of distribution sProb
    (bootstrap.mean, bootstrap.error)
  }

We are using the normal and log normal probability density function defined in the Apache Commons Math Java library. These probability density functions are defined in the org.apache.commons.math3.distribution package.

The comparative method bootstrapEvaluation has the following argument:
  • dist: A probability density function used to generate the data set upon which sampling is performed (line 2).
  • random: A random number generator (line 3)
  • gen: A pair of parameters for the linear transform for the generation of random values (a.r + b) (line 4).
The input distribution inputDistribution { (x, pdf(x)} is generated for 5000 data points (lines 7 - 13).
Next the bootstrap is created with the appropriate number of replicates, numReplicates, the mean of the input data set as the statistical function s, the input distribution and the generic random number generator of Scala library, as arguments (lines 16 -20).
Let's plot the distribution the input data set generated from a normal density function.

val (meanNormal, errorNormal) = bootstrap(
    new NormalDistribution, 
    new scala.util.Random, 
    (5.0, 2.5)
)

Normally distributed dataset

The first graph plots the distribution of the input dataset using the Normal distribution.


The second graph illustrates the distribution (histogram) of the replicates s - mean.


The bootstrap replicates sf(x) are also normally distributed. The mean value for the bootstrap replicates is 0.1978 and the error is 0.001691

Dataset with a log normal distribution

We repeat the same process for the lognormal distribution. This time around the dataset to sample from follows a log-normal distribution.

val (meanLogNormal, errorLogNormal) = bootstrap(
    new LogNormalDistribution, 
    new scala.util.Random, 
    (2.0, 0.0)
)



Although the original dataset used for generated the bootstrap samples is normally distributed, the bootstrap replicates sf(x) are normally distributed. The mean for the bootstrap replicates is 0.3801 and the error is 0.002937

The error for the bootstrap resampling from a log normal distribution is twice as much as the error related to the normal distribution
The result is not surprising: The bootstrap replicates follow a normal distribution which matches closely the original dataset created using the same probability density function. 

References

  • Programming in Scala - 3rd edition M Odersky, L. Spoon, B. Venners - Artima - 2016
  • Elements of Statistics Learning: Data mining, Inference and Prediction - 7.11 Bootstrap method Springer - 2001
  • github.com/patnicolas

Friday, April 15, 2016

Manage Spark Context in ScalaTest

Target audience: Beginner
Estimated reading time: 4'

This post describes a methodology to manage the Spark context while testing you application using ScalaTest


Table of contents
Follow me on LinkedIn

Introduction

Debugging Apache Spark application using ScalaTest seems quite simple when dealing with a single test:
  • Specify your Spark context configuration SparkConf
  • Create the Spark context
  • Add test code related to your application
  • Clean up resources (Spark context, Akka context, File handles...)
However, there are cases when you may need to create and close the spark context used across multiple test or on a subset of the tests. The challenge is to make sure that the Spark context is actually close when it is no longer needed.
This post introduces two basic
ScalaTest methods beforeAll and afterAll of the trait BeforeAndAfterAll to manage the context life-cyle of your test application.

Wrapping the Spark context

The objective is to create a small framework that create or retrieve an existing Spark context before executing a test and closing it after the test is completed, independently of the status of the test.
The initialization of the Spark context consist of specifying the configuration for the sequence of tests. Some of these parameters can be dynamically defined through a simple parameterization. The context is created only if it is not already defined within the scope of the test using the getOrCreate method. 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName(s"Test-App")
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer", 
              "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")

    sc = SparkContext.getOrCreate(conf.setMaster("local[4]"))
    sc.setLogLevel("ERROR")
    sc
  }

  def close: Unit = {
    if (sc != null) sc.stop
  }
}

Scala test context

The solution is to let ScalaTest method manage the lifecycle of the Spark context for testing. 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
trait SparkContextForTesting 
     extends SparkContextWrapper with BeforeAndAfterAll { 
  self: Suite =>

  override def beforeAll: Unit = {
    getContext
    super.beforeAll
  }

  override def afterAll: Unit = {
    close
    super.afterAll
  }
}

Note: The BeforeAndAfterAll trait can only be sub-classed by a test suite inheriting the Suite trait. The method beforeAll (line: 5) is automatically called before the first test of your test suite and the method afterAll (line 10)is called after your last test is completed, whether those tests succeeded or not.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class MyTest 
  extends FlatSpec 
    with Matchers 
      with SparkContextForTesting {

 val x: Int = -4
 it should "My first test" in {
   val sqlContext = new SQLContext(sc) 
   // ....
 }
   // ... other tests

 it should "My last test" in {
   // clean up after it completes
 }
}

"That's all folks!