Sunday, September 14, 2014

Mixin Constraint on Self-typed Methods

Target audience: Intermediate
Estimated reading time: 4'

This post illustrates the appropriate use of self-type to restrict the composition of (stackable) traits (mixins) in relation to an existing class or trait.

Overview

Mixin traits with self-type restrictions are commonly used in Scala. Dependency injection, and the Cake pattern in particular, relies on the constraint that a trait can only be used with sub-classes of a predefined type. The same approach can be used to constrain a trait so that it can only be mixed into classes that support one or several methods.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Mixin constraint on self-type

Let's consider the case of a class taxonomy (or hierarchy) that classifies machine learning algorithms as either supervised learning or unsupervised learning.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
trait Learning {
  def predict(x: Double): Double
}
 
trait SupervisedLearning {
  def train(x: Array[Double]): Int = { ... }
}
 
trait Validation {
  self: SupervisedLearning => 
    def validate(x: Array[Double]): Double = -1.0
}
 
class SVM 
  extends SupervisedLearning 
    with Validation {

  override def train(x: Array[Double]): Int = { ... }
}

The support vector machine, implemented by the class SVM, is a supervised learning algorithm and therefore extends the SupervisedLearning trait (lines 5 & 15). The code snippet compiles because the class SVM (line 14) complies with the restriction imposed by the trait Validation, which can only be mixed into sub-types of SupervisedLearning (line 10).
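
The snippet above elides the method bodies. As a minimal sketch that compiles and runs as-is, the version below fills them with placeholder logic (an assumption made for illustration, not an actual SVM training) and lets validate call train, which is only possible because the self-type guarantees that the host class is a SupervisedLearning.

```scala
// Sketch only: the method bodies are placeholders, not the author's algorithms
trait SupervisedLearning {
  // Hypothetical training step: returns the number of observations processed
  def train(x: Array[Double]): Int = x.length
}

trait Validation {
  // Self-type: any class mixing in Validation must also be a SupervisedLearning
  self: SupervisedLearning =>

  // train is visible here only because of the self-type annotation
  def validate(x: Array[Double]): Double =
    if (x.isEmpty) 0.0 else train(x).toDouble / x.length
}

class SVM extends SupervisedLearning with Validation {
  // Placeholder override: counts the strictly positive observations
  override def train(x: Array[Double]): Int = x.count(_ > 0.0)
}

object SelfTypeDemo extends App {
  val svm = new SVM
  // Three of the four observations are positive
  println(svm.validate(Array(1.0, -2.0, 3.0, 4.0))) // prints 0.75
}
```

Removing "extends SupervisedLearning" from SVM turns the mismatch into a compile-time error rather than a runtime failure, which is the point of the constraint.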

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
trait UnsupervisedLearning {
  def group(x: Array[Double]): Int
}
 
    // Does not compile: the self-type of Validation
    // requires a sub-type of SupervisedLearning
class EM 
  extends UnsupervisedLearning 
    with Validation { 
  
  def group(x: Array[Double]): Int = { ... }
}

This code snippet does not compile because the expectation-maximization algorithm, EM, is an unsupervised learning algorithm and therefore is not a sub-class of SupervisedLearning.

Mixin constraint on self-typed method

Restricting a trait to classes that define one or more predefined methods works the same way as restricting it to sub-classes of a predefined type: the self-type annotation declares the required method signature(s) instead of a type name.
Let's reuse the class hierarchy introduced in the previous section.

trait Validation { 
  self: { def train(x: Array[Double]): Int } =>
     def validate(x: Array[Double]): Double = -1.0
}

This time around, Validation can be mixed into any class that implements the method train.
As previously seen, the class SVM complies with the restriction imposed by Validation. However, the declaration of the reinforcement learning algorithm QLearning generates a compilation error because it does not implement the method train as required.

// No training needed for Q-Learning
class QLearning 
  extends Learning 
     with Validation { 
   
  def predict(x: Double): Double = { ... } 
}
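
To see the structural constraint at work in a form that can be compiled and run, here is a minimal sketch assuming Scala 2.11 or later. The class KMeans and the method bodies are hypothetical placeholders. The class sits outside the SupervisedLearning hierarchy and still qualifies for Validation, simply because it defines train with the expected signature.

```scala
// Members of a structural type are invoked through reflection in Scala 2
import scala.language.reflectiveCalls

trait Validation {
  // Structural self-type: the host class only needs a method with this signature
  self: { def train(x: Array[Double]): Int } =>

  def validate(x: Array[Double]): Double =
    if (x.isEmpty) -1.0 else self.train(x).toDouble / x.length
}

// Hypothetical class that is unrelated to SupervisedLearning
class KMeans extends Validation {
  // Placeholder logic: counts the non-negative observations
  def train(x: Array[Double]): Int = x.count(_ >= 0.0)
}
```

With these placeholders, new KMeans().validate(Array(1.0, -1.0)) returns 0.5. The trade-off of the structural form is the reflective call made at runtime, which is slower than a regular method dispatch.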

Although brief, this introduction to self-referential conditions should help you start using this technique to protect your code from unintentional, erroneous sub-typing and trait mixing.

References

  • Scala Cookbook - A. Alexander - O'Reilly 2013
  • The Scala Programming Language - M. Odersky, L. Spoon, B. Venners - Artima 2007
  • github.com/patnicolas

Friday, August 22, 2014

Akka Master-Worker Design

Target audience: Intermediate
Estimated reading time: 5'

This article describes and implements the Master-Workers design for distributed computing using Akka actors and the Scala programming language.

Overview

Traditional multi-threaded applications rely on accessing data located in shared memory. The mechanism relies on synchronization primitives such as locks, mutexes or semaphores to prevent race conditions and inconsistent mutable state. Those applications are difficult to debug because of deadlocks and race conditions, and they incur the cost of a large number of context switches.
The Actor model addresses those issues by using immutable data structures (messages) and asynchronous (non-blocking) communication. The actor model has already been described in the previous post "Scala Share-nothing Actors". 

Note: This post focuses on the simple Master-worker model using the Akka framework 2.3.4 and Scala 2.11.7.

Master-workers model

In this design, the "worker" actors are initialized and managed by the "master" actor which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks (or steps) executing the algorithm is performed through message passing:
* Activate from master to workers to launch the execution of distributed tasks.
* Completed from workers to master to notify completion of tasks and return results.
* Terminate from master to terminate the worker actors.


The first step is to define the immutable messages.

sealed abstract class Message(val id: Int)

case class Activate(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Completed(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Start(i: Int = 0) extends Message(i)
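
Because Message is sealed, the compiler can check that a pattern match covers every message type. The sketch below illustrates this outside of any actor. It assumes that the payload xt is a series of type Array[Array[Double]], the type the workers' transformation operates on, and the describe helper is hypothetical.

```scala
sealed abstract class Message(val id: Int)

case class Activate(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Completed(i: Int, xt: Array[Array[Double]]) extends Message(i)
case class Start(i: Int = 0) extends Message(i)

object MessageDemo {
  // Hypothetical helper: dispatches on the message type the way an
  // actor's receive block would. The compiler warns if a case is missing.
  def describe(msg: Message): String = msg match {
    case Start(i)         => s"start #$i"
    case Activate(i, xt)  => s"activate #$i with ${xt.length} rows"
    case Completed(i, xt) => s"completed #$i with ${xt.length} rows"
  }
}
```

Note that an Array payload remains mutable, so the immutability of these messages holds by convention only: neither the sender nor the receiver should modify xt once the message has been sent.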


The Start message is sent to the master by the client code (external to the master-workers communication) to launch the computation.
The following sequence diagram illustrates the management of the workers' tasks by the master actor through immutable, asynchronous messages.


The next step is to define the key attributes of the master. The constructor takes 4 arguments:
* A time series xt (line 5)
* A transformation function fct (line 6)
* A data partitioner, partitioner (line 7)
* A method to aggregate the results from all the worker actors aggr (line 8)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type DblSeries = Array[Array[Double]]
type DblVector = Array[Double]
 
abstract class Master(
  xt: DblSeries, 
  fct: DblSeries => DblSeries, 
  partitioner: Partitioner, 
  aggr: (List[DblSeries]) => immutable.Seq[DblVector]) extends Actor{
 
   val workers = List.tabulate(partitioner.numPartitions)(n => 
       context.actorOf(Props(new Worker(n, fct)), 
           name = s"worker_$n"))
   
   workers.foreach( context.watch ( _ ) )
     // ...
}

The master actor creates the list of worker actors, workers, using the higher-order method tabulate (line 10). The master then registers itself with context.watch to be notified when each worker actor terminates (line 14).
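
List.tabulate(n)(f) applies f to each index from 0 to n-1 and collects the results. The short sketch below, which needs no Akka dependency, shows the names such a call produces for the workers. The helper workerNames is hypothetical.

```scala
object TabulateSketch {
  // Builds one worker name per partition, indexed from 0
  def workerNames(numPartitions: Int): List[String] =
    List.tabulate(numPartitions)(n => s"worker_$n")
}
```

For three partitions, TabulateSketch.workerNames(3) returns List("worker_0", "worker_1", "worker_2"). Actor names must be unique among the children of a given parent, which the index guarantees here.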

Events handler

In the implementation of the master's event handler, receive, shown below, the Start message triggers the partitioning of the original dataset through a split function (line 3).
Upon completion of their tasks, the workers send a Completed message to the master (line 6). The master appends each result to aggregator (line 7), which also counts the workers that have completed their tasks. Once all the workers have reported, that is when aggregator.size >= partitioner.numPartitions, the master computes the aggregated value aggr (line 9), then stops all the workers through its context, workers.foreach( context.stop(_) ) (line 10).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
override def receive = {
    // Sent by client to master to initiate the computation
   case s: Start => split
 
    // Sent by workers on completion of their computation
   case msg: Completed => {
      aggregator.append(msg.xt)
      if(aggregator.size >= partitioner.numPartitions) {
         val aggr = aggregate.take(MAX_NUM_DATAPOINTS).toArray
         workers.foreach( context.stop(_) )
      }
   }
   
     // Sent by Akka to the master when a watched worker has stopped
   case Terminated(sender) => {
       // proceed only once all the workers have reported their results
      if( aggregator.size >= partitioner.numPartitions) {
         context.stop(self)
         context.system.shutdown
      }
   }
}

On receiving the message Terminated (line 15), the master stops itself and shuts down the global context for all the actors, context.system.shutdown (lines 18 & 19).
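
The master's bookkeeping can be isolated from Akka and tested on its own. The sketch below is a hypothetical helper, not part of the original implementation: it records each result first, then checks whether every partition has reported, so the last result is always part of the aggregation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper mirroring the master's bookkeeping, without any Akka dependency
final class CompletionTracker[T](numPartitions: Int) {
  private val results = ListBuffer.empty[T]

  // Records one worker result and returns all the results, in order of
  // arrival, once every partition has reported. Returns None until then.
  def record(result: T): Option[List[T]] = {
    results += result
    if (results.size >= numPartitions) Some(results.toList) else None
  }
}
```

Inside an actor, record would be called from the Completed case, and a Some result would trigger the aggregation and the shutdown of the workers. No locking is needed there because an actor processes one message at a time.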

The next step consists of defining the tasks for the worker actors. A worker actor is fully specified by its id and the data transformation fct (lines 2 & 3).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
final class Worker(
     id: Int, 
     fct: DblSeries => DblSeries) extends Actor {
   
  override def receive = {
     // Sent by master to start execution
     case msg: Activate =>
        val msgId = msg.id+id
        val output = fct(msg.xt)
        sender ! Completed(msgId, output)
  }
}

The event loop processes only one type of message, Activate (line 7), which triggers the execution of the data transformation fct (lines 8 & 9). The result is returned to the sender in a Completed message (line 10).

The last step is the implementation of the test application. Let's consider the case of the cancellation of noise on a very large dataset xt, executed across multiple worker actors. The dedicated master actor of type NoiseRemover partitions the dataset using an instance of Partitioner and distributes the cancellation algorithm cancelNoise to its worker actors. The results aggregation function aggr has to be defined for this specific operation.

def cancelNoise(xt: DblSeries): DblSeries
 
class NoiseRemover(
    xt: DblSeries,
    partitioner: Partitioner,
    aggr: List[DblSeries] => immutable.Seq[DblVector])
 extends Master(xt, cancelNoise, partitioner, aggr)
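
The Partitioner passed to NoiseRemover is not shown in this post. As a rough sketch, assuming it only needs to expose numPartitions and to cut the series into contiguous chunks, it could look like the following. The class body and the signature of split are hypothetical.

```scala
object PartitionSketch {
  type DblVector = Array[Double]
  type DblSeries = Array[DblVector]

  // Hypothetical stand-in for the Partitioner used by the master
  final case class Partitioner(numPartitions: Int) {
    require(numPartitions > 0, "numPartitions must be positive")

    // Splits xt into at most numPartitions contiguous chunks of near-equal size
    def split(xt: DblSeries): List[DblSeries] =
      if (xt.isEmpty) List.empty
      else {
        val chunkSize = math.ceil(xt.length.toDouble / numPartitions).toInt
        xt.grouped(chunkSize).toList
      }
  }
}
```

A series of 10 rows split across 3 partitions yields chunks of 4, 4 and 2 rows. A short series can produce fewer chunks than numPartitions, in which case the master would have to wait for the number of chunks actually dispatched rather than for numPartitions.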


The Akka actor context ActorSystem is initialized (line 1). The test driver implements a very simple results aggregation function aggregate (line 4), passed as a parameter to the noise remover master actor, controller. The reference to the controller is generated by the Akka actor factory method ActorSystem.actorOf (line 8).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val actorSystem = ActorSystem("System")
   
  // Specifies the aggregator used in the master
def aggregate(aggr: List[DblSeries]): Seq[DblVector] =
    aggr.transpose.map( _.sum).toSeq
 
  // Create the Akka master actor
val controller = actorSystem.actorOf(
   Props(new NoiseRemover(xt, partitioner, aggregate)), "Master"
)

controller ! Start(1)

Finally, the execution is started with a "fire and forget" message Start (line 12).
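
For reference, here is a self-contained sketch of an aggregation function with the signature the master expects. It is one possible reading of the transpose-and-sum idea: it assumes that the intent is an element-wise sum of the rows that share the same index across partitions, and that all partitions hold the same number of rows of equal width. Both assumptions are mine and may differ from the actual noise removal use case.

```scala
import scala.collection.immutable

object AggregationSketch {
  type DblVector = Array[Double]
  type DblSeries = Array[DblVector]

  // Sums, element by element, the rows located at the same index in each partition.
  // transpose throws an exception if the partitions or their rows differ in length.
  def aggregate(results: List[DblSeries]): immutable.Seq[DblVector] =
    results
      .map(_.toList)   // one list of rows per partition
      .transpose       // one list of same-index rows across partitions
      .map(rows => rows.map(_.toList).transpose.map(_.sum).toArray)
}
```

For two partitions Array(Array(1.0, 2.0), Array(3.0, 4.0)) and Array(Array(10.0, 20.0), Array(30.0, 40.0)), the function returns the rows (11.0, 22.0) and (33.0, 44.0).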