Monday, May 5, 2014

Performance of Scala Parallel Collections

Target audience: Beginner
Estimated reading time: 3'

This post evaluates the performance improvement of Scala parallel collections over the standard mutable and immutable collections.



Overview

The Scala standard library includes parallel collections whose purpose is to shield developers from the intricacies of concurrent thread execution and race conditions. Parallel collections are a very convenient way to encapsulate concurrency in a high-level abstraction similar to the traditional data workflows that scientists are familiar with.
Parallel computing is supported for some collections through the par method, as listed below; a short usage sketch follows the list.

  • List[T].par: ParSeq[T]
  • Array[T].par: ParArray[T]
  • Map[K,V].par: ParMap[K,V]
  • HashMap[K,V].par: ParHashMap[K,V]
  • Set[T].par: ParSet[T]
  • ParVector, ParRange and ParIterable
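
As an illustration, here is a minimal sketch that converts an array into its parallel counterpart and executes map and reduce concurrently (the names values and parValues are illustrative):

import scala.collection.parallel.mutable.ParArray

val values = Array.tabulate(100000)(_.toDouble)
  // 'par' returns the parallel counterpart of the array, ParArray[Double]
val parValues: ParArray[Double] = values.par
  // map and reduce are executed concurrently on the default thread pool
val sum = parValues.map(Math.sqrt).reduce(_ + _)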
Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Benchmark for parallel arrays

The main purpose of parallel collections is to improve the performance of execution through concurrency. Let's consider map and reduce functions applied to an array of arbitrary length:

final val sz = 100000
val data = Array.tabulate(sz)(_ << 1)
  // f: Int => Int is an arbitrary transform
data.par.map(x => f(x))
data.par.reduceLeft(_ + _)

The next step is to create a set of benchmark test classes, ParArrayBenchmark and ParMapBenchmark, that automate the performance evaluation of parallel arrays and maps over an arbitrary number of tasks, nTasks.
The first step is to define a timing function (line 1) that executes a function g for times iterations (line 4).

1  def timing(g: Int => Unit, times: Int): Long = {
2     // Measure duration of 'times' executions of g
3    val startTime = System.currentTimeMillis
4    Range(0, times).foreach(g)
5    System.currentTimeMillis - startTime
6  }
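
As a quick usage sketch, reusing the data array defined earlier, the duration of 100 executions of a map can be measured as follows:

  // Returns the total elapsed time in milliseconds
val durationMs = timing(_ => data.map(_ * 2), 100)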

The benchmark is parameterized with the type U of the elements of the array. The constructor takes an Array (line 2) and a parallelizable array ParArray (line 3) as arguments.
The benchmark ParArrayBenchmark evaluates and compares the performance of an array and a parallel array for the most commonly used higher-order methods: map (line 6), filter (line 14) and reduce (line 22).

 1  class ParArrayBenchmark[U](
 2    u: Array[U],
 3    v: ParArray[U],
 4    times: Int) {
 5
 6    def map(f: U => U)(nTasks: Int): Double = {
 7      v.tasksupport = new ForkJoinTaskSupport(
 8        new ForkJoinPool(nTasks)
 9      )
10      val duration = timing(_ => u.map(f), times).toDouble
11      timing(_ => v.map(f), times)/duration
12    }
13
14    def filter(f: U => Boolean)(nTasks: Int): Double = {
15      v.tasksupport = new ForkJoinTaskSupport(
16        new ForkJoinPool(nTasks)
17      )
18      val duration = timing(_ => u.filter(f), times).toDouble
19      timing(_ => v.filter(f), times)/duration
20    }
21
22    def reduce(f: (U,U) => U)(nTasks: Int): Double = {
23      v.tasksupport = new ForkJoinTaskSupport(
24        new ForkJoinPool(nTasks)
25      )
26      val duration = timing(_ => u.reduceLeft(f), times).toDouble
27      timing(_ => v.reduceLeft(f), times)/duration
28    }
29  }

The benchmark is flexible enough to support any kind of method argument f of any type U for all three methods: map, filter and reduce.
The Scala class ForkJoinTaskSupport is a wrapper around the Java ForkJoinPool executor (lines 8, 16 and 24). ForkJoinPool provides Scala developers with a very efficient way to manage thread pools: it executes nTasks tasks, which may in turn spawn subtasks.
The tasks are implemented using Java threads, managed by an executor service familiar to most Java developers.
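
The same mechanism can configure the parallelism of any parallel collection outside a benchmark. A minimal sketch (the pool of 4 worker threads is an arbitrary choice):

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

val parData = (0 until 1000000).toArray.par
  // Assign a dedicated pool of 4 worker threads to this collection
parData.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
val total = parData.map(_ << 1).reduce(_ + _)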

Benchmark for parallel maps

Let's create a benchmark for evaluating the performance of parallel maps, similar to the benchmark for parallel arrays.
Once again, the evaluation methods map (line 7) and filter (line 21) are flexible enough to accommodate any function argument f of any type U. The implementation of these two methods follows the same pattern as the one used for the parallel array. The duration of the execution of map and filter is computed through the timing method introduced in the previous paragraph.

 1  class ParMapBenchmark[U](
 2    u: immutable.Map[Int, U],
 3    v: ParMap[Int, U],
 4    times: Int) {
 5
 6      // Define the map operator for the performance benchmark of map
 7    def map(f: U => U)(nTasks: Int): Double = {
 8      v.tasksupport = new ForkJoinTaskSupport(
 9        new ForkJoinPool(nTasks)
10      )
11
12      val duration = timing(_ => u.map {
13        case (e1, e2) => (e1, f(e2))
14      }, times).toDouble
15      timing(_ => v.map {
16        case (e1, e2) => (e1, f(e2))
17      }, times)/duration
18    }
19
20      // Define the filter operator for the performance benchmark of Scala map
21    def filter(f: U => Boolean)(nTasks: Int): Double = {
22      v.tasksupport = new ForkJoinTaskSupport(
23        new ForkJoinPool(nTasks)
24      )
25
26      val duration = timing(_ => u.filter {
27        case (e1, e2) => f(e2)
28      }, times).toDouble
29      timing(_ => v.filter {
30        case (e1, e2) => f(e2)
31      }, times)/duration
32    }
33  }


Performance Results

The objective of the performance test is to evaluate the efficiency of Scala parallel collections according to:
  • The number of available CPU cores
  • The complexity of the computation
  • The size of the collection
Let's look at the relative performance of the map task on a single-threaded Array and a parallel ParArray.
Let's use fairly simple mathematical functions, mapF (line 2) (resp. filterF (line 5) and reduceF (line 8)), to evaluate the map (resp. filter and reduce) functions on arrays and parallel arrays. The arrays are created and populated with random values (lines 10 & 11).

 1    // Arbitrary map function
 2  val mapF = (x: Double) => Math.sin(x*0.01) + Math.exp(-x)
 3
 4    // Arbitrary filter function
 5  val filterF = (x: Double) => x > 0.8
 6
 7    // Arbitrary reduce function
 8  val reduceF = (x: Double, y: Double) => (x+y)*x
 9
10  val data = Array.fill(SZ)(Random.nextDouble)
11  val pData = ParArray.fill(SZ)(Random.nextDouble)
12
13    // Initialize and execute the benchmark for the parallel array
14  val benchmark = new ParArrayBenchmark[Double](data, pData, TIMES)
15
16  benchmark.map(mapF)(n)
17  benchmark.filter(filterF)(n)


The results are not surprising in the following respects:
  • The reducer doesn't take advantage of the parallelism of the array: the reduction of ParArray has a small overhead in the single-task scenario and then matches the performance of Array.
  • The performance of the map function benefits from the parallelization of the array. The performance levels off when the number of allocated tasks equals or exceeds the number of CPU cores.
The second test consists of comparing the behavior of two parallel collections, ParArray and ParHashMap, on two methods, map and filter, using a configuration identical to the first test.

We reuse the mathematical functions mapF and filterF, defined for the arrays, in the test client code.

val mapData = new HashMap[Int, Double]
Range(0, SZ).foreach(n => mapData.put(n, Random.nextDouble))

val parMapData = new ParHashMap[Int, Double]
Range(0, SZ).foreach(n => parMapData.put(n, Random.nextDouble))

  // The benchmark expects an immutable map, hence the conversion with toMap
val mapBenchmark = new ParMapBenchmark[Double](mapData.toMap, parMapData, TIMES)
mapBenchmark.map(mapF)(n)
mapBenchmark.filter(filterF)(n)



The impact of parallelization is very similar across methods and across collections. It's important to notice that the performance of the parallel collections levels off at around four times that of the single-threaded collections for five concurrent tasks and above.


Tuesday, March 4, 2014

Curried and Partial Functions in Scala

Target audience: Intermediate
Estimated reading time: 5'




Introduction

Although most Scala developers have some knowledge of curried and partial functions, many struggle to grasp the different use cases to which each of these functional programming techniques applies, and their relative benefits. For those interested in a more detailed explanation of currying existing functions, I recommend the excellent post by Daniel Westheide.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Partial functions

Partially defined functions are commonly used to restrict the domain of applicability of function arguments. The restriction can apply to either the type of the argument or its value. Let's consider the computation of the square root of a floating point value, dsqrt. The value of the argument has to be positive. A simple implementation relies on the Option monad.

def dsqrt(x: Double): Option[Double] = 
    if(x<0.0) None else Some(Math.sqrt(x))

The same method can be implemented with a partial function, by pattern matching the argument as follows.

val zero = 0.0

def dsqrt: PartialFunction[Double, Double] = {
  case x: Double if x >= zero => Math.sqrt(x)
}

The method dsqrt returns an object of type PartialFunction[Double, Double], with an input argument of type Double and an output of type Double. The method can only handle input values x >= 0.0; any other input value generates a MatchError exception.
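
Before applying such a function, it is often worth guarding against the MatchError. A brief sketch (added for illustration) using the standard PartialFunction methods lift and isDefinedAt:

  // lift turns the partial function into a total function returning an Option
val safeSqrt: Double => Option[Double] = dsqrt.lift
safeSqrt(3.6)     // Some(1.897...)
safeSqrt(-3.6)    // None

  // isDefinedAt tests the domain before applying the function
dsqrt.isDefinedAt(-3.6)   // false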

Let's evaluate the partial function with different argument types and values. The partial function accepts input of any type for which an implicit conversion to Double is defined. The first invocation of dsqrt (line 2) succeeds. The second invocation (line 8) triggers an implicit conversion from Long to Double before the partial function is applied. The third call to dsqrt (line 12) throws a MatchError.

 1    // Succeeds
 2  Try(dsqrt(3.6)) match {
 3    case Success(res) => {}
 4    case Failure(e) => Console.println(e.toString)
 5  }
 6
 7    // Succeeds thanks to the implicit conversion Long => Double
 8  Try(dsqrt(4L))
 9
10    // Fails with the following message:
11    // "scala.MatchError: -3.6 (of class java.lang.Double)"
12  Try(dsqrt(-3.6))

A similar restriction can be applied to the type of the argument. Let's consider the incremental methods add1 (line 3) and add2 (line 15) of the class Value. These two methods process values of type AnyVal, which requires the type of the argument to be checked at runtime. add1 and add2 illustrate two alternative, and crude, type-checking approaches.

 1  class Value(x: Int) {
 2
 3    def add1(anyVal: AnyVal): AnyVal = {
 4      if (anyVal.isInstanceOf[Int]) {
 5        val value = anyVal.asInstanceOf[Int]
 6        (x + value).asInstanceOf[AnyVal]
 7      }
 8      else if (anyVal.isInstanceOf[Double]) {
 9        val value = anyVal.asInstanceOf[Double].floor.toInt
10        (x + value).asInstanceOf[AnyVal]
11      }
12      else { }
13    }
14
15    def add2(anyVal: AnyVal): AnyVal = anyVal.getClass.getName match {
16      case "java.lang.Integer" => {  // the AnyVal argument is boxed at runtime
17        val value = anyVal.asInstanceOf[Int]
18        (x + value).asInstanceOf[AnyVal]
19      }
20      case "java.lang.Double" => {
21        val value = anyVal.asInstanceOf[Double].floor.toInt
22        (x + value).asInstanceOf[AnyVal]
23      }
24      case _ => { }
25    }
26  }

The two implementations add1 and add2 are cumbersome, to say the least. An alternative implementation, using pattern matching on the type and returning a partial function, is far more elegant.

class Value(x: Int) {
  def add: PartialFunction[Any, Any] = {
    case n: Int => x + n
    case y: Double => x + y.floor.toInt
  }
}

val value = new Value(4)
Console.println(value.add(4.5))

In the example above, we do not have to handle the case where the argument has an improper type: the partial function is simply not defined for it (and applying it to such an argument throws a MatchError, as seen earlier).

Note: The method Actor.receive, which defines the message loop of an actor consuming messages from its mailbox, is indeed a partial function.

Currying

Currying is the transformation of a function with multiple arguments into a chain of functions each taking a single argument: if f: (x, y) -> f(x, y), then curry(f): x -> (y -> f(x, y)).
Let's take the simple example of the sum of two floating point values. The original two-argument function (1) can be converted into a single-argument function that returns an anonymous function taking the second argument as parameter (2). Scala provides developers with syntactic sugar to define the cascade of function calls (3).

  // Three alternative definitions; distinct names are used because the
  // uncurried and curried forms would collide after type erasure
def sum(x: Double, y: Double): Double = x + y                  // (1)
def sum2(x: Double): Double => Double = (y: Double) => x + y   // (2)
def sum3(x: Double)(y: Double): Double = x + y                 // (3)
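
The curried form can be partially applied; a brief usage sketch with the definitions above:

  // Partial application binds the first argument and returns Double => Double
val add2 = sum3(2.0) _
add2(3.0)         // 5.0
sum2(2.0)(3.0)    // 5.0: sum2 returns a function that is immediately applied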
 

Most higher-order methods on collections are curried. The following example illustrates the commonly used foldLeft.

class Collection[T](private val values: Array[T]) {

    // Uncurried version (named foldl to avoid an erasure clash with foldLeft)
  def foldl[U](u: U, op: (U, T) => U): U =
    values.foldLeft[U](u)((acc, t) => op(acc, t))

    // Curried version, delegating to the uncurried one
  def foldLeft[U](u: U)(op: (U, T) => U): U =
    this.foldl(u, op)
}

val myCollection = new Collection[Int](Array[Int](3, 5, 8))
val product = myCollection.foldLeft[Int](1)((prod, x) => prod * x)


Is there any benefit to using curried functions instead of functions or methods with multiple arguments? Yes, in cases where the type inferencer can propagate type information from the first argument list to the second. Let's consider the foldLeft method above:
    def foldLeft[U](u: U)(op: (U, T) => U): U = this.foldl(u, op)

The type inferencer determines the type U from the first argument u and subsequently uses it to type the parameters of the binary operator op: (U, T) => U.
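
A short sketch of the difference, using the Collection class above:

  // Curried version: U is inferred from the first argument list,
  // so the closure requires no type annotations
myCollection.foldLeft(1)((prod, x) => prod * x)

  // Uncurried version: the closure's parameter types must be spelled out
myCollection.foldl(1, (prod: Int, x: Int) => prod * x)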


Monday, February 17, 2014

Efficient Trading Option Pricing in Scala

Target audience: Intermediate
Estimated reading time: 5'

Ever pondered how the pricing of an option on a future or index works? Curious about the underlying formula?
This article offers a succinct summary of the Black-Scholes formula and discusses how it's implemented in Scala, utilizing memoization to enhance execution speed.



What you will learn: a fast, memoized implementation of the Black-Scholes option pricing equation.

Notes
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.
  • Familiarity with option trading [ref 1] is not required to understand the topic
  • Environments: Scala 2.12.10, JDK 11

Overview

You don't need to be a seasoned professional in option trading or asset valuation to grasp the essence of Black-Scholes. 

The Black-Scholes model is a method employed to calculate the fair or theoretical value of a call or put option. This calculation is based on six key factors: the volatility, the type of option, the price of the underlying stock, the duration until expiration, the strike price, and the risk-free interest rate.

The Black-Scholes-Merton equations, a notable set of stochastic partial differential formulas, are renowned for pricing options based on underlying security's price, volatility, and expiration time [ref 2]. Functional languages like Scala give developers the flexibility to experiment with and test various volatility and time decay models [ref 3].

Theory

The Black-Scholes approach assumes that the price of the underlying security follows a geometric Brownian motion driven by a Wiener process. If we note S(t) the price of the security (stock, bond, future), V(t) the price of the option over time, r the annualized compound risk-free interest rate or rate of return, and sigma the volatility of the price of the underlying security S, then the Black-Scholes equation can be described as \[\frac{\partial V}{\partial t} + \frac{\sigma ^{2}S^{2}}{2}\frac{\partial^2 V}{\partial S^2} + r (S\frac{\partial V}{\partial S}- V) = 0\]
The resolution of the partial differential equation produces the following solution for a call option: \[(1)\ \ C(t) = N(d)S - N(d')Ke^{-r(T-t)}\,\,with\\N(x)=\frac{1}{\sqrt{2 \pi }}\int_{-\infty }^{x}e^{-\frac{t^{2}}{2}}dt\\(2)\ \ d = \frac{1}{\sigma \sqrt{T-t}}[log(\frac{S}{K}) + (r+\frac{\sigma ^{2}}{2})(T-t)]\\(3)\ \ d^{'}=d-\sigma \sqrt{T-t}\]
K is the strike price for the underlying security. The price of a put option is computed as \[P(t)=N(-d + \sigma \sqrt{T-t})Ke^{-r(T-t)} - N(-d)S\]
As a reminder, a financial call option gives a buyer the right, for a premium fee, but not the obligation, to acquire an agreed amount of an asset S (securities, bonds, commodities) from a seller at a future date T for a predefined price (the strike price K). The seller is obligated to sell the asset should the buyer so decide. A put option gives the buyer the right, but not the obligation, to sell an asset at a future date at a predefined (strike) price.

Implementation

Calculating the price of an option requires extensive CPU usage. The approach includes several steps:
  1. Establish and encapsulate the parameters used in the Black-Scholes formula.
  2. Carry out the process to calculate the price of a call option.
  3. Identify the various computational elements involved in the Black-Scholes formula.
  4. Create a system for storing temporary results.
  5. Combine these interim outcomes to finalize the call price.
  6. Use Monte Carlo Simulation for determining the security's price.

Formulation

First, let's define a data class, BSParams, to encapsulate the parameters of the Black-Scholes formula:
  • S Current price of the underlying security
  • K Strike price
  • r Annual compound risk-free interest rate
  • sigma Volatility of the price of the underlying
  • T Expiration date for the option

case class BSParams(
   S: Double, 
   K: Double, 
   r: Double, 
   sigma: Double, 
   T: Double)


The Scala code snippet below implements the pricing of a call option for a security S and reflects the mathematical equations defined in the previous paragraph.

def callPrice(p: BSParams, t: Double): Double = {
  import Math._
  import org.apache.commons.math3.distribution.NormalDistribution

    // Implement the Black-Scholes stochastic equation
  val sigmaT = p.sigma * sqrt(p.T - t)

    // Formula (2)
  val d = (log(p.S/p.K) + (p.r + 0.5*p.sigma*p.sigma)*(p.T - t))/sigmaT

    // Formula (3)
  val dPrime = d - sigmaT

    // N(x), the cumulative distribution function of the standard normal
  val gauss = new NormalDistribution

    // Computation of the call C(t) as per formula (1)
  p.S*gauss.cumulativeProbability(d) -
      p.K*exp(-p.r*(p.T - t))*gauss.cumulativeProbability(dPrime)
}
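
For completeness, here is a sketch of the corresponding put price, transcribed directly from the put formula of the theory section (putPrice is an added illustration, not part of the original implementation):

def putPrice(p: BSParams, t: Double): Double = {
  import Math._
  import org.apache.commons.math3.distribution.NormalDistribution

  val sigmaT = p.sigma * sqrt(p.T - t)
  val d = (log(p.S/p.K) + (p.r + 0.5*p.sigma*p.sigma)*(p.T - t))/sigmaT
  val gauss = new NormalDistribution

    // P(t) = N(-d + sigma.sqrt(T-t)).K.exp(-r(T-t)) - N(-d).S
  gauss.cumulativeProbability(-d + sigmaT)*p.K*exp(-p.r*(p.T - t)) -
      gauss.cumulativeProbability(-d)*p.S
}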

However, professional traders in commodities and futures, for instance, are more interested in simulating the impact of the volatility sigma, the time decay T - t, or the price of the underlying on the price of the option, using a Monte Carlo simulation. In this use case, recomputing the entire Black-Scholes formula is not necessary.

Memoization

Memoization, in the realm of computing, is a strategy for optimizing program performance. It involves caching the outcomes of costly function calls to pure functions. This way, when inputs are repeated, the program can quickly retrieve the stored result instead of recalculating it.
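
As a generic illustration of the technique, here is a minimal memoizer sketch (not the implementation used in this post): the results of a pure function are cached in a map and returned on repeated inputs.

  // Wrap a pure function f with a cache of its previous results
def memoize[A, B](f: A => B): A => B = {
  val cache = scala.collection.mutable.Map.empty[A, B]
  (a: A) => cache.getOrElseUpdate(a, f(a))
}

val cachedSqrt = memoize((x: Double) => Math.sqrt(x))
cachedSqrt(2.0)   // computed
cachedSqrt(2.0)   // retrieved from the cache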

The aim is to reduce the computational expense by incorporating the algorithm into a workflow in which the steps unaffected by the change of a single variable's value are not re-executed. For example, a change in the underlying security's price, denoted as S, necessitates recalculating only a specific part of the workflow, namely \[S \Rightarrow log(S/K)\] The Black-Scholes algorithm is divided into five distinct internal computational stages:

object BSParams {
  val fLog = (p: BSParams) => Math.log(p.S / p.K)
  val fMul = (p: BSParams) => p.r * p.T
  val fPoly = (p: BSParams) => 0.5 * p.sigma * p.sigma * p.T
  val fSigT = (p: BSParams) => p.sigma * Math.sqrt(p.T)
  val fExp = (p: BSParams) => p.K * Math.exp(-p.r * p.T)
}

The five computational stages maintain intermediate results, so the entire Black-Scholes formula does not have to be computed from scratch each time one of the parameters S, K, r, sigma or T is changed.
The computational state is completely described by:
  • The parameters of the Black-Scholes equation S, K, r, sigma, T
  • The partial or intermediate results bs1 to bs5
Next, we need to wrap the computation and the update of the intermediate results of the Black-Scholes formula.

case class PartialResult(
  bs1: Double,    // log(S/K)
  bs2: Double,    // r.T
  bs3: Double,    // 0.5.sigma^2.T
  bs4: Double,    // sigma.sqrt(T)
  bs5: Double) {  // K.exp(-r.T)
  import BSParams._

  lazy val d1: Double = (bs1 + bs2 + bs3) / bs4

    // A change of S only affects log(S/K)
  def f1(p: BSParams): PartialResult = this.copy(bs1 = fLog(p))

    // A change of r affects r.T and K.exp(-r.T)
  def f2(p: BSParams): PartialResult = this.copy(bs2 = fMul(p), bs5 = fExp(p))

    // A change of sigma affects 0.5.sigma^2.T and sigma.sqrt(T)
  def f3(p: BSParams): PartialResult = this.copy(bs3 = fPoly(p), bs4 = fSigT(p))
}

As mentioned earlier, the computational state is defined by the Black-Scholes parameters p and the intermediate result values pr.

class State(p: BSParams, pr: PartialResult) {

    // Each setter triggers only the relevant partial re-computation
  def setS(newS: Double): State = {
    val newP = p.copy(S = newS)
    new State(newP, pr.f1(newP))
  }

  def setR(newR: Double): State = {
    val newP = p.copy(r = newR)
    new State(newP, pr.f2(newP))
  }

  def setSigma(newSigma: Double): State = {
    val newP = p.copy(sigma = newSigma)
    new State(newP, pr.f3(newP))
  }

  lazy val call: Double = {
    import org.apache.commons.math3.distribution.NormalDistribution

    val gauss = new NormalDistribution
    p.S * gauss.cumulativeProbability(pr.d1) -
        pr.bs5 * gauss.cumulativeProbability(pr.d1 - pr.bs4)
  }
}

Each of the methods setS, setR and setSigma updates one of the parameters of the Black-Scholes equation, which automatically triggers a selective re-computation of only the affected intermediate results.
The price of the call is a lazy value computed only once (immutable state). The update of the state of the Black-Scholes computation is rather straightforward.
The following code snippet illustrates the immutable update of sigma in the pricing of a call.

  // Initial computation/state of Black-Scholes
  // (parameter values taken from the original snippet, mapped to BSParams)
import BSParams._

val p0 = BSParams(S = 0.4, K = 0.6, r = 0.1, sigma = 2.0, T = 1.4)
val pr0 = PartialResult(fLog(p0), fMul(p0), fPoly(p0), fSigT(p0), fExp(p0))
val state0 = new State(p0, pr0)

  // Compute the price of a call
val callValue = state0.call
log.info(callValue.toString)

  // Update the value of sigma: only bs3 and bs4 are recomputed
val state1 = state0.setSigma(0.14)
val newCallValue = state1.call
log.info(newCallValue.toString)

Monte-Carlo Simulation

A Monte Carlo simulation is a method employed to estimate the various possible outcomes of a process that is challenging to predict owing to the presence of random elements. This technique is instrumental in assessing the effects of risk and uncertainty.
The Monte Carlo simulation finds application in an array of disciplines, such as finance, business, physics, and engineering, among others. It's often used to solve a variety of problems and is sometimes known as a multiple probability simulation.

The Black-Scholes formula can be validated through a Monte Carlo simulation, using a Gaussian distribution to represent the stochastic component of the formula.
The price of the security S(t) at time t is defined as \[(4)\ \ S_{t}=S_{0}\,e^{(r - \frac{\sigma^{2}}{2})t+\mathbb{N}(0,1)\,\sigma\sqrt{t}}\]
The solution S(t) can be approximated using an iterative or recursive process: \[(5)\ \ S_{t}=S_{t-1}\,e^{(r - \frac{\sigma^{2}}{2})\,dt+\mathbb{N}(0,1)\,\sigma\sqrt{dt}}\]
This particular implementation uses a fast tail recursion to simulate the computation of the price of the underlying security at any given time t. The sampling times are normalized as integers with dt = 1 for a faster simulation.

import scala.annotation.tailrec
import scala.util.Random._
import Math._

def monteCarlo(p: BSParams, s0: Double): List[Double] = {
    // Implementation of equation (4), with t = dt = 1
  val initialValue = s0*exp(p.r - 0.5*p.sigma*p.sigma + p.sigma*nextGaussian)
  monteCarlo(p, 0, List[Double](initialValue))
}

@tailrec
private def monteCarlo(p: BSParams, t: Int, values: List[Double]): List[Double] =
  if (t >= p.T)
    values.reverse
  else {
      // Implementation of equation (5); the latest value is the head of the list
    val st = values.head*exp(p.r - 0.5*p.sigma*p.sigma + p.sigma*nextGaussian)
    monteCarlo(p, t + 1, st :: values)
  }

The implementation of the Monte Carlo simulation relies on the fast Scala tail recursion [ref 4].

Evaluation

Let's predict the price of the security using the Monte Carlo method for 100 units of time.

val S0 = 80.0
val r = 0.06
val sigma = 0.2
val K = 0.9
  // BSParams is declared as (S, K, r, sigma, T)
val bsParams = BSParams(S0, K, r, sigma, 100)
val pricePrediction = monteCarlo(bsParams, S0)

The plot below illustrates the impact of the risk-free interest rate r (6% and 1%) on the simulated price pricePrediction of the underlying security.


References

[2] Black-Scholes Model: What It Is, How It Works
[3] Scala for the Impatient - C. Horstmann - Addison-Wesley, 2012