Saturday, July 6, 2013

Scala's Share-Nothing actors

Target audience: Advanced
Estimated reading time: 6'

Introduction

Even with the introduction of the executor service and the high-level abstractions of java.util.concurrent in Java 1.5, programmers have found it quite difficult to build reliable multi-threaded applications that share data and locks. It is quite common for less experienced developers to either over-synchronize data access and create deadlocks, or allow race conditions that leave the application in an inconsistent state.

Scala's actor model is a share-nothing, message-passing model. At its core, an actor is a 'thread' with a mailbox to receive and respond to messages. Actors are subclasses of scala.actors.Actor. The two main methods used to implement an actor are:
  • act: implements the co-routine that corresponds to the execution of the thread, similar to Thread.run() in Java.

  • react: processes the messages sent by other actors and queued in the mailbox. The method react never returns normally: it registers the message handler and releases the underlying thread, so any code that must run after a message is received belongs inside the handler. There are two approaches to stop processing messages: call exit, or let the handler complete without invoking react again.

Note: The implementation described in this post relies on Scala 2.0 and is not guaranteed to compile and execute as expected in future versions of the language. The scala.actors library was deprecated in Scala 2.10 in favor of Akka actors.
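For readers more familiar with java.util.concurrent, the mailbox idea can be sketched in plain Java. This is only an illustration of the share-nothing principle, not how scala.actors is implemented: the class SumActor, its STOP marker and the use of a LinkedBlockingQueue as the mailbox are assumptions made for this example. The running total is modified only by the actor's own thread, so no lock is needed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example: an actor-like thread that owns its state and
// communicates only through messages queued in its mailbox.
final class SumActor extends Thread {
    // Marker message (an assumption of this sketch) asking the actor to stop
    static final int STOP = Integer.MIN_VALUE;

    private final BlockingQueue<Integer> mailbox = new LinkedBlockingQueue<>();
    private long total = 0L; // modified by the actor thread only

    // Non-blocking send: the message is queued in the mailbox
    void send(int message) {
        mailbox.add(message);
    }

    @Override
    public void run() {
        try {
            // Process the messages one at a time, in arrival order
            for (int msg = mailbox.take(); msg != STOP; msg = mailbox.take()) {
                total += msg;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Waits for the actor to terminate, then returns its final state
    long result() {
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total;
    }
}
```

A caller starts the actor, sends it a few integers followed by SumActor.STOP, and reads the total through result() once the actor thread has terminated.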

Workers ...

The example below describes a master actor (the managing task) that creates and manages worker actors (the worker tasks). To avoid race conditions without adding a lock, the reference parent to the master actor is sent to each worker through the message-passing mechanism and extracted by pattern matching in the react handler.
The WorkerActor class implements a task that runs the computation process over numIters iterations. The computation method process is an attribute of the worker, supplied through its constructor. Because react never returns, the computation is executed inside the message handler, and the actor terminates once the handler completes.
Finally, the worker actor sends the message DONE to its parent to signal that its task is completed.

import scala.actors.Actor

class WorkerActor(
  numIters: Int = 25000, 
  message: String,
  process: Int => Double) extends Actor {
 
  def act() {
    // react never returns: the work is done inside the handler
    react {
      case (msg: String, parent: Actor) => 
        process(numIters)
        // Notify the master that this worker is finished
        parent ! "DONE"
    }
  }
}


... and master

The master actor is responsible for launching and controlling the worker actors. It starts each worker task, then sends it the non-blocking message Activate along with a reference to itself. Once a worker actor has completed its task, it notifies the master through the message DONE.
Upon receiving the message DONE, the master actor decrements the reference counter of the worker actors still active. The master exits when the last worker completes its task (refCounter == 0).

import scala.actors.Actor
import scala.actors.Actor._

class MasterActor(
  workerActors: List[WorkerActor]) extends Actor {
    
  def act() {
    // Start each worker and hand it a reference to this master
    for (workerActor <- workerActors) {
      workerActor.start()
      workerActor ! ("Activate", this)
    }
        
    // One pending notification per worker
    var refCounter = workerActors.size
    loop {
      react {
        case "DONE" =>
          refCounter -= 1
          if (refCounter == 0) 
            exit()
        case _ => println("Incorrect message")
      }
    }
  }
}

The main routine, ActorsTest, creates the workers, which are then launched by the master actor acting as the managing task. The worker tasks execute a locally defined function, waveSum. This approach is an alternative to the more traditional functional futures.

object ActorsTest extends App {
  val nOfWorkers = 10
  val numIters = 1250000
  val eps = 0.0001
      
  // Arbitrary method to simulate load on the CPU cores
  def waveSum(numIters: Int): Double =
    (0 until numIters).foldLeft(0.0)(
      (s, i) => s + math.exp(math.sin(i*eps))
    ) 
         
  // Create the worker tasks, then ...
  val workers = (0 until nOfWorkers).foldLeft(List[WorkerActor]())(
    (xs, i) => new WorkerActor(numIters, i.toString, waveSum) :: xs
  )
  // ... launch them through the master actor
  new MasterActor(workers).start()
}
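
The same master/worker protocol can be sketched with plain Java threads, for comparison with the actor version. This is a hedged illustration under assumptions of its own: the class MasterWorkerSketch, the method runMaster and the BlockingQueue standing in for the master's mailbox are not part of the original code. As in the actor version, the workers never touch the master's counter; they only post a DONE message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: workers notify the master through a queue of messages
final class MasterWorkerSketch {

    // CPU-bound function mirroring waveSum in the Scala listing
    static double waveSum(int numIters, double eps) {
        double sum = 0.0;
        for (int i = 0; i < numIters; i++) {
            sum += Math.exp(Math.sin(i * eps));
        }
        return sum;
    }

    // Starts the workers, then counts the DONE messages until all have reported.
    // Returns the number of DONE messages received by the master.
    static int runMaster(int nOfWorkers, int numIters) {
        BlockingQueue<String> masterMailbox = new LinkedBlockingQueue<>();

        for (int i = 0; i < nOfWorkers; i++) {
            new Thread(() -> {
                waveSum(numIters, 0.0001);
                masterMailbox.add("DONE"); // the only interaction with the master
            }).start();
        }

        int done = 0; // owned by the master thread only
        try {
            while (done < nOfWorkers) {
                if ("DONE".equals(masterMailbox.take())) {
                    done++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done;
    }
}
```

Calling MasterWorkerSketch.runMaster(10, 1250000) reproduces the workload of ActorsTest; the call returns once every worker has posted its message.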


Wednesday, June 5, 2013

Don't Fear the Monad: Scala

Target audience: Advanced
Estimated reading time: 6'

In the initial part of our series on monads, Don't fear the Monad: Theory, we explored the fundamental elements of category theory, including functors, monads, and natural transformations. This post now delves into their practical implementation in Scala.


Note: To enhance the clarity of algorithm implementation, we've omitted non-essential code such as error checks, comments, exceptions, validation of class and method arguments, scoping qualifiers, and imports.

Overview

I assume that the reader is familiar with the theory behind functors and monads. If not, one of my older posts, Don't fear the monad: Theory, should provide you with some understanding of those concepts.

In the previous post we introduced a monad as a structure or triple M = <T, eta, mu> on a category X that consists of
  - A map: functor from category X to itself (endofunctor)   T : X -> X
  - A unit: natural transformation   eta: 1 -> T
  - A join: multiplication or natural transformation   mu: T*T -> T

Let's implement these monadic operators in Scala for some collections.

trait Monad[M[_]]  {
   def map[X,Y](f: X=>Y): M[X] => M[Y]
   def unit[X](x: X): M[X]
   def join[X](mu: M[M[X]]): M[X] 
}

The map method implements the functor: it lifts a function f: X => Y into a function M[X] => M[Y]. The unit method creates an instance of the monadic type from an element (e.g. Double -> List[Double]). The join method implements the mu natural transformation by flattening M[M[X]] into M[X].

Monads and Collections

Let's use the list structure introduced in the post related to the theory of Monads (Don't fear the monad: Theory). 

val monadList = new Monad[List] {
    override def map[X,Y](f: X=>Y): List[X] => List[Y]= 
        (xs: List[X]) => xs.map(f)
    override def unit[X](x: X): List[X] = x :: Nil
    override def join[X](xs: List[List[X]]): List[X] = xs.flatten
}

The instance monadList is a wrapper around the List monad, so it is easy to implement all three methods using the methods of the scala.collection.immutable.List class:
  • map: builds a new list by applying the function f to all elements of the original list: x -> x*x => List(.. x ..) -> List( .. x*x ...) 
  • :: Nil: creates a single-element list 
  • flatten: converts a list of lists into a single list formed by concatenating the elements of all the contained lists.

Let X and Y both be the type Int. The monad can then be applied to lists of integers.

val xs = monadList.map((n: Int) => n * n)
xs(List(4, 11, 6)).foreach( println ) 
  
val xss : List[List[Int]] = List( List(3,5,6), List(11,34,12,66))
monadList.join[Int](xss).foreach ( println)


In the example above, the first foreach prints 16, 121 and 36, while the second foreach prints the sequence 3, 5, 6, 11, 34, 12, 66 (one value per line).
The same methodology is applied to immutable sequences by implementing the generic Monad trait.

import scala.collection.immutable.Seq

class MonadSeq extends Monad[Seq] { 
    // Lift f to operate on whole sequences
    override def map[X,Y](f: X=>Y): Seq[X] => Seq[Y] = 
        (xs: Seq[X]) => xs.map(f)
    override def unit[X](x: X): Seq[X] = Seq[X](x)
    // Flatten a sequence of sequences
    override def join[X](xss: Seq[Seq[X]]): Seq[X] = xss.flatten
}

The implementation of the monad for immutable sequences is very similar to the monad for immutable lists: the map method relies on Seq.map, and the join method flattens a two-dimensional sequence into a single sequence.


flatMap

The Scala standard library uses monads for collections, options and exception handling. It relies on slightly different but equivalent methods to implement the three basic functions of a monad:
  • apply instead of unit
  • flatMap, which uses the transformation f: T -> M[U], instead of the "flattening" join

trait _Monad[M[_]] {
   // map is derived from flatMap and apply
   def map[T, U](m: M[T])(f: T => U): M[U] = flatMap(m)((t: T) => apply(f(t)))
   
   def apply[T](t: T): M[T]
   
   def flatMap[T, U](m: M[T])(f: T => M[U]): M[U] 
}

Let's use the monad template above to create a monad for time series. A time series of type TS is defined as a sequence of indexed observations (Obs). An observation has an index (or sequence ordering) and a value of type T.
The monad can be defined as an implicit class.

case class Obs[T](t: Int, features: T)
case class TS[T](data: List[Obs[T]])

// An implicit class has to be nested in an object, class or trait
object TSMonad {
  implicit class TS2Monad[T](ts: TS[T]) { 
    def apply(t: T): TS[T] = TS[T](List[Obs[T]](Obs[T](0, t)))
   
    def map[U](f: T => U): TS[U] = 
      TS[U](ts.data.map(obs => Obs[U](obs.t, f(obs.features))))
   
    def flatMap[U](f: T => TS[U]): TS[U] = 
      TS[U](ts.data.flatMap(obs => f(obs.features).data))
  }
}

The monad is ready for transforming time series by applying the implicit conversion of a time series of type TS to its monadic representation.

import scala.util.Random
import TSMonad._

val obsList = List.tabulate(10)(new Obs(_, Random.nextDouble))
val ts = new TS[Double](obsList)
  
val newTs = ts.map( _*2.0)


For-comprehension

Like many other functional languages, Scala sugar-coats the monadic syntax. The language combines the join and unit methods to produce the external shape of the monad, the methods map and flatMap, defined as

def map[B](f: A => B): M[B] 
def flatMap[B](f: A => M[B]): M[B]

  • map applies a transformation to the content of the structure
  • flatMap composes the monad with an operation f to generate another monad instance of the same type.

Computing the sum x+y+z for every combination of elements drawn from three arrays can be expressed by chaining the methods flatMap and map associated with the Scala collections (List, Array, Map...) 

val sum2 = array1 flatMap { x => 
    array2 flatMap { y =>
       array3 map { z => x+y+z } 
   }  
}

or using the for-yield idiom, which is easier to write and understand.

val sum : Array[Int] = for { 
   x <- array1
   y <- array2
   z <- array3
} yield x+y+z
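
For comparison, the same nested flatMap/map composition can be sketched with Java streams, which expose flatMap and map but offer no for-comprehension syntax. The class ForComprehensionSketch and the method sums are hypothetical names introduced for this illustration.

```java
import java.util.Arrays;

// Hypothetical sketch: the desugared for-comprehension written with IntStream
final class ForComprehensionSketch {

    // Returns x + y + z for every combination of elements of the three arrays,
    // in the same order as the nested flatMap/flatMap/map expression
    static int[] sums(int[] array1, int[] array2, int[] array3) {
        return Arrays.stream(array1)
            .flatMap(x -> Arrays.stream(array2)
                .flatMap(y -> Arrays.stream(array3)
                    .map(z -> x + y + z)))
            .toArray();
    }
}
```

The nesting mirrors the Scala version: each generator of the for-comprehension becomes one flatMap, except the last one, which becomes a map.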


Sunday, May 19, 2013

Effective Time Management

Target audience: Beginner
Estimated reading time: 3'




Principle

Each team member should contribute to the extent of his/her capability during the work week. 
The two key elements of effective time management are:
  • Flexibility: Team members should be able to work on multiple tasks, change roles and get around bottlenecks. 
  • Availability: Team members should support other team members, constantly evaluate how efficiently they use their time and be accessible outside business hours if necessary. 

Effective meetings


There are some basic, no-nonsense rules to make sure that meetings are conducted efficiently.
  • Each meeting should have a clear theme and agenda, emailed to all attendees 48 hours prior to the scheduled day. 
  • Meetings should be restricted to one hour and start with a recap of the pending action items from previous sessions and a list of the issues on the agenda. 
  • The team should spend no more than 10 minutes on each issue. If a consensus is not reached, someone should be assigned the task to research, investigate and propose a solution for the next meeting. 
  • It is highly recommended to deal with critical or controversial issues at the beginning of the meeting. 
  • Minutes of the meeting, including action items, deliverables and milestones, should be posted within 24 hours.  

Urgent vs. important tasks  

The most productive tasks are, in decreasing order of productivity:
     1. Important & Non urgent
     2. Important & Urgent
     3. Non important & Urgent
     4. Non important & Non urgent

The following table lists some activities as classified by their importance and urgency.


Importance | Urgent | Not urgent
Important | Escalations, Hiring, Resolving bottlenecks, Assigning defects, Solving escalations | Project planning, Training, Technical investigation, Design, Documentation
Unimportant | Time-sensitive interruptions, Responding to emails, Texting, Phone calls, Unscheduled meetings | Browsing the web, Social visits

The role of the project manager is to
- Protect engineers from urgency & interruptions
- Define and communicate the important tasks for the week



Time wasters   

Unscheduled interruptions are far more distracting than people realize. Beyond the actual time spent addressing the interruption, it takes on average 15 minutes to get back to the activity in progress.
The following table lists some examples of interruptions or time wasters and possible solutions.

Wasters | Solutions
Lack of clear priorities | Better project management
Unproductive meetings | Meetings with goal, agenda, minutes
Productivity bottlenecks | Flexible organization in terms of skills, schedule
Unnecessary interruptions | Manager to control interruptions
Process inefficiencies | Process automation and productivity tools
Lack of accountability | Quantitative metrics (K.P.I.): functionality, quality, schedule



Sunday, April 7, 2013

OAuth-2 for Social Media in Java

Target audience: Intermediate
Estimated reading time: 4'




Overview

This post describes a basic implementation of the OAuth-2 flow for applications, known as the 3-legged OAuth-2 protocol. We use the Facebook and Twitter APIs to illustrate the basic use case for OAuth-2. We assume the reader is familiar with the basics of authentication (Basic, OAuth-2) and knowledgeable in Java programming.

In a 3-legged OAuth-2 protocol, users authorize an application to access the provider of services or resources. The consumer application exercises the API of the service provider on behalf of the user by using a consumer key and secret granted by the provider.

The protocol for the application to access the protected services and resources is
  1. The application developer requests the privilege to access protected resources on behalf of the user
  2. The provider grants access to the application by supplying a consumer key and secret
  3. The application requests an authentication token using the consumer key and secret
  4. The provider generates the authentication token
  5. The application retrieves the session token (key and secret) using the authentication token
  6. The session token is passed as an argument to any request to the provider API (i.e. REST GET, POST,..)
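
The six steps above can be sketched with a toy, in-memory provider. Everything in this example is hypothetical: the class ToyProvider, its method names, the random UUID tokens and the choice to make the authentication token single-use are assumptions made to illustrate the sequence of exchanges, not the API of Facebook or Twitter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory provider used only to illustrate the token exchanges
final class ToyProvider {
    private final Map<String, String> consumers = new HashMap<>();  // consumer key -> secret
    private final Map<String, String> authTokens = new HashMap<>(); // auth token -> consumer key
    private final Map<String, String> sessions = new HashMap<>();   // session key -> consumer key

    // Steps 1-2: the provider grants a consumer key and secret
    String[] register() {
        String key = UUID.randomUUID().toString();
        String secret = UUID.randomUUID().toString();
        consumers.put(key, secret);
        return new String[] { key, secret };
    }

    // Steps 3-4: the provider generates an authentication token for a known consumer
    String createAuthToken(String consumerKey, String consumerSecret) {
        String expected = consumers.get(consumerKey);
        if (expected == null || !expected.equals(consumerSecret)) {
            throw new SecurityException("unknown consumer key or secret");
        }
        String token = UUID.randomUUID().toString();
        authTokens.put(token, consumerKey);
        return token;
    }

    // Step 5: the authentication token is exchanged (once) for a session token
    String createSession(String authToken) {
        String consumerKey = authTokens.remove(authToken);
        if (consumerKey == null) {
            throw new SecurityException("invalid or already used authentication token");
        }
        String sessionKey = UUID.randomUUID().toString();
        sessions.put(sessionKey, consumerKey);
        return sessionKey;
    }

    // Step 6: every request to the API carries the session token
    String get(String resource, String sessionKey) {
        if (!sessions.containsKey(sessionKey)) {
            throw new SecurityException("no session for this token");
        }
        return "200 OK " + resource;
    }
}
```

A client would call register, createAuthToken and createSession in that order, then pass the session key to each request; replaying an authentication token is rejected in this sketch.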


Facebook

As expected, access to the Facebook API relies on the consumer key, API_KEY, and the secret key, API_SECRET, to generate the authentication token.
The URL for the Facebook end-point, OAUTH_URL, is defined as a constant. A well-thought-out implementation would define these constants in a configuration file, as there is no guarantee that Facebook will not change the URL end-point in the future.

The purpose of the constructor is to instantiate the JSON REST client, with the session key whenever it is available and without it otherwise.
Any security mechanism relies on:
  • Authentication: authenticate
  • Authorization: authorize

public final class FacebookAuth {
  final static String OAUTH_URL =
           "http://www.facebook.com/login.php?api_key=";
  final static String API_KEY = "apikey";
  protected final static String API_SECRET = "apisecret";
  static String apiKeyValue = getKeyValue();
  static String apiSecretValue = getSecretValue();
  
  JsonRestClient _client = null;
 
    // The session key is optional: pass null if no session exists yet
  public FacebookAuth(final String sessionKey) {
    _client = (sessionKey == null) ?
    new JsonRestClient(apiKeyValue, apiSecretValue) :
    new JsonRestClient(apiKeyValue, apiSecretValue, sessionKey); 
  }
  
    // Authenticate the client as a desktop application client 
    // using the token generated from the consumer key and secret.
  public String authenticate() { ... }
    // Retrieve the session key and secret from the authentication token
  public String[] authorize(final String authToken) { .. }
}
  

The JSON REST authentication consists of creating an authentication token for the session and generating the authenticated end-point URL to process the request.
The helper methods and the validation of method arguments have been removed for the sake of clarity.

public String authenticate() {
  String url = null;
   
  try {
    // Token generated from the consumer key and secret
    String token = _client.auth_createToken();
    url = new StringBuilder(OAUTH_URL)
         .append(_client.getApiKey())
         .append("&v=1.0&auth_token=")
         .append(token)
         // No whitespace allowed in the list of permissions
         .append("&req_perms=status_update,read_stream,publish_stream") 
         .toString();
  }
  catch( FacebookException e) { ... }
  return url;
}
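
One detail worth sketching is the encoding of the query parameters. The helper below is hypothetical (AuthUrlSketch and authUrl are not part of the Facebook client); it simply assumes that each parameter value should be URL-encoded before being appended to the end-point, so that spaces or reserved characters cannot corrupt the query string.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: builds the authenticated end-point URL with encoded values
final class AuthUrlSketch {

    // baseUrl is expected to end with "api_key=", as OAUTH_URL does
    static String authUrl(String baseUrl, String apiKey, String token, String... perms) {
        try {
            return baseUrl
                + URLEncoder.encode(apiKey, "UTF-8")
                + "&v=1.0&auth_token="
                + URLEncoder.encode(token, "UTF-8")
                + "&req_perms="
                // The permissions are joined without whitespace, then encoded
                + URLEncoder.encode(String.join(",", perms), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always supported by the Java platform
            throw new IllegalStateException(e);
        }
    }
}
```

With this approach the comma separating the permissions is emitted as %2C and a space as +, which is the form-encoding applied by URLEncoder.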

The authorization step, authorize, uses the authentication token authToken to retrieve the key of the current session, sessionKey. The secret key for the session, sessionSecret, is retrieved from the JSON client through the invocation of the method getSecret.

public String[] authorize(final String authToken) {
  String[] results = null;
   
  try {
    String sessionKey = _client.auth_getSession(authToken,true );
    String sessionSecret = _client.getSecret();
    results = new String[] { sessionKey, sessionSecret };
  }
  catch( FacebookException e) { ...}
  return results;
}

Twitter

The Twitter OAuth-2 API is very similar to the Facebook authentication API. The client application maintains the handle to the authentication services as defined by its consumer key and secret, which are stored by the application. As we cannot assume that the service provider supplies only one request token, those tokens are cached as key-value pairs in requestTokenCache.
The constructor retrieves the consumer key, consumerKey, and the consumer secret, consumerSecret.

public final class TwitterAuth  {
  public static final String OAUTH_URL =
       "http://www.xyztest.com:3000/dashboard/signup_oauth?";
    // Consumer key and secret stored by the application (initialization omitted)
  private static Map<String, String> twitterOath = null;
    // Request tokens pending authorization, indexed by user
  private static Map<String, RequestToken> requestTokenCache 
                          = new HashMap<String, RequestToken>();
  private Twitter _twitter = null;
 
  public TwitterAuth() {
    String consumerKey = twitterOath.get("oauth.consumerKey");
    String consumerSecret = twitterOath.get("oauth.consumerSecret");
           
    _twitter = new TwitterFactory()
         .getOAuthAuthorizedInstance(consumerKey, consumerSecret);
  }

  public String authenticate(final String user) { ... }
  public String[] authorize(
                   final String user, 
                   final String auth_token
  ) { ... }
}

The method authenticate retrieves the request token, requestToken, from Twitter. If it exists, the request token is used to retrieve the URL for the Twitter authorization end-point, twitterAuthURL.
Finally, the request token associated with this specific user is cached.

public String authenticate(final String user) {
  String twitterAuthURL = null;
     
  try {
    RequestToken requestToken =
                _twitter.getOAuthRequestToken(OAUTH_URL);

    if( requestToken != null) {
      twitterAuthURL = requestToken.getAuthorizationURL();
      // Keep the request token until the user has authorized the application
      requestTokenCache.put(user, requestToken);
    }
  }
  catch( TwitterException e) {  .... }
  return twitterAuthURL;
}

The authorization step accesses the request token associated with this user and retrieves the access token, accessToken. The access token is very similar to the session token available in the Facebook API: it contains the token and its secret, which are wrapped into the array tokenStr.

public String[] authorize(
   final String user, 
   final String auth_token) {
  
  String[] tokenStr = null;
     
  try {
    RequestToken requestToken  = requestTokenCache.get(user);
    if( requestToken != null) {
       AccessToken accessToken =
         _twitter.getOAuthAccessToken(requestToken);
          
       tokenStr = new String[]  { 
          accessToken.getToken(),                             
          accessToken.getTokenSecret()
       };
     
       if( requestTokenCache.remove(user) == null ) {
        logger.error("can't remove user from auth. cache");
       }
    }
  }
  catch(TwitterException e) {  ... }
  return tokenStr;
}
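
Since requestTokenCache is a static HashMap shared by every instance, and HashMap is not thread-safe, an application serving several users concurrently may want a concurrent map instead. The sketch below is one possible arrangement under that assumption; the class PendingTokenCache and its methods are hypothetical and are written generically so that the example does not depend on the Twitter client library.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical cache of request tokens pending authorization, indexed by user
final class PendingTokenCache<T> {
    private final ConcurrentMap<String, T> pending = new ConcurrentHashMap<>();

    // Called at the end of the authentication step
    void put(String user, T requestToken) {
        pending.put(user, requestToken);
    }

    // Called by the authorization step: fetches and removes the token atomically,
    // so the same request token cannot be consumed twice.
    // Returns null if no token is pending for this user.
    T take(String user) {
        return pending.remove(user);
    }

    int size() {
        return pending.size();
    }
}
```

With such a cache, the separate get and remove calls of authorize collapse into a single take, which also removes the need for the error branch on a failed removal.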


Note:
Although the OAuth-2 standard is well-defined, the social network providers update their APIs regularly. Versioning of REST APIs (resources, actions..) remains challenging, so there is no guarantee that a new API introduced by the provider is backward compatible with the existing client code. Therefore there is no guarantee that the authentication code for the Facebook and Twitter REST APIs is still valid at the time you read this post. However, the concept and generic implementation of the OAuth-2 protocol described in this post should be easily portable to any new API or other social network provider.

This implementation lists the authentication URLs and REST end-points available at the time of this post. These parameters are very likely subject to change by Facebook and Twitter.
