Showing posts with label Actors. Show all posts

Friday, August 22, 2014

Akka Master-Worker Design

Target audience: Intermediate
Estimated reading time: 5'

This article describes and implements the Master-Workers design for distributed computing using Akka actors and the Scala programming language.


Overview

Traditional multi-threaded applications rely on accessing data located in shared memory. Access is coordinated through synchronization primitives such as locks, mutexes or semaphores to prevent race conditions and inconsistent mutable state. Those applications are difficult to debug because of race conditions and deadlocks, and they incur the cost of a large number of context switches.
The Actor model addresses those issues by using immutable data structures (messages) and asynchronous (non-blocking) communication. The actor model has already been described in the previous post "Scala's Share-Nothing actors".

Note: This post focuses on the simple Master-worker model using the Akka framework 2.3.4 and Scala 2.11.7.

Master-workers model

In this design, the "worker" actors are created and managed by the "master" actor, which is responsible for controlling the iterative process, the state, and the termination condition of the algorithm. The distributed tasks (or steps) that execute the algorithm are orchestrated through message passing:
* Activate, from master to workers, to launch the execution of the distributed tasks
* Completed, from workers to master, to notify completion of a task and return its results
* Terminated, sent by Akka to the master once a worker it watches has stopped (the master stops its workers with context.stop)


The first step is to define the immutable messages.

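// Imports assumed by the listings in this post
import akka.actor.{Actor, ActorSystem, Props, Terminated}
import scala.collection.immutable

sealed abstract class Message(val id: Int)

// Each payload is a partition of the time series (Array[Array[Double]], aliased as DblSeries further down)
case class Activate(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Completed(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Start(i: Int = 0) extends Message(i)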


The Start message is sent to the master by the client code (external to the master-workers communication) to launch the computation.
The following sequence diagram illustrates the management of the workers' tasks by the master actor through immutable, asynchronous messages.


The next step is to define the key attributes of the master. The constructor takes 4 arguments:
* A time series xt (line 5)
* A transformation function fct (line 6)
* A data partitioner (line 7)
* A method to aggregate the results from all the worker actors aggr (line 8)

 1  type DblSeries = Array[Array[Double]]
 2  type DblVector = Array[Double]
 3
 4  abstract class Master(
 5    xt: DblSeries,
 6    fct: DblSeries => DblSeries,
 7    partitioner: Partitioner,
 8    aggr: List[DblSeries] => immutable.Seq[DblVector]) extends Actor {
 9
10    val workers = List.tabulate(partitioner.numPartitions)(n =>
11      context.actorOf(Props(new Worker(n, fct)),
12        name = s"worker_$n"))
13
14    workers.foreach( context.watch(_) )
15    // ...
16  }

The master actor creates a list of worker actors, workers, using the higher-order method tabulate (line 10). The master registers to be notified of the termination of each worker actor through context.watch (line 14).

Events handler

In the implementation of the event handler receive for the master below, the Start message triggers the partitioning of the original dataset through a split function (line 3).
Upon completion of their tasks, the workers emit a Completed message to the master (line 6). The master counts the number of workers that have completed their tasks. Once the last worker reports, that is when the condition aggregator.size >= partitioner.numPartitions-1 holds before the current result is appended, the master computes the aggregated value aggr (line 8), then stops all the workers through its context, workers.foreach( context.stop(_) ) (line 9).

 1  override def receive = {
 2    // Sent by client to master to initiate the computation
 3    case s: Start => split
 4
 5    // Sent by workers on completion of their computation
 6    case msg: Completed => {
 7      if(aggregator.size >= partitioner.numPartitions-1) {
 8        val aggr = aggregate.take(MAX_NUM_DATAPOINTS).toArray
 9        workers.foreach( context.stop(_) )
10      }
11      aggregator.append(msg.xt)
12    }
13
14    // Sent by Akka to the watching master when a worker has stopped
15    case Terminated(sender) => {
16      // Wait until the current execution of the workers completes
17      if( aggregator.size >= partitioner.numPartitions-1) {
18        context.stop(self)
19        context.system.shutdown
20      }
21    }
22  }

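The Terminated message (line 15) is delivered to the master for each watched worker that has stopped. Once the results are complete, the master stops itself and shuts down the global context for all the actors with context.system.shutdown (lines 18 & 19).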
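The completion test of the handler can be studied without Akka. The sketch below is a plain Scala stand-in for the master's bookkeeping; the Collector class and its methods are hypothetical names that do not appear in the original code. It records the result first and then compares against numPartitions, which fires on the same message as the handler's test against numPartitions-1 made before the append.

```scala
object CompletionSketch {
  type DblSeries = Array[Array[Double]]

  // Hypothetical stand-in for the master's bookkeeping: collects one partial
  // result per partition and reports when every partition has answered.
  final class Collector(numPartitions: Int) {
    private val results = scala.collection.mutable.ListBuffer.empty[DblSeries]

    // Records a partial result, then returns true once all partitions reported.
    def add(partial: DblSeries): Boolean = {
      results += partial
      results.size >= numPartitions
    }

    // Results in arrival order.
    def collected: List[DblSeries] = results.toList
  }
}
```

In an actor, add would be called from the Completed case, and a true return value would trigger the aggregation and the shutdown of the workers.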

The next step consists of defining the tasks for the worker actors. A worker actor is fully specified by its id and the data transformation fct (lines 2 & 3).

 1  final class Worker(
 2    id: Int,
 3    fct: DblSeries => DblSeries) extends Actor {
 4
 5    override def receive = {
 6      // Sent by master to start execution
 7      case msg: Activate => {
 8        val msgId = msg.id+id
 9        val output = fct(msg.xt)
10        sender ! Completed(msgId, output)
11      }
12    }
13  }

The event loop processes only one type of message, Activate (line 7), which executes the data transformation fct (lines 8 & 9).

The last step is the implementation of the test application. Let's consider the case of the cancellation of noise on a very large dataset xt executed across multiple worker actors. The dedicated master actor of type NoiseRemover partitions the dataset using an instance of Partitioner and distributes the cancellation algorithm cancelNoise to its worker (or slave) actors. The results aggregation function aggr has to be defined for this specific operation.

def cancelNoise(xt: DblSeries): DblSeries
 
class NoiseRemover(
    xt: DblSeries,
    partitioner: Partitioner,
    aggr: List[DblSeries] => immutable.Seq[DblVector])
 extends Master(xt, cancelNoise, partitioner, aggr)
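The Partitioner type is not listed in this post; only its numPartitions member is used. As a rough illustration, and purely as an assumption about what the split step might do, the following sketch cuts the rows of a dataset into contiguous slices of near-equal size. RowPartitioner and split are hypothetical names.

```scala
object PartitionSketch {
  type DblSeries = Array[Array[Double]]

  // Hypothetical partitioner: only numPartitions appears in the post,
  // the split method is an assumption made for illustration.
  final class RowPartitioner(val numPartitions: Int) {
    require(numPartitions > 0, "numPartitions must be positive")

    // Cuts the rows into numPartitions contiguous slices of near-equal size.
    // Slices can be empty when there are fewer rows than partitions.
    def split(xt: DblSeries): Vector[DblSeries] = {
      val n = xt.length
      Vector.tabulate(numPartitions) { i =>
        xt.slice(i * n / numPartitions, (i + 1) * n / numPartitions)
      }
    }
  }
}
```

Computing the slice bounds from the index guarantees exactly numPartitions slices, which a fixed chunk size (for example with grouped) does not.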


The Akka actor context ActorSystem is initialized (line 1). The test driver implements a very simple results aggregation function aggregate passed as parameter of the noise remover master actor, controller (line 4). The reference to the controller is generated by the Akka actor factory method ActorSystem.actorOf (line 8).

 1  val actorSystem = ActorSystem("System")
 2
 3  // Aggregator used in the master: element-wise sum across partitions (assumed intent)
 4  def aggregate(aggr: List[DblSeries]): immutable.Seq[DblVector] =
 5    aggr.transpose.map( _.transpose.map(_.sum).toArray )
 6
 7  // Create the Akka master actor
 8  val controller = actorSystem.actorOf(
 9    Props(new NoiseRemover(xt, partitioner, aggregate)), "Master"
10  )
11
12  controller ! Start(1)

Finally, the execution is started with a "fire and forget" message Start (line 12).
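The exact semantics of the aggregation are left open by the post. One plausible reading, stated here as an assumption, is an element-wise sum of the partial results when every partition has the same shape. A minimal sketch with explicit conversions to lists:

```scala
object AggregateSketch {
  type DblVector = Array[Double]
  type DblSeries = Array[Array[Double]]

  // Assumed semantics: element-wise sum of equally shaped partitions.
  def aggregate(parts: List[DblSeries]): List[DblVector] =
    // First transpose: group the i-th rows of all partitions together.
    parts.map(_.toList).transpose.map { rows =>
      // Second transpose: group the j-th values of those rows, then sum them.
      rows.map(_.toList).transpose.map(_.sum).toArray
    }
}
```

For two partitions holding the rows (1, 2), (3, 4) and (10, 20), (30, 40), the result holds the rows (11, 22) and (33, 44).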


Saturday, July 6, 2013

Scala's Share-Nothing actors

Target audience: Advanced
Estimated reading time: 6'

Introduction

Even with the introduction of the executor service and the higher level of abstraction of java.util.concurrent in Java 1.5, programmers have found it quite difficult to build reliable multi-threaded applications that share data and locks. It is quite common for less experienced developers to either over-synchronize data access and create deadlocks, or allow race conditions and leave the application in an inconsistent state.

Scala's actors implement a share-nothing, message-passing model. At its core, an actor is a 'thread' with a mailbox to receive and respond to messages. Actors are sub-classes of scala.actors.Actor. The two main methods used to create an actor are:
  • act: implements the co-routine that corresponds to the execution of the thread, similar to Thread.run() in Java.

  • react: processes the messages sent by other actors and queued in the mailbox. The method react does not return (non blocking) when receiving and processing a message or request. There are two approaches to exit the processing of messages: call exit, or call act again with an exit condition being true.
Note: The implementation described in this post relies on Scala 2.0 and is not guaranteed to compile and execute as expected in future versions of the language.

Workers ...

The example below describes a master actor (managing task) that creates and manages slave actors (or worker tasks). To avoid race conditions without adding a lock, the reference newParent to the master actor is sent to each slave actor (line 10) through the message passing mechanism react (lines 9 - 13).
The slave Actor class implements a task for numIters executions of a specific process (line 15). The only way to exit the react loop is to call act once again and exit on the condition parent != null (line 9). The computation method process to be executed by those slaves is an attribute of the slave (line 4).
Finally, the slave actor sends a message to its parent that its task is completed (line 17).

 1  class WorkerActor(
 2    numIters: Int = 25000,
 3    message: String,
 4    process: (Int) => (Double)) extends Actor {
 5
 6    def act {
 7      var parent: Actor = null
 8      while( parent == null) {
 9        react {
10          case (msg: String, newParent: Actor) =>
11            parent = newParent
12          act()
13        }
14      }
15      process(numIters)
16
17      parent ! "DONE"
18    }
19  }


... and master

The master task or actor is responsible for launching and then controlling the slave actors. Once a worker actor has completed its task, it notifies the master through a message 'DONE' (line 15). The master actor starts all the worker tasks (line 7) and sends each of them a non-blocking message, Activate (line 8).
Upon receiving the message DONE (line 15), the master actor decrements the reference count of the worker actors currently active (line 16). The master ultimately exits when the last worker completes its task (reference counter == 0) (line 17).


 1  class MasterActor(
 2    workerActors: List[WorkerActor]) extends Actor {
 3
 4    def act() {
 5
 6      for( workerActor <- workerActors) {
 7        workerActor.start
 8        workerActor ! ("Activate", this)
 9      }
10
11      var refCounter = workerActors.size
12      loop {
13        react {
14
15          case "DONE" => {
16            refCounter -= 1
17            if(refCounter == 0)
18              exit
19          }
20          case _ => { println("Incorrect message") }
21        }
22      }
23    }
24  }

The main routine, ActorsTest.main, creates the workers, which are launched by the master actor that acts as the managing task. The worker tasks execute a local function, waveSum, defined inline. This approach is an alternative to the more traditional functional futures.

object ActorsTest extends App {
  val nOfWorkers = 10
  val numIters = 1250000
  val eps = 0.0001

  // Arbitrary method to simulate load on the CPU cores
  def waveSum(numIters: Int): Double =
    (0 until numIters).foldLeft(0.0)(
      (s, i) => s + Math.exp(Math.sin(i*eps))
    )

  // Create the worker tasks, then ...
  val workers = (0 until nOfWorkers).foldLeft(List[WorkerActor]())(
    (xs, i) => new WorkerActor(numIters, i.toString, waveSum) :: xs
  )
  // ... hand them over to the master, which starts them
  new MasterActor(workers).start
}
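For comparison, the same workload can be expressed with futures from the standard library. This is only a sketch of the alternative mentioned above, not part of the original implementation: FuturesSketch and run are hypothetical names, and the 30-second timeout is an arbitrary choice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FuturesSketch {
  val eps = 0.0001

  // Same CPU-bound computation as waveSum in the actor version.
  def waveSum(numIters: Int): Double =
    (0 until numIters).foldLeft(0.0)((s, i) => s + Math.exp(Math.sin(i * eps)))

  // Launches one waveSum per worker on the global execution context and
  // blocks until every result is available (arbitrary 30-second timeout).
  def run(nOfWorkers: Int, numIters: Int): List[Double] = {
    val tasks = List.fill(nOfWorkers)(Future(waveSum(numIters)))
    Await.result(Future.sequence(tasks), 30.seconds)
  }
}
```

Future.sequence plays the role of the master's reference counter: it completes only when every task has completed.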



Saturday, December 15, 2012

Introduction of Scala to Java 7 Developers

Target audience: Beginner
Estimated reading time: 4'



The purpose of this post is to introduce the Scala programming language from the perspective of an experienced Java developer. It is assumed that the reader has a basic understanding of design patterns and multi-threaded applications.
In a nutshell, Scala is:
- Object-oriented
- Functional
- Scalable (Actors and immutability)

The elements of the Scala programming language presented in this post are a small subset of the rich set of features that developers can benefit from.

Notes: 
  • This post relies on Scala 2.10
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.

Immutability

Scala relies on immutable objects and collections to reduce the complexity associated with multi-threaded applications, eliminate race conditions, and simplify debugging.
The CRU (Create, Read, Update) object lifecycle in Java is reduced to CR (Create, Read) in Scala.
The Scala standard library offers a large variety of immutable collections.

val x = 23.5  // Read only
val mp = immutable.Map[String, String]()  // Map is a trait: built through its companion object
val xs = List[Any]()
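To make the "Create, Read" lifecycle concrete, here is a small sketch (the names are illustrative only) showing that an "update" on an immutable collection returns a new collection and leaves the original untouched.

```scala
object ImmutabilitySketch {
  val prices = Map("apple" -> 1.5)
  // "Updating" returns a new map; prices itself is not modified.
  val updated = prices + ("pear" -> 2.0)

  val xs = List(1, 2, 3)
  // Prepending creates a new list that shares xs as its tail.
  val ys = 0 :: xs
}
```

Because the original values never change, they can be shared between threads without synchronization.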


Traits vs. interface

Java interfaces are purely declarative. Scala traits allow:
* method implementation
* member value initialization
* abstract and lazy values (to be initialized through sub-class or on demand)

Scala traits cannot be instantiated and do not have constructors. Scala emulates multiple inheritance by stacking traits (Cake pattern).

trait MyObject[T] {
   val tobeDefined: Int                // abstract value, set by the sub-class
   def add(t: T): Option[T] = ???      // implementation elided
   def head: T = ???                   // implementation elided
}
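The following sketch, with illustrative names only, puts these trait features together: an abstract value, a lazy value, a method with an implementation, and a second trait stacked on top of the first.

```scala
object TraitSketch {
  trait Named {
    val name: String                            // abstract value, set by the sub-class
    lazy val label: String = name.toUpperCase   // computed on first access
    def greet: String = s"Hello, $name"         // method with an implementation
  }

  // A second trait that refines the behavior of the first one.
  trait Polite extends Named {
    override def greet: String = super.greet + ", pleased to meet you"
  }

  // Traits are stacked when the class is declared.
  class Person(val name: String) extends Named with Polite
}
```

Calling greet on a Person runs the Polite version, which delegates to Named through super.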

Lazy values

Scala supports lazy values, which are declared as members of a class but initialized once and only once, the first time they are accessed.

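class MyObject[T](seed: Int) {
    // seed is a hypothetical constructor argument: a lazy value cannot refer to itself
    lazy val n = (seed << 2) + Random.nextInt(seed)
    ...
}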
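The "once and only once" behavior can be observed with a counter. In this sketch, Config and initCount are hypothetical names used only to make the initialization visible.

```scala
object LazySketch {
  final class Config {
    var initCount = 0
    // The body runs on first access only; later reads reuse the stored value.
    lazy val size: Int = { initCount += 1; 42 }
  }
}
```

Creating a Config does not evaluate size; reading it any number of times increments initCount exactly once.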

Implicits

Third-party libraries and frameworks cannot always be extended through composition or inheritance. Scala implicit classes, types and methods allow a "behind the scenes" type conversion. The following code snippet converts a double value into a vector/array of doubles of size 1. The implicit is automatically invoked wherever it is in scope, for instance after being imported into another section of the code base: the vector v is constructed through the implicit conversion dbl2Vec.

type DblVector = Array[Double]
    // Definition of implicit
implicit def dbl2Vec(x: Double): DblVector = Array.fill(1)(x)
  ...
val v: DblVector = 4.5

The following example extends exception handling with Try by converting any instance of Try into an instance of Either through the toEither method of the implicit class Try2Either. The implicit class conversion is used to handle a rounding error in the normalize function.

implicit class Try2Either[T](_try: Try[T]) {
   def toEither[U](r: ()=>U)(f: T=>T): Either[U, T] = _try match {
      case Success(res) => Right(f(res))
      case Failure(e) => Console.println(e.toString); Left(r())  
   }
}

/**
 * Method fails and recover if the denominator is too small
*/
def normalize(x: DblVector): Either[Recover, DblVector] = Try {
    val sum = x.sum 
    if(sum > 0.01) x.map( _ / sum) 
    else throw new IllegalStateException("prob out of range")  
}
.toEither(() => new Recover)((v: DblVector) => v.map( Math.log(_)))


Tail recursion

Although convenient, a traditional recursive implementation of a computation can be inefficient, leaving developers to rely on iterative algorithms.
Scala's tail recursion is a very effective way to iterate through a data sequence without incurring a performance overhead because the compiler avoids the creation of new stack frames during the recursion.
The following code snippet implements the Viterbi algorithm used to extract the most likely sequence of latent states in a hidden Markov model given a lambda model and a set of observations.

@scala.annotation.tailrec
private def viterbi(t: Int): Double = {

  if( t == 0) initial   
  else { 
    Range(0, lambda.getN).foreach( updateMaxDelta(t, _) )   
    if( t ==  obs.size-1) {
        val idxMaxDelta = Range(0, lambda.getN).map(i => 
                    (i, state.delta(t, i))).maxBy(_._2)
        state.QStar.update(t+1, idxMaxDelta._1)
        idxMaxDelta._2
    }
    else viterbi(t+1)
  }
}
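The Viterbi listing depends on state that is not shown here, so a smaller, self-contained sketch may help to see the mechanism. The function sumTo below is a hypothetical example, not part of the original code.

```scala
import scala.annotation.tailrec

object TailRecSketch {
  // Sums the integers 1..n. The recursive call is the last operation of loop,
  // so the compiler turns it into a loop; @tailrec makes compilation fail
  // if that rewrite is not possible.
  def sumTo(n: Int): Long = {
    require(n >= 0, "n must not be negative")
    @tailrec
    def loop(i: Int, acc: Long): Long =
      if (i == 0) acc else loop(i - 1, acc + i)
    loop(n, 0L)
  }
}
```

The accumulator acc carries the partial result forward, which is what keeps the recursive call in tail position.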

Actor model

Traditional multi-threaded applications rely on accessing data located in shared memory. Access is coordinated through synchronization primitives such as locks, mutexes or semaphores to prevent race conditions and inconsistent mutable state. Those applications are difficult to debug because of race conditions and deadlocks, and they incur the cost of a large number of context switches.
The Actor model addresses those issues by using immutable data structures (messages) and asynchronous (non-blocking) communication.
The actors exchange data using immutable messages as described in the example below.

sealed abstract class Message(val id: Int)
// Each payload is a matrix, matching the DblSeries argument of the worker below
case class Activate(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Completed(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Start(i: Int = 0) extends Message(i)


An actor processes the messages queued in its mailbox asynchronously, in FIFO order. The method receive, which returns a PartialFunction, is the message processing loop; it corresponds to Runnable.run() in Java.

final class Worker(
     id: Int, 
     fct: DblSeries => DblSeries) 
extends Actor {
    // Actor event/message loop
  override def receive = {
    case msg: Activate => {
      val msgId = msg.id+id
      val output = fct(msg.xt)
      sender ! Completed(msgId, output)
    }
  }
}
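In Akka, receive has the type PartialFunction[Any, Unit]. The behavior of a partial function can be explored without an actor system. In the sketch below, Ping, Stop, handler and fallback are illustrative names, and the handlers return a String instead of Unit so that the result can be inspected.

```scala
object ReceiveSketch {
  // Stand-ins for actor messages; the names are illustrative only.
  case class Ping(n: Int)
  case object Stop

  // Plays the role of receive: defined only for the messages it lists.
  val handler: PartialFunction[Any, String] = {
    case Ping(n) => s"pong $n"
    case Stop    => "stopping"
  }

  // Catch-all for anything handler does not cover.
  val fallback: PartialFunction[Any, String] = {
    case other => s"unhandled: $other"
  }

  // Tries handler first, then fallback.
  val safe: Any => String = handler orElse fallback
}
```

A message for which the partial function is not defined is simply not matched, which is why the worker above can ignore every message other than Activate.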

Syntactic similarities

Java developers will find comfort in knowing that Scala preserves many of the constructs and idioms that make Java 7 such a widespread and well-supported programming language.

Construct / Scala 2.10 / Java 7

Variable
  Scala 2.10: var x: Float = 1.0F
  Java 7:     float x = 1.0F;

Value
  Scala 2.10: val n = 4L

Constants
  Scala 2.10: final val MAX_TEMP: Float = 232.0F
  Java 7:     final float MAX_TEMP = 232.0F;

Class
  Scala 2.10: class Person { ... }
  Java 7:     public class Person { }

Constructor
  Scala 2.10:
    class Person(
      name: String,
      age: Int) {}
  Java 7:
    public class Person {
        private String name;
        private int age;
        public Person(String name, int age) {}
    }

Setters
  Scala 2.10: Not applicable (immutability)
  Java 7:
    public class Person {
        public void setName(String name) {}
    }

Getters
  Scala 2.10:
    class Person(
      val name: String,
      val age: Int) {}
    // Public access to member values.
  Java 7:
    public class Person {
        public final String getName() {}
    }

Method arguments validation
  Scala 2.10:
    def add(index: Int): Unit = {
      require(index >= 0 && index < size,
        s"$index out of bounds")
      ...
  Java 7:
    public void add(int index) {
        if (index < 0 || index >= size) {
            throw new IllegalArgumentException(...);
        ...

Singleton
  Scala 2.10: object Man extends Person {}
  Java 7:
    public class Man extends Person {
        private static Man instance = new Man();
        private Man() {}
        public static Man getInstance() {
            return instance;
        }
    }

Overriding
  Scala 2.10: override def getName(id: Int): String = { ... }
  Java 7:
    @Override
    public final String getName(int id) {}

Interfaces
  Scala 2.10:
    trait Searchable {
      def search(id: Int): String
    }
  Java 7:
    interface Searchable {
        String search(int id);
    }

Type casting
  Scala 2.10: val n: Int = o.asInstanceOf[Int]
  Java 7:     Integer n = (Integer) o;

Type checking
  Scala 2.10: n.isInstanceOf[Int]
  Java 7:     n instanceof Integer

Generics
  Scala 2.10:
    class Stack[T](val maxSize: Int) {
      def push(t: T): Unit = {}
      def pop: T = { ... }
    }
  Java 7:
    public class Stack<T> {
        private int maxSize;
        public Stack(int maxSize) {}
        public void push(T t) {}
        public T pop() {}
    }

Abstract class
  Scala 2.10:
    abstract class Collection[T](max: Int) {
      def add(t: T): Unit
      def get(index: Int): T
    }
  Java 7:
    abstract public class Collection<T> {
        abstract public void add(T t);
        abstract public T get(int index);
    }

Exception
  Scala 2.10:
    Try { .... }
    match {
      case Success(res) => {}
      case Failure(e) => {}
    }
  Java 7:
    try { .. }
    catch (NullPointerException ex) {}
    catch (Exception ex) {}

Bounded type parameters
  Scala 2.10:
    class List[T <: Float] {}
    class List[T >: Node] {}
  Java 7:
    public class List<T extends Float> {}
    (a lower bound cannot be declared on a Java type parameter; super is only valid on wildcards, as in List<? super Node>)

Wildcard parameters
  Scala 2.10 (variance declared on the class):
    class List[+T] {}
    class List[-Node] {}
  Java 7 (wildcards written where the type is used):
    List<? extends T>
    List<? super Node>

Index-based for loop
  Scala 2.10: for (i <- 0 until n) {}
  Java 7:     for (int i = 0; i < n; i++) {}

Collection iteration
  Scala 2.10: for (index <- indices) {}
  Java 7:     for (int index : indices) {}


Key differences

There are quite a few important and subtle programmatic differences between Scala 2.10 and Java 7:
  • ==: In Scala, == compares instances by value, similarly to equals. In Java, == compares the references to instances.
  • Comparable integer: java.lang.Integer implements the Comparable interface while scala.Int does not.
  • Static methods: Java supports explicit static methods. Scala does not, although the methods of an object (singleton) play the same role.
  • Inner classes: In Scala, an inner class is accessed through a reference (self) to an instance of the outer class. Java accesses an inner class through the outer class.
  • Checked exceptions: Scala does not support checked exceptions. Java does.
  • Primitive types: As a pure object-oriented language, Scala does not expose primitive types. Java does.
  • Expression as a value: As a functional language, Scala treats expressions such as if ... else ... as values that can be evaluated and assigned. Java does not.
  • Immutable collections: Scala supports immutable collections by default. Java does not.
  • Localized imports and type aliases: Scala has scoped imports and type aliases. Java does not.
  • Explicit operator overloading: Scala allows developers to overload operators through the definition of new methods. Java does not.
  • Actors: Scala provides parallelism with actors and immutable messages. Java does not.
  • Futures: Scala futures are asynchronous and can be completed through promises. Java 7 offers only the blocking java.util.concurrent.Future.
  • Higher-kinded types: Scala supports higher-kinded types, which make abstractions such as Monad expressible. Java does not.
  • Tail recursion (or tail-call elimination): Scala optimizes tail-recursive methods in order to avoid stack overflow. Java does not.
  • Monads and functors are core concepts in Scala, but not in Java.
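Two of these differences, equality and expressions as values, are easy to check in a few lines. The sketch below uses illustrative names only.

```scala
object DifferencesSketch {
  val a = new String("scala")
  val b = new String("scala")

  val sameValue = a == b   // value equality, delegates to equals
  val sameRef   = a eq b   // reference equality, what == means in Java

  // if ... else is an expression: its value is returned directly.
  def sign(x: Int): String = if (x < 0) "negative" else "non-negative"
}
```

The two strings are distinct instances with the same content, so sameValue is true while sameRef is false.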

Note: This comparative analysis relies on version 2.10.x of the Scala language and Java 7. Future versions of Scala and Java may produce slightly different results.

So much more....

This post barely scratches the surface of the capabilities of Scala which include
  • Futures
  • Partial functions and partially applied functions
  • Dependency injection
  • Monadic representation
  • Macros
  • View bounded type parameterization
  • F-Bounded polymorphism
  • Closures
  • Delimited continuation
  • Method argument currying
