Monday, March 11, 2019

Genetic Algorithm I: Foundation

Target audience: Advanced
Estimated reading time: 6'

Drawing inspiration from Charles Darwin's theory of natural evolution, a genetic algorithm is a search heuristic that mirrors the principles of natural selection. Here, the most suitable individuals are chosen for reproduction to generate the subsequent generation.
In this piece, we will explore the foundational concepts of Genetic Algorithms and delve into their implementation using Scala.



Introduction

Scala is a statically typed, object-oriented and functional programming language built on top of the Java Virtual Machine. We can leverage Scala's lambdas, partial functions and closures to implement the computational workflow of an evolutionary algorithm.
The second post in this series introduces the concept and implementation of the genetic operators (cross-over, mutation and selection) on a population of chromosomes: Genetic Algorithms II: Operators
The third and final post explores the application of genetic algorithms as a solver or optimizer: Genetic Algorithms III: Solver


Note: For the sake of readability of the implementation of the algorithms, all non-essential code such as error checking, comments, exception handling, validation of class and method arguments, scoping qualifiers or imports is omitted.

Theory

Genetic Algorithms were invented by John Holland in the 1970s and derive their properties from Darwin's theory of evolution. A living organism consists of cells, which are composed of identical chromosomes. Chromosomes are strings of DNA and serve as a model for the whole organism. A chromosome consists of genes, which are blocks of DNA, each encoding a specific protein.
Recombination (or cross-over) is the first stage of reproduction. Genes from the parents generate a whole new chromosome (the offspring), which can then be mutated. During mutation, one or more elements of the DNA strand or chromosome are changed. These changes are mainly caused by errors in copying genes from the parents. The success of the organism in its life measures its fitness.
In computer science, Genetic Algorithms are a way of solving problems by mimicking nature. They use the same combination of selection, recombination and mutation to evolve a set of candidate solutions to a given problem. The basic computational sequence is:
  1. Select the initial population (search space) as a set of chromosomes which represent candidate solutions to the problem.
  2. Encode the chromosomes into a vector of real values (continuous process) X(x1,..,xn) or a string of bits (00101110101111010111).
  3. Evaluate or rank the chromosomes using a fitness function.
  4. Select the fittest chromosomes (which meet a minimum fitness criterion) for reproduction.
  5. Pair chromosomes for cross-over. The process consists of applying a cross-over rate to interchange fragments or sections of the "parent" chromosomes from a random point in the encoded string or vector.
  6. Mutate chromosomes by flipping one or more of their elements (bits) using a randomly generated index bounded by a mutation rate.
  7. Iterate the reproduction cycle (steps 2 to 6) for the current population.
Each of the genetic operators (selection, cross-over and mutation) relies on a parameter; these parameters drive the reproduction loop sketched after this list.
  • Selection rate is the random threshold value used to reduce the current population of chromosomes according to their fitness.
  • Cross-over rate is used to compute the index beyond which the elements or bits of two parent chromosomes are exchanged.
    For example, the parents 10001001110010010 and 1010001001000011 generate the offspring 10001001110010011 and 1010001001000010.
  • Mutation rate is used to compute the index of the element(s) or bit(s) in a chromosome that is/are mutated (or flipped): 10001001110010010 becomes 10001001110000010.
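
To make this sequence concrete, here is a minimal, self-contained sketch of a generational loop over a population of fixed-length bit strings. It solves the trivial "one-max" problem (maximizing the number of 1-bits); the sizes, rates and names are purely illustrative, and the sketch does not use the Chromosome, Gene and Population classes introduced below.

import scala.util.Random

object GASketch extends App {
   type BitChromosome = Vector[Int]      // bit string encoding of a candidate solution

   val rand = new Random(42L)
   val chromosomeSize = 16
   val populationSize = 20
   val selectionRate = 0.5               // fraction of the fittest chromosomes kept
   val xOverRate = 0.8                   // probability of applying cross-over to a pair
   val mutationRate = 0.05               // probability of flipping a bit

   // Illustrative fitness: number of bits set to 1 ("one-max")
   def fitness(c: BitChromosome): Double = c.sum.toDouble

   // Single-point cross-over of two parents
   def crossOver(a: BitChromosome, b: BitChromosome): (BitChromosome, BitChromosome) = {
      val idx = rand.nextInt(chromosomeSize)
      (a.take(idx) ++ b.drop(idx), b.take(idx) ++ a.drop(idx))
   }

   // Flip a single, randomly selected bit with probability mutationRate
   def mutate(c: BitChromosome): BitChromosome =
      if (rand.nextDouble < mutationRate) {
         val idx = rand.nextInt(chromosomeSize)
         c.updated(idx, 1 - c(idx))
      } else c

   // 1 & 2. Initial population encoded as bit strings
   var population = Vector.fill(populationSize)(Vector.fill(chromosomeSize)(rand.nextInt(2)))

   (0 until 50).foreach { _ =>
      // 3 & 4. Rank by fitness and select the fittest chromosomes
      val selected = population.sortBy(c => -fitness(c)).take((populationSize*selectionRate).toInt)
      // 5. Pair the selected parents for cross-over
      val offspring = selected.grouped(2).flatMap {
         case Vector(a, b) if rand.nextDouble < xOverRate =>
            val (c1, c2) = crossOver(a, b)
            Vector(c1, c2)
         case pair => pair
      }.toVector
      // 6 & 7. Mutate and form the next generation
      population = (selected ++ offspring).map(mutate).take(populationSize)
   }

   println(s"Best fitness after 50 generations: ${population.map(fitness).max}")
}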

Chromosomes

The first step is to implement the key components of a genetic algorithm. The implementation has to be generic enough to support any kind of fitness function and encoding scheme.
A chromosome is composed of one or several genes, each representing a predicate, axiom, fact or a rule/policy.
The Chromosome class wraps a list of parameterized genes, code (the genetic code) (line 1). The most important methods related to chromosomes are:
  • +- for the cross-over between two chromosomes (line 6)
  • ^ for the mutation of each chromosome (line 11)
  • /= for the normalization of the fitness value of each chromosome during the selection process (line 14)
This implementation uses the unfitness value of the chromosome for ranking, instead of the usual fitness value. It is defined as 1 - normalized_fitness (line 3).

 1  final class Chromosome[T <: Gene](val code: List[T]) {
 2
 3     var unfitness: Double = 1000*(1.0 + Random.nextDouble)
 4
 5     // cross-over
 6     def +- (
 7         that: Chromosome[T],
 8         gIdx: GeneticIndices): (Chromosome[T], Chromosome[T]) = {}
 9
10     // mutation
11     def ^ (gIdx: GeneticIndices): Chromosome[T] = {}
12
13     // normalization of fitness
14     def /= (normalizeFactor: Double): Unit = {}
15     ...
16  }

The class GeneticIndices converts the cross-over and mutation probabilities (or rates) into the indices (or positions) of the bits of the chromosome or gene to which the cross-over and mutation are applied. The parameter chOpIdx defines the index along the bit string of a chromosome and geneOpIdx the index along the bit string of a gene.

 
case class GeneticIndices(chOpIdx: Int, geneOpIdx: Int) 
 

The genetic indices are generated from the cross-over or mutation rate (probability) by the method geneticIndices (line 1). The index of the bit used for genetic operations on the chromosome, chIdx, is computed by scaling the rate by the length of the chromosome (lines 2 - 8). Likewise, the index of the bit used for genetic operations on the genes is defined as gIdx (line 11).

 1  def geneticIndices(prob: Double): GeneticIndices = {
 2     var idx = (prob*chromosomeSize).floor.toInt
 3
 4     val chIdx =
 5        if(idx == chromosomeSize)
 6           chromosomeSize-1
 7        else
 8           idx
 9
10     idx = (prob*geneSize).floor.toInt
11     val gIdx = if(idx == geneSize) geneSize-1 else idx
12
13     GeneticIndices(chIdx, gIdx)
14  }
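
As a quick illustration, assuming hypothetical sizes chromosomeSize = 32 and geneSize = 64, a rate of 0.45 maps to the following indices:

// Assuming chromosomeSize = 32 and geneSize = 64 (hypothetical values)
geneticIndices(0.45)
// floor(0.45*32) = 14 and floor(0.45*64) = 28
// => GeneticIndices(chOpIdx = 14, geneOpIdx = 28)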


Genes

The class Gene wraps the genetic code associated with a predicate or a rule and takes four parameters:
  • id: This is the identifier of the gene. It is usually the name of the variable represented by the gene (line 2).
  • target: This is the target value or threshold to be converted or discretized into a bit string (line 3).
  • op: This is the operator that is applied to the target value (line 4).
  • discr: This is the discretization (or quantization) scheme that converts a numerical (real) value into an integer, which is then converted into bits. It also supports the conversion of the bit string back to a numerical value.
The genes are subjected to the same genetic operations as the chromosomes: cross-over (line 8) and mutation (line 9). The conversion from a bit string back to a numerical value is implemented by the method decode (line 11).

 1  class Gene(
 2      val id: String,
 3      target: Double,
 4      op: Operator)(implicit discr: Discretization) {
 5
 6     val bits: BitSet
 7
 8     def +- (that: Gene, gIdx: GeneticIndices): Gene = {}
 9     def ^ (gIdx: GeneticIndices): Gene = ^ (gIdx.geneOpIdx)
10     def ^ (idx: Int): Gene = {}
11     def decode(bits: BitSet): (Double, Operator) = {}
12  }


Quantization

The class Discretization takes two arguments:
  • A function, toInt, that converts a real value into an integer (line 2)
  • A reverse function, toDouble, that converts the integer back into a real value (line 3)
The main benefit of encapsulating the two conversions into a single class is to reduce the risk of inconsistency and inaccuracy between them.

 1  class Discretization(
 2     toInt: Double => Int,
 3     toDouble: Int => Double) {
 4
 5     def this(R: Int) = this(
 6        (x: Double) => (x*R).floor.toInt, (n: Int) => n.toDouble/R
 7     )
 8  }
 9
10  def decode(bits: BitSet): (Double, Operator) =
11     (discr.toDouble(convert(geneBits.rValue, bits)),
12      op(convert(geneBits.rOp, bits))
13  )

The auxiliary constructor (lines 5 - 7) builds the two conversion functions from a discretization resolution R. The instantiation of a Gene converts a predicate into a bit string of type java.util.BitSet; the bit string is decoded back into a value and an operator by the decode method of the Gene companion object (lines 10 - 12).
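
As a minimal, hypothetical illustration (the resolution of 1000 and the operator LessThan are illustrative only and not defined in this post), a gene encoding the predicate "temperature < 10.0" could be instantiated as follows:

// Hypothetical resolution and operator - for illustration only
implicit val quantizer: Discretization = new Discretization(1000)

// Encodes the predicate "temperature < 10.0" as a bit string
val gene = new Gene("temperature", 10.0, LessThan)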


Genetic population

The Population class takes two parameters:
  • limit: This is the maximum size of the population (line 4)
  • chromosomes: This is the pool of chromosomes defining the current population (line 5).
A reproduction cycle executes the following sequence of three genetic operators on a population:
  • select for the selection of the fittest chromosomes of the population (line 8)
  • +- for the pair-wise cross-over of all the chromosomes (line 10)
  • ^ for the mutation of each chromosome (line 12).

 1  type Pool[T <: Gene] = ArrayBuffer[Chromosome[T]]
 2
 3  class Population[T <: Gene](
 4      limit: Int,
 5      val chromosomes: Pool[T]) {
 6
 7     // selection operator
 8     def select(score: Chromosome[T] => Unit, cutOff: Double)
 9     // cross-over operator
10     def +- (xOver: Double)
11     // mutation operator
12     def ^ (mu: Double)
13     // ...
14  }
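
With these components in place, a single reproduction cycle can be sketched by chaining the three operators. This is only a sketch: the scoring function and the rate values are hypothetical, the operators are assumed to update the population in place, and their actual implementation is the subject of the next post.

// Sketch of one reproduction cycle; the scoring function and the rates are hypothetical
def reproduce[T <: Gene](
    population: Population[T],
    scoring: Chromosome[T] => Unit): Unit = {
   population.select(scoring, cutOff = 0.85)   // selection of the fittest chromosomes
   population +- 0.7                           // pair-wise cross-over
   population ^ 0.2                            // mutation
}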

This completes the first part of the implementation of genetic algorithms in Scala. The next post dives into the details of the implementation of the genetic operators.


Saturday, January 5, 2019

Versioning Scala Classes

Target audience: Beginner
Estimated reading time: 4'

Versioning stands as a prevalent hurdle in the commercial software development landscape. In Java or Scala, the class loader is frequently leveraged to accommodate multiple iterations of a library, executable, or framework.
In this article, we introduce a straightforward versioning approach employing a class loader and a jar file pinpointed through its URL.



Note: The code associated with this article is written using Scala 2.11.8


Simple implementation

A library can be easily versioned by creating multiple jar files, one for each version. Let's consider a simple case in which a library is self-contained in a jar file, with two versions:
  • ArrayIndex1.jar
  • ArrayIndex2.jar
 1  import java.net.{URL, URLClassLoader}
 2  import java.io.File
 3  import java.lang.reflect.Method
 4
 5  val cl1 = new URLClassLoader(
 6      Array[URL](new File("ArrayIndex1.jar").toURL),
 7      Thread.currentThread().getContextClassLoader()
 8  )
 9  val cl2 = new URLClassLoader(
10      Array[URL](new File("ArrayIndex2.jar").toURL),
11      Thread.currentThread().getContextClassLoader()
12  )
13
14  val compatibleClass: Class[_] =
15      if( version > 0.9)
16         cl1.loadClass("ArrayIndex")
17      else
18         cl2.loadClass("ArrayIndex")
19
20  val obj = compatibleClass.newInstance
21  val methods = compatibleClass.getMethods
22  println(s"${methods.map( _.getName).mkString(",")}")

The first step consists of loading the two versions of the library by converting each jar file name into a URL. This is accomplished by instantiating the class loaders: in this particular case, the class loader is defined by its URL, with the type URLClassLoader (lines 5 & 9). The jar files are loaded within the current thread (lines 7 & 11): a more efficient approach would consist of creating a future to load the classes asynchronously.
The class to be used in this case depends on the variable version (line 15). Once the appropriate class is loaded, its instances and methods are readily available to be invoked by the client.
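
For completeness, here is a minimal sketch of how a method of the dynamically loaded class could be looked up and invoked through the reflection API. The method name "apply" and its Int argument are purely hypothetical; the actual signature depends on the class packaged in the jar files.

// Hypothetical method name and argument - adjust to the actual class signature
val method: Method = compatibleClass.getMethod("apply", classOf[Int])
val result = method.invoke(obj.asInstanceOf[AnyRef], Int.box(3))
println(result)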



Wednesday, November 21, 2018

Integrate 3rd Party Services to Spark

Target audience: Beginner
Estimated reading time: 10'

Apache Spark stands as a go-to framework for dispersing computational tasks on a grand scale. At times, these tasks necessitate engagement with external or third-party services, spanning domains like natural language processing, image classification, reporting, or data refinement.

In this article, we delve into the intricacies of integrating third-party microservices hosted on AWS into the Apache Spark framework.



Introduction

Apache Spark is a commonly used framework to distribute large scale computational tasks. Some of these tasks may involve accessing external or 3rd party remote services such as natural language processing, images classification, reporting or data cleansing. 
These remote services or micro-services are commonly accessed through a REST API and are deployable as clusters of nodes (or instances) to improve scalability and high availability. 
Load balancing solutions have been addressing scalability challenges since the dawn of the internet.

Is there an alternative to load balancers for scaling remote web services?
We compare two approaches to integrating Spark workers with the 3rd party service nodes:
  • Deploy a load balancer between the Spark executors and the remote service
  • Apply hash partitioning, for which the IP of each service instance is assigned to a given Spark partition.
Note: This post assumes the reader is somewhat familiar with load balancers and has a rudimentary knowledge of the Apache Spark framework.

Load balancers

Load balancers are commonly used to route requests to web or micro-services according to a predefined policy such as CPU load, average processing time or downtime. They originally gained acceptance in the late 90s with the explosion of the internet and web sites.
A load balancer is a very simple and easy solution for deploying micro-services at scale: it is self-contained and does not involve any significant architecture or coding changes to the underlying application (business logic or data layers).

In a typical Apache Spark deployment, the Spark context splits data sets into partitions. Each partition pre-processes data to generate the request to the service, then initiates the connection to the service cluster through the load balancer.
Deployment using Apache Spark with load balanced services


The data processing steps are:
  1. The master splits the input data over a given set of partitions.
  2. Worker nodes pre-process and cleanse the data if necessary.
  3. Requests are dynamically generated.
  4. Each partition establishes and manages the connection to the load balancer.
  5. Finally, worker nodes process the response and payload.
Load balancers provide an array of features such as throttling, persistent sessions, or stateful packet inspection that may not be needed in a Spark environment. Moreover, the load balancing scheme is at odds with the core concept of big data: data partitioning.

Let's consider an alternative approach: assigning (mapping) one or two nodes of the service cluster to each partition.


Partitioning service nodes

The first step is to select a scheme to assign a given set of service nodes, identified by their IP addresses, to a partition. Spark supports several mechanisms to distribute functions across partitions:
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
In this study we use a simple partitioning algorithm that consists of hashing the set of IP addresses for the service nodes, as illustrated in the following block diagram.
Deployment using Apache Spark and hashed IP partitioning


The data pipeline is somewhat similar to the one using a load balancer:
  1. The master splits the input data over a given set of partitions.
  2. The IP addresses of all service nodes are hashed and assigned to each partition.
  3. Worker nodes pre-process and cleanse the data if necessary.
  4. Requests to the service are dynamically generated.
  5. Each partition establishes and manages the connection to a subset of service nodes.
  6. Finally, worker nodes process the response and payload.
The implementation of the hashing algorithm in each partition is quite simple. A hash code is extracted from the input element (lines 2, 3) and used as a seed for the random selection of the service node (lines 5, 6). The last step consists of building the request, establishing the connection to the target service node and processing the response (lines 9 - 11).

 1  def hashedEndPoints(input: Input, timeout: Int, serverAddresses: Seq[String]): Output = {
 2     val hashedCode = input.hashCode + System.currentTimeMillis
 3     val seed = (if (hashedCode < 0) -hashedCode
 4                 else hashedCode)
 5     val random = new scala.util.Random(seed)
 6     val serverHash = random.nextInt(serverAddresses.length)
 7     val targetIp = serverAddresses(serverHash)
 8
 9     val url = s"http://${targetIp}:80/endpoint"
10     val httpConnector = HttpConnector(url, timeout)
11     // Execute the request and return a response of type Output
12  }

The function hashedEndPoints, executed within each partition, is invoked from the master:

def process(
   inputs: Dataset[Input],
   timeout: Int,
   serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
   import sparkSession.implicits._   // encoders for the Dataset transformation

   if (serverAddresses.isEmpty)
      throw new IllegalStateException("No service nodes defined")

   inputs.map(input => hashedEndPoints(input, timeout, serverAddresses))
}
Besides ensuring consistency and avoiding the addition of an external component (a load balancer) to the architecture, the direct assignment of service nodes to the Spark partitions has some performance benefits.


Performance evaluation

Let's evaluate the latency for processing a corpus of text through an NLP algorithm deployed over a variable number of worker nodes. The following chart plots the average latency for a given NLP engine to process documents, with a variable number of nodes.


Hash partitioning improves performance only marginally in the case of a small number of nodes. However, it outperforms the traditional load-balancing deployment by as much as 20% for a larger number of nodes (or instances).


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3