Showing posts with label Akka. Show all posts
Showing posts with label Akka. Show all posts

Friday, August 22, 2014

Akka Master-Worker Design

Target audience: Intermediate
Estimated reading time: 5'

This article describes and implements the Master-Workers design for distributed computing using Akka actors and Scala programming language.

Follow me on LinkedIn

Overview

Traditional multi-threaded applications rely on accessing data located in shared memory. The mechanism relies on synchronization monitors such as locks, mutexes or semaphores to avoid deadlocks and inconsistent mutable states. Those applications are difficult to debug because of race condition and incur the cost of a large number of context switches.
The Actor model addresses those issues by using immutable data structures (messages) and asynchronous (non-blocking) communication. The actor model has already been described in the previous post "Scala Share-nothing Actors". 

Note: This post focuses on the simple Master-worker model using Akka framework 2.3.4 and Scala 2.11.7

Master-workers model

In this design, the "worker" actors are initialized and managed by the "master" actor which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks (or steps) executing the algorithm is performed through message passing:
* Activate from master to workers to launch the execution of distributed tasks
* Complete from workers to master to notify completion of tasks and return results.
* Terminate from master to terminate the worker actors.


The first step is to defined the immutable messages.

sealed abstract class Message(val id: Int)

case class Activate(i: Int, xt: Array[Double]) extends Message(i)
case class Completed(i: Int, xt: Array[Double]) extends Message(i)
case class Start(i: Int =0) extends Message(i)


The Start message is sent to the master by the client code, (external to the master-workers communication) to launch the computation.
The following sequence diagram illustrates the management of worker' tasks by the master actor through immutable, asynchronous messages.


The next step is to define the key attributes of the master. The constructor takes 4 arguments:
* A time series xt (line 5)
* A transformation function fct (line 6)
* A data partitioner (line 7)
* A method to aggregate the results from all the worker actors aggr (line 8)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type DblSeries = Array[Array[Double]]
type DblVector = Array[Double]
 
abstract class Master(
  xt: DblSeries, 
  fct: DblSeries => DblSeries, 
  partitioner: Partitioner, 
  aggr: (List[DblSeries]) => immutable.Seq[DblVector]) extends Actor{
 
   val workers = List.tabulate(partitioner.numPartitions)(n => 
       context.actorOf(Props(new Worker(n, fct)), 
           name = s"${worker_ String.valueOf(n)}"))
   
    workers.foreach( context.watch ( _ ) )
     // ...
}

The master actor creates list of worker actors, workers using the higher order method tabulate (line 10). The master registers the worker actor to be notified of their termination context.watch (line 14).

Events handler

In the implementation of the event handler receive for the master below, the Start message triggers the partitioning of the original dataset through a split function (line 3).
Upon completion of their tasks, the workers emit a Completed message to the master (line 6). The master counts the number of workers which have completed their tasks. Once all the workers have completed their tasks with the condition aggregator.size >= partitioner.numPartitions-1, the master computes the aggregated value (line 8), aggr then stop all the workers through its context workers.foreach( context.stop(_) ) (line 9).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
override def receive = {
    // Sent by client to master to initiate the computation
   case s: Start => split
 
    // Sent by workers on completion of their computation
   case msg: Completed => {
      if(aggregator.size >= partitioner.numPartitions-1) {
         val aggr = aggregate.take(MAX_NUM_DATAPOINTS).toArray
         workers.foreach( context.stop(_) )
      }
      aggregator.append(msg.xt)
   }
   
     // Sent by client to shutdown master and workers
   case Terminated(sender) => {
       // wait the current execution of workers completes
      if( aggregator.size >= partitioner.numPartitions-1) {
         context.stop(self)
         context.system.shutdown
      }
   }
}

The message Terminated (line 15) shuts down the master and the global context for all the actors, context.system.shutdown (lines 18 & 19).

The next step consists of defining the tasks for the worker actors. A worker actors is fully specified by its id and the data transformation fct (lines 2 & 3).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
final class Worker(
     id: Int, 
     fct: DblSeries => DblSeries) extends Actor {
   
  override def receive = {
     // Sent by master to start execution
     case msg: Activate => {
        val msgId = msg.id+id
        val output = fct(msg.xt)
        sender ! Completed(msgId, output)
  }
}

The event loop processes only one type of message, Activate, (line 7) which executes the data transformation fct (lines 8 & 9).

The last step is the implementation of the test application. Let's consider the case of the cancellation of noise on a very large dataset xt executed across multiple worker actors. The dedicated master actor of type NoiseRemover partitions the dataset using an instance of Partitioner distributed the cancellation algorithm cancelNoise to its worker (or slave) actors. The results aggregation function aggr has to be defined for this specific operation.

def cancelNoise(xt: DblSeries): DblSeries
 
class NoiseRemover(
    xt: DblSeries,
    partitioner: Partitioner,
    aggr: List[DblSeries] => immutable.Seq[DblVector])
 extends Master(xt, cancelNoise, partitioner, aggr)


The Akka actor context ActorSystem is initialized (line 1). The test driver implements a very simple results aggregation function aggregate passed as parameter of the noise remover master actor, controller (line 4). The reference to the controller is generated by the Akka actor factory method ActorSystem.actorOf (line 8).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val actorSystem = ActorSystem("System")
   
  // Specifies the aggregator used in the master
def aggregate(aggr: List[DblSeries]): Seq[DblVector] =
    aggr.transpose.map( _.sum).toSeq
 
  // Create the Akka master actor
val controller = actorSystem.actorOf(
   Props(new NoiseRemover(xt, partitioner, aggregate)), "Master"
)

controller ! Start(1)

Finally the execution is started with a "fire and forget" message Start (line 12)


Tuesday, July 22, 2014

Akka Actors Blocking on Futures

Target audience: Intermediate
Estimated reading time: 6'

This is a brief introduction to distributed computing using blocking Scala/Akka futures.


Table of contents
Follow me on LinkedIn

Overview

Akka is actor based and message-driven framework for building concurrent and reliable applications. As such Akka supports futures for which the requester never block waiting for a response. A message or request is sent for a execution and the expect response (success or failure/exception) is delivered asynchronously in the future. The code snippets in our two examples omits condition test or supporting methods for clarity sake. The code compiles and executes with Akka 2.2.4 and 2.3.6

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted 

Concurrent processing

One simple and common usage of Akka/Scala futures is to have some computation performed concurrently without going through the pain of creating actors.
Not every computation requires a sequential execution for which the input of one task depends on the output of another tasks. Let's consider the case of the evaluation of a model or function against a predefined time series or dataset.

The first step is to create a controller to manage the concurrent tasks, FuturesController (line 3). The controller takes the input time series xt (line 4) and a list of model candidates, xs as function of time Function1[Double] (line 5). The time series uses a single variable (dimension 1), so the models are simply defined as a one variable function (x: Double) => f(x).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
case class Start()
 
final class FuturesController(
  xt: Array[Double],
  xs: List[Double => Double])
 (implicit timeout: Timeout) extends Actor {
   
  override def receive = {
    case s: Start => {
      val futures = createFutures
      val results = futures.map(
         Await.result(_, timeout.duration)
      )

      sender ! results.zipWithIndex.minBy( _._1 )._2
    }
    case _ => logger.error("Message not recognized")
  }
 
  def createFutures: List[Future[Double]]
}

The event handler receive (line 8) for the message Start creates as many future as needed (equal to the number of models to evaluate) (line 10). The controller/actor blocks by invoking Await until each of the future tasks completes (line 12). Finally, the controller returns the result of the computation (line 15), in this case the fittest of the models xs. The handler logs a simple error message in case a message other than Start is received (line 17).

The futures are created through the method createFutures. The implementation of createFutures consists of computing the least squared error for each model relative to the input dataset using a map transformation and a fold aggregator (lines 4 - 7).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def createFutures: List[Future[Double]] =
  
  xs.map(f => Future[Double] { 
    val yt = Range(0, xt.size).map( f(_) )
    val r = (xt, yt).zipped./:(0.0)((sum, z) => {
       val diff = z._1 - z._2
       sum + diff*diff
    })

    Math.sqrt(r)
  })
}

Evaluation

Let's consider an input data set generated with the following model
              y = m(m-1) + r // r [0. 1]
where r is a random value between 0 and 1, representing noise.



val TIME_SERIES_LEN = 200
val xt = Array.tabulate(TIME_SERIES_LEN)(
    m => m*(m - 1.0) + Random.nextDouble
)

The following code snippet lists all key packages to be imported for most common applications using Akka actors and futures (lines 1 to 6).

The driver program instantiates the Actor system (line 12), creates the controller actor master using the Akka actor factory actorOf (lines 13, 14). It sends a ask request (line 18) and returns the best model (line 22) if the controller completes it tasks within a predefined timeout (line 10). The code print the stack trace in case of failure (line 23).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import akka.actor.{Actor, Props, ActorSystem}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import akka.pattern.ask
 

private val duration = Duration(2000, "millis")
implicit val timeout = new Timeout(duration)
   
val actorSystem = ActorSystem("system")
val master = actorSystem.actorOf(
  Props(new FuturesController(xt, approx)), "Function_eval"
)
 
Try {
  val future = master ? Start()
  Await.result(future, timeout.duration)
} 
match {
  case Success(n) => logger.info(s"Best fit is $n")
  case Failure(e) => e.printStackTrace
}

actorSystem.shutdown


It is critical that the application shuts down the the Akka system before it exits (line 26).

References

Saturday, July 6, 2013

Scala's Share-Nothing actors

Target audience: Advanced
Estimated reading time: 6'

Introduction

Even with the introduction of executor service and java.util.concurrent high level of abstraction in Java 1.5, programmers have found quite difficult to build reliable multi-threaded applications that shared data and locks It is quite common for less experienced developers to either over-synchronize data access and create deadlocks or allow race conditions and transition the application to an inconsistent state.

Scala's actors is a share-nothing, message passing model. At its core, an actor is a 'thread' with a mailbox to receive and respond to messages. Actors are sub-classes of scala.actors.Actor. The two main methods create an actor are:
  • act: implements the co-routine that correspond to the execution of the thread, similar to Thread.run() in Java.

  • react: process the messages sent by other actors and queued in the mailbox. The method react does not return (non blocking) when receiving and processing a message or request. There are two approach to exit a processing of messages: call exit or call act again with an exit condition being true
Note: The implementation described in this post relies on Scala 2.0 and is not guarantee to compile and execute as expected in the future version of the language. 

Workers ...

The example below describes a master actor (managing task) that creates and manages slave actors (or worker tasks). In order to avoid race condition and adding a lock, the reference newParent to master actor is sent to each slave actor (line 10) through the message passing mechanism react (lines 9 - 13). 
The slave Actor class implements a task for numIters executions of a specific process (line 15). The only way to exit the react loop is to call once again and exit on the condition parent != null (line 9). The computation method process to be executed by those slaves is an attribute of the slave (line 4).
Finally, the slave actor sends a message to its parent that its task is completed (line 17).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class WorkerActor(
  numIters: Int = 25000, 
  message: String,
  process: (Int) =>(Double)) extends Actor {
 
  def act {
    var parent: Actor = null
    while( parent == null) {
      react {
        case (msg: String, newParent: Actor) => 
           parent = newParent
        act()
      }
    }
    process(numIters)   

    parent ! "DONE"
  }
}


... and master

The Master task or actor is responsible to launches then control slave actors. Once a worker actor is completed, it notifies the master through a message 'DONE' (line 15). The master actor starts all the worker tasks (line 7) and sends a non-blocking message, Activate (line 8).
Upon receiving the message DONE (line 15), the master actor decrements the reference count of the worker actor currently active as soon as one completes its execution (line 16). The master ultimately exits when the last worker completes its task and ultimately exits (
reference counter == 0) (line 17).


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MasterActor(
  slaveActors: List[WorkerActor]) extends Actor {
    
  def act() {
        
   for( workerActor <- workerActors) {
     workerActor.start
     workerActor ! ("Activate", this)
   }
        
   var refCounter = workerActors.size-1
   loop  {
      react {       
    
        case "DONE" => {
          refCounter -= 1
          if(refCounter == 0) 
             exit
        }
        case _ =>  { println("Incorrect message") }
      }
   }
  }
}

The main routine, ActorsTest.main, creates the worker which are launched by the master actor that acts as the managing task. The worker tasks execute a local function, waveSum, defined in real-time. This approach is an alternative to the most traditional functional futures.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
object ActorsTest extends App {
   val nOfWorkers = 10
   val numIters = 1250000
   val eps = 0.0001
      
   // Arbitrary method to simulate load on the CPU cores
   def waveSum(numIters: Int): Double =
     (0 until numIters)./:(0.0)(
       (s,i)=> s+Math.exp(Math.sin(i*eps)
      ) 
         
      // Create the worker tasks, then ...
   val workers = (0 until nOfWorker)./:(List[WorkerActor]())(
    (xs, i)=> new WorkerActor(numIters, i.toString, waveSum) :: xs
   )
   new MasterActor(sworkers).start
  }
}


References