This post evaluates the performance improvement of Scala parallel collections over their sequential mutable and immutable counterparts.
Overview
The Scala standard library includes parallel collections whose purpose is to shield developers from the intricacies of concurrent thread execution and race conditions. Parallel collections are a very convenient way to encapsulate concurrency in a high-level abstraction that resembles the traditional data workflows scientists are familiar with.
Note:
For the sake of readability, all non-essential code such as error checking, comments, exception handling, validation of class and method arguments, scoping qualifiers and import statements is omitted.
Parallel computing is supported for some collections through the par method, as listed below.
- List[T].par: ParSeq[T]
- Array[T].par: ParArray[T]
- Map[K,V].par: ParMap[K,V]
- HashMap[K,V].par: ParHashMap[K,V]
- Set[T].par: ParSet[T]
- ParVector, ParRange and ParIterable
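As a minimal illustration, here is a sketch of the par conversion and the round trip back to a sequential collection. It assumes Scala 2.12, where parallel collections ship with the standard library (in Scala 2.13+ they live in the separate scala-parallel-collections module):

```scala
import scala.collection.parallel.immutable.ParSeq

// Convert a sequential list to its parallel counterpart with par
val xs: List[Int] = List.tabulate(8)(n => n + 1)
val pxs: ParSeq[Int] = xs.par

// Higher-order methods preserve element order even though
// execution may be split across threads
val doubled: List[Int] = pxs.map(_ * 2).toList
// doubled == List(2, 4, 6, 8, 10, 12, 14, 16)
```

The conversion is cheap for collections such as Array and Vector; for a List it requires copying the elements into a parallelizable data structure.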
Benchmark for parallel arrays
The main purpose of parallel collections is to improve execution performance through concurrency. Let's consider map and reduce functions applied to an array of arbitrary length:
final val sz = 100000
val data = Array.tabulate(sz)(_ << 1)

// f is an arbitrary Int => Int transformation
data.par.map(x => f(x))
data.par.reduceLeft(_ + _)
The next step is to create two benchmark classes, ParArrayBenchmark and ParMapBenchmark, that automate the performance evaluation of parallel arrays and maps over an arbitrary number of tasks, nTasks.
The first step is to define a timing function that executes a function g for times iterations.
def timing(g: Int => Unit, times: Int): Long = {
  // Measure the duration of 'times' executions of g
  val startTime = System.currentTimeMillis
  Range(0, times).foreach(g)
  System.currentTimeMillis - startTime
}
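As a quick sanity check, timing can be exercised with a simple array transformation. The workload below is purely illustrative:

```scala
// timing as defined above, reproduced so the snippet is self-contained
def timing(g: Int => Unit, times: Int): Long = {
  val startTime = System.currentTimeMillis
  Range(0, times).foreach(g)
  System.currentTimeMillis - startTime
}

// Time 50 executions of a map over a 100,000-element array
val data = Array.tabulate(100000)(_.toDouble)
val elapsedMs = timing(_ => data.map(Math.sqrt), 50)
```

Note that g receives the iteration index as its argument; the benchmark classes below simply ignore it.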
The benchmark is parameterized with the type U of the array elements. The constructor takes an Array and its parallel counterpart, a ParArray, as arguments.
The benchmark ParArrayBenchmark evaluates and compares the performance of an array and a parallel array for the most commonly used higher-order methods: map, filter and reduce.
class ParArrayBenchmark[U](
    u: Array[U],
    v: ParArray[U],
    times: Int) {

  def map(f: U => U)(nTasks: Int): Double = {
    v.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nTasks))

    val duration = timing(_ => u.map(f), times).toDouble
    timing(_ => v.map(f), times)/duration
  }

  def filter(f: U => Boolean)(nTasks: Int): Double = {
    v.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nTasks))

    val duration = timing(_ => u.filter(f), times).toDouble
    timing(_ => v.filter(f), times)/duration
  }

  def reduce(f: (U, U) => U)(nTasks: Int): Double = {
    v.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nTasks))

    val duration = timing(_ => u.reduceLeft(f), times).toDouble
    timing(_ => v.reduceLeft(f), times)/duration
  }
}
The benchmark is flexible enough to support any method argument f of any type U for all three methods: map, filter and reduce.
The Scala classes ForkJoinTaskSupport and ForkJoinPool wrap the Java classes ForkJoinTask and ForkJoinPool. ForkJoinPool provides a very efficient way to manage a thread pool: it executes nTasks tasks that may themselves be created by other tasks.
The tasks are implemented using Java threads, managed by an executor service familiar to most Java developers.
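For instance, a parallel collection can be pinned to a specific pool size by assigning its tasksupport attribute. This is a sketch assuming Scala 2.12, where ForkJoinTaskSupport accepts a java.util.concurrent.ForkJoinPool:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.mutable.ParArray

// Build a parallel array and restrict it to a 4-thread fork/join pool
val pArr = ParArray.tabulate(1000)(n => n)
pArr.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

// The reduction is divided into subtasks executed on the 4-thread pool
val total = pArr.reduce(_ + _)
// total == 499500  (sum of 0 until 1000)
```

Without an explicit tasksupport, parallel collections default to a pool sized to the number of available CPU cores.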
Benchmark for parallel maps
Let's create a benchmark for evaluating the performance of parallel maps, similar to the benchmark on parallel arrays.
Once again, the evaluation methods map and filter are flexible enough to accommodate any function argument f of any type U. The implementation of these two methods follows the same pattern as the one used for the parallel array. The duration of the execution of map and filter is computed through the timing method introduced in the previous paragraph.
class ParMapBenchmark[U](
    u: immutable.Map[Int, U],
    v: ParMap[Int, U],
    times: Int) {

  // Define the map operator for the performance benchmark of map
  def map(f: U => U)(nTasks: Int): Double = {
    v.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nTasks))

    val duration = timing(_ => u.map {
      case (e1, e2) => (e1, f(e2))
    }, times).toDouble
    timing(_ => v.map {
      case (e1, e2) => (e1, f(e2))
    }, times)/duration
  }

  // Define the filter operator for the performance benchmark of filter
  def filter(f: U => Boolean)(nTasks: Int): Double = {
    v.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nTasks))

    val duration = timing(_ => u.filter {
      case (e1, e2) => f(e2)
    }, times).toDouble
    timing(_ => v.filter {
      case (e1, e2) => f(e2)
    }, times)/duration
  }
}
Performance Results
The objective of the performance test is to evaluate the efficiency of Scala parallel collections as a function of
- The number of available CPU cores
- The complexity of the computation
- The size of the collection
Let's use fairly simple mathematical functions mapF, filterF and reduceF to evaluate, respectively, the map, filter and reduce methods on arrays and parallel arrays. The arrays are created and populated with random values.
// Arbitrary map function
val mapF = (x: Double) => Math.sin(x*0.01) + Math.exp(-x)

// Arbitrary filter function
val filterF = (x: Double) => x > 0.8

// Arbitrary reduce function
val reduceF = (x: Double, y: Double) => (x+y)*x

val data = Array.fill(SZ)(Random.nextDouble)
val pData = ParArray.fill(SZ)(Random.nextDouble)

// Initialize and execute the benchmark for the parallel array
val benchmark = new ParArrayBenchmark[Double](data, pData, TIMES)
benchmark.map(mapF)(n)
benchmark.filter(filterF)(n)
benchmark.reduce(reduceF)(n)
The results are not surprising in the following respects:
- The reducer doesn't take advantage of the parallelism of the array. The reduction of ParArray has a small overhead in the single-task scenario and then matches the performance of Array.
- The performance of the map function benefits from the parallelization of the array. The performance levels off when the number of tasks allocated equals or exceeds the number of CPU cores.
We reuse the mathematical functions defined for arrays to evaluate the map and filter methods on maps in the test client code.
val mapData = new HashMap[Int, Double]
Range(0, SZ).foreach(n => mapData.put(n, Random.nextDouble))

val parMapData = new ParHashMap[Int, Double]
Range(0, SZ).foreach(n => parMapData.put(n, Random.nextDouble))

// Initialize and execute the benchmark for the parallel map
val mapBenchmark = new ParMapBenchmark[Double](mapData.toMap, parMapData, TIMES)
mapBenchmark.map(mapF)(n)
mapBenchmark.filter(filterF)(n)
The impact of parallelization is very similar across methods and across collections. It is important to notice that the performance of the parallel collections levels off at around four times that of the single-threaded collections for five concurrent tasks and above.