
Tuesday, August 22, 2017

Scala Futures with Callbacks

Target audience: Intermediate
Estimated reading time: 6'

Futures are a critical concept in Scala and parallel computing. This post describes and illustrates futures in Scala and Akka as a non-blocking mechanism for distributed computing.


Notes
  • Implementation relies on Scala 2.11.8
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and imports is omitted

Overview

Akka supports futures for which the requester never blocks while waiting for a response. A message or request is sent for execution, and the expected response (success or failure/exception) is delivered asynchronously back to the client controller.
In a previous post, we looked into Akka futures for which the client is blocked until all the concurrent tasks (futures) have completed. In this post, we look into the non-blocking approach that relies on the onSuccess and onFailure callback handlers. We reuse the same example as with the blocking design.
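
As a minimal sketch of the non-blocking style, assuming only the standard scala.concurrent library, the snippet below registers a callback on a future instead of waiting for it. It uses onComplete, the general form that receives a Try; onSuccess and onFailure each handle one of its two branches, and they are deprecated since Scala 2.12 and removed in 2.13. The names CallbackSketch, describe and run are hypothetical.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object CallbackSketch {
  // Turns the outcome of a task into a message (hypothetical helper)
  def describe(outcome: Try[Double]): String = outcome match {
    case Success(y) => s"result: $y"
    case Failure(e) => s"failed: ${e.getMessage}"
  }

  // Starts the task on the global pool and returns immediately.
  // The callback completes the promise once the task has finished.
  def run(task: () => Double): Future[String] = {
    val message = Promise[String]()
    Future(task()).onComplete { outcome =>
      message.success(describe(outcome))
    }
    message.future
  }
}
```

The call to run returns at once; the caller never waits and can attach its own callback to the returned future, for example CallbackSketch.run(() => 42.0).foreach(println).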


Futures callback design

The first step is to create a controller, FuturesController, to manage the concurrent tasks. The controller takes the input time series xt and a list of model candidates xs. As the time series uses a single variable, the models are simply defined as functions of one variable, (x: Double) => f(x).

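import akka.actor.Actor
import akka.pattern.pipe
import akka.util.Timeout
import scala.concurrent.Future

case class Launch()

final class FuturesController(
    xt: Array[Double],
    xs: List[Double => Double])
    (implicit timeout: Timeout) extends Actor {
    // Execution context used by the futures and by pipeTo
    import context.dispatcher

    // Event/message handling loop/thread
    // implemented as a partial function
    override def receive = {
        case s: Launch =>
            // Forward the result to the sender once it is available
            processFutures(createFutures) pipeTo sender()
    }

    // Both methods are implemented in the next listings
    def createFutures: List[Future[Double]] = ???
    def processFutures(futures: List[Future[Double]]): Future[Int] = ???
}

The event handler receive processes the Launch message. It calls createFutures to create as many futures as needed (one per model to evaluate), then passes the futures to processFutures, which handles the callback notifications. Because the futures complete asynchronously, processFutures returns a future that holds the index of the selected model, and the controller forwards it to the sender with pipeTo instead of blocking.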
The futures are created with their computation (or execution) unit. In this particular case, the task consists of computing the error of each model relative to the input dataset, defined as the square root of the sum of squared differences.

def createFutures: List[Future[Double]] =
   xs.map(f => Future[Double] {
       // Model values at the time steps 0, 1, ..., xt.size-1
       val yt = Range(0, xt.size).map(t => f(t.toDouble))
       // Sum of squared differences between observations and model
       val r = (xt, yt).zipped.foldLeft(0.0)((sum, z) => {
           val diff = z._1 - z._2
           sum + diff*diff
       })
       Math.sqrt(r)
   })
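
To make the error measure concrete, here is a small self-contained sketch of the same computation outside of a future. The names ErrorSketch and rootSquaredError are hypothetical; the model is evaluated at the time steps 0, 1, 2, ... as in createFutures.

```scala
object ErrorSketch {
  // Square root of the sum of squared differences between the
  // observations xt and the model f evaluated at t = 0, 1, 2, ...
  def rootSquaredError(xt: Array[Double], f: Double => Double): Double = {
    val squares = xt.zipWithIndex.map { case (x, t) =>
      val diff = x - f(t.toDouble)
      diff * diff
    }
    math.sqrt(squares.sum)
  }
}
```

For the observations Array(0.0, 3.0, 10.0) and the model x => 3.0*x, the model values are 0, 3 and 6, the differences are 0, 0 and 4, and the error is sqrt(16) = 4.0.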

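The method processFutures handles the notification (success or failure) from each of the futures [1]. It filters out the tasks that failed [2], then selects the model with the lowest error [3].
A failure is logged in the onFailure callback and its value is replaced by the marker -1.0 before the filter is applied. As onSuccess and onFailure return Unit, a value computed inside a callback is not visible to the caller. The marker is therefore produced with recover, and the method returns a future that holds the index of the selected model in xs. The controller has to forward this future to the sender with Akka's pipeTo rather than send it directly.

def processFutures(futures: List[Future[Double]]): Future[Int] =

   Future.sequence(
       futures.map(f => { //1
           f onFailure { case e: Exception => println(e.toString) }
           f recover { case e: Exception => -1.0 }
       })
   ).map( _.zipWithIndex        // index before filtering, so it refers to xs
           .filter( _._1 != -1.0) // 2
           .minBy( _._1)._2 )     // 3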
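
One case deserves attention: minBy throws an exception on an empty collection, which happens when every model fails. The following sketch, which assumes only scala.concurrent and needs no actor, returns an Option so that this case is explicit. The names SelectionSketch and selectBest are hypothetical, and the -1.0 marker follows the convention used above.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SelectionSketch {
  // Index (in the input list) of the future with the lowest error.
  // Failed futures are marked with -1.0 and ignored; the result is
  // None when no future succeeded.
  def selectBest(errors: List[Future[Double]]): Future[Option[Int]] =
    Future.sequence(errors.map(_.recover { case _: Exception => -1.0 }))
      .map { values =>
        val valid = values.zipWithIndex.filter(_._1 != -1.0)
        if (valid.isEmpty) None else Some(valid.minBy(_._1)._2)
      }
}
```

With the errors 2.5, a failure, and 0.7, the future completes with Some(2), the position of the third model.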


Evaluation

Let's consider a very simple application that distributes the following computation
   s(x) = f(x) + g(x) + h(x)  // ff(0), ff(1), ff(2)
Each term is computed by a different polynomial function from the list ff, in increasing order of complexity. Each of the three computations is assigned to its own future, so that they can run concurrently on separate CPU cores.

val ff = List[Double => Double](
   (x: Double) => 3.0*x,
   (x: Double) => x*x + x,
   (x: Double) => x*x*x + x*x + x
)

The compute method creates a list of futures from the list ff of computational functions, then processes the onSuccess and onFailure callbacks.
The first version is a detailed implementation of the creation and processing of futures.

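import java.util.concurrent.atomic.{AtomicInteger, DoubleAdder}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

def compute(x: Double): Future[Double] = {
    // Create a future for each of the components of ff
   val futures = ff.map(f => Future[Double] { f(x) } )

    // The callbacks run on pool threads after compute has returned:
    // the sum is accumulated in a thread-safe adder and published
    // through a promise once every future has reported.
   val total = new DoubleAdder
   val pending = new AtomicInteger(futures.size)
   val result = Promise[Double]()
   def reported(): Unit =
       if (pending.decrementAndGet() == 0) result.success(total.sum())

   futures.foreach( f => {
       f onSuccess { case y: Double => total.add(y); reported() }
       f onFailure { case e: Exception => println(e.toString); reported() }
    })
    result.future
}

val x = 2.0
// The pool threads are daemons: a short-lived program must stay
// alive until this callback has fired.
compute(x) onSuccess { case s => println(s"f($x) = $s") }

The default execution context, which supplies the threads that run the futures and their callbacks, needs to be imported. A future is created from each of the three polynomial functions in ff. The output of each future is then aggregated in its onSuccess callback. Since the callbacks fire after compute has returned, the method returns a Future[Double] that is completed once all three futures have reported. This version relies on a two-step process
  • Creation of futures
  • Computation and aggregation in the callbacks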
A more compact version relies on the foldLeft higher order method to combine the creation and execution of futures.

def compute(x: Double): Future[Double] =

   ff.map(f => Future[Double] { f(x) } )
     .foldLeft(Future.successful(0.0))((acc, f) =>
         // map is applied on success, recover on failure: a failed
         // future leaves the running sum unchanged
         acc.flatMap(s => f.map(s + _).recover { case e: Exception => s })
     )
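
For comparison, when no per-future handling is needed, the same sum can be written with Future.sequence from the standard library, which turns a list of futures into a future of a list. This is a different technique from the callbacks discussed in this post and is shown only as a sketch. The object name SequenceSketch is hypothetical, and here a single failed future fails the whole result.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SequenceSketch {
  // Same polynomial functions as the list ff above
  val ff = List[Double => Double](
    (x: Double) => 3.0*x,
    (x: Double) => x*x + x,
    (x: Double) => x*x*x + x*x + x
  )

  // One future per function, gathered into a single future of the sum
  def compute(x: Double): Future[Double] =
    Future.sequence(ff.map(f => Future(f(x)))).map(_.sum)
}
```

With x = 2.0 the three terms are 6.0, 6.0 and 14.0, so the future completes with 26.0.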

