Thursday, February 1, 2024

Modular Markov Chain Monte Carlo in Python

Target audience: Intermediate
Estimated reading time: 5'

Sampling sits at the core of data science and analysis. This article explores a category of numerical sampling techniques, known as Markov Chain Monte Carlo, and how they can be implemented via reusable design patterns, using the Metropolis-Hastings model as an example.
Table of contents

       Theory
What you will learn: How to build modular Python Classes for MCMC with Bridge Pattern: A Metropolis-Hastings Implementation.

Notes:
  • Environments: Python 3.10.10, Numpy 1.25.1, Matplotlib 3.7.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
  • Source code available at Github.com Data_Exploration/Stats

Markov Chain Monte Carlo 

Markov Chain Monte Carlo (MCMC) is a class of algorithms used in computational statistics to sample from probability distributions based on constructing a Markov chain that has the desired distribution as its equilibrium distribution. The core idea is to use a Markov chain to model the complex probability distribution you're interested in, where the chain's states represent the possible outcomes in the distribution. Over time, as the chain progresses, the states it visits are used to generate samples that are representative of the desired distribution.

MCMC methods are particularly useful in situations where direct sampling is difficult due to the complexity of the distribution or high-dimensional spaces. They are widely used in Bayesian statistics, where one needs to compute posterior distributions that are not analytically tractable [ref 1].


The result of a Markov Chain Monte Carlo (MCMC) procedure is a joint posterior probability distribution, or the combined probability of parameters theta, given certain data and a model/hypothesis, denoted as p(theta | Data, Model). 
The generic Naive Bayes rule is defined as \[posterior = \frac{likelihood\ .\ prior}{evidence}\]
In the case of MCMC, the log of the posterior is computed as
\[log(p(\theta |\mathfrak{D},\mathfrak{M}))=log(p(\mathfrak{D} |\theta,\mathfrak{M})) + log(p(\theta | \mathfrak{M})) - log(p(\mathfrak{D} |\mathfrak{M}))\]
\[for\ model\ \mathfrak{M},\ a\ sampled\ value\ \theta\ and\ data \ \mathfrak{D}\]
Essentially, MCMC can be understood as a method for estimating a credible or plausible range of model parameters that align with the observed data. The process of fitting a probability distribution emerges as a secondary outcome.

Metropolis-Hastings algorithm

The Metropolis-Hastings (MH) algorithm is one of the most famous MCMC techniques. It allows for sampling from a distribution by generating a sequence of sample points in a way that, even if you can't directly sample from the distribution itself, you can still obtain a set of samples that approximates the distribution after a sufficient number of iterations [ref 2]. Other popular MCMC methods include the Gibbs sampler and Hamiltonian Monte Carlo.

Theory

For this study, we consider a single variable probability distribution.

Generic algorithm:
The procedure at step i is as follows:
1 Sample the proposal distribution q \[(1)\ \  x^{*} \leftarrow q(x_{i}|x_{i-1})\] Compute the probability u of accepting the sample \[(2)\ \  u(\theta^{*} | \theta_{i})=min\left ( 1, \frac{\widetilde{p}(\theta^{*} ).q\left ( \theta_{i} | \theta^{*}\right )}{\widetilde{p}(\theta_{i} ).q\left ( \theta^{*} | \theta_{i}\right )}\right )\]
3 Select the next value \[(3)\ \  IF\ [ urand_{[0, 1]} < u\left (\theta ^{*} | \theta_{i} \right )]\ \theta_{i+1} = \theta^{*}\ \ \ ELSE\ \theta_{i+1} = \theta_{i}\]

Random walk:
This is the case where the proposal distribution is symmetric. Therefore the probability of accepting the sample is simplified as \[u(\theta^{*} | \theta_{i})=min\left ( 1, \frac{\widetilde{p}(\theta^{*} )}{\widetilde{p}(\theta_{i} )}\right )\]

Reusable design pattern

Reusable software design patterns are standardized solutions to common problems in software design and architecture. They represent best practices evolved through collective experience, providing a template or blueprint for tackling recurring design challenges. Design patterns help in making the software development process more efficient and the software itself more maintainable and scalable by offering proven solutions to typical issues [ref 3].
  • Creational Patterns deal with object creation mechanism.
  • Structural Patterns concern class and object composition to ensure a flexible architecture.
  • Behavioral Patterns are all about class's objects interactions. 

The bridge pattern is a structural pattern. It aims to separate an abstraction from its implementation, making them independent of each other. In this context, the structure of the proposal distribution is kept separate from the structure of the Markov Chain Monte Carlo, allowing any algorithm to choose any distribution freely.

Bridge pattern for Markov Chain Monte Carlo algorithms

The two abstract classes, ProposalDistribution and MCMC define the signature of method to be overridden by each specialized class.

Proposal distribution implementation

First let's define the signature of the proposal distribution

class ProposalDistribution(object):
    from abc import abstractmethod

    @abstractmethod
    def log_prior(self, theta: float) -> float:
        pass

    @abstractmethod
    def log_likelihood(self, theta: float) -> float:
        pass

    @abstractmethod
    def step(self, theta: float, sigma: float) -> float:
        pass

    def log_posterior(self, theta: float, prior: float) -> float:
        pass


Beta distribution:
The beta distribution is most appropriate for discrete distributions
The log likelihood for the Beta distribution is \[log(\mathfrak{L})=log\left ( \binom{N}{x} h^{\theta} h^{N-\theta} \right )\]
class ProposalBeta(ProposalDistribution):
    pi_2_inv = np.sqrt(2 * np.pi)

    def __init__(self, alpha: int, beta: int, num_trials: int, h: int):
        self.alpha = alpha
        self.beta = beta
        self.num_trials = num_trials
        self.h = h

    def log_prior(self, theta: float) -> float:
        x = stats.beta(self.alpha, self.beta).pdf(theta)
        return x if x > 0.0 else 1e-5

    def log_likelihood(self, theta: float) -> float:
        return math.log(stats.binom(self.num_trials, theta).pmf(self.h))

    def step(self, theta: float, sigma: float) -> float:
        return theta + stats.norm(0.0, sigma).rvs()

    def log_posterior(self, theta: float, prior: float) -> float:
        return self.log_likelihood(theta) + np.log(prior)


Normal distribution:
The log likelihood for the normal distribution is \[log(\mathfrak{L})=-\frac{N}{2}log(2\pi)-\frac{1}{2}\sum_{i=1}^{N}\left [ 2.log(\sigma_i) -{\frac{\theta_i^2}{\sigma_i^2}} \right ]\]

Metropolis-Hastings implementation

First, let's outline the signature or interface common, MCMC, to all Markov Chain Monte Carlo algorithms. The method 'sample' accepts the most recently sampled value, and produces a new sample.
class MCMC(object):

    @abstractmethod
    def sample(self, theta: float) -> float:
        pass

The MetropolisHastings class encapsulates the implementation of the Metropolis-Hastings (MH) algorithm. Its constructor sets up the algorithm's parameters:
  • proposal: The proposal distribution.
  • num_iterations: The total number of iterations or samples to be gathered.
  • burn_in_ratio: The proportion of samples allocated to the burn-in phase.
  • sigma_delta: Variance of Normal distribution used as step size for computing next sample.
Note:  It's important to note that the initial or seed value for the samples may not be precise. In fact, it's anticipated that the early samples generated before the algorithm stabilizes are unreliable and ought to be disregarded [ref 4]. These discarded samples are commonly known as the burn-in phase.

class MetropolisHastings(MCMC):

    def __init__(self,
                 proposal: ProposalDistribution,
                 num_iterations: int,
                 burn_in_ratio: float,
                 sigma_delta: float = 0.2):
       
        self.proposed_distribution = proposed_distribution
        self.num_iterations = num_iterations
        self.sigma_delta = sigma_delta
        self.burn_ins = int(num_iterations*burn_in_ratio)

    def sample(self, theta_0: float) -> (np.array, float):

The sampling method, sample, initiates the process with an initial, approximate sample. It outputs a tuple consisting of the samples gathered post-burn-in phase and the acceptance rate.
def sample(self, theta_0: float) -> (np.array, float):
     num_valid_thetas = self.num_iterations - self.burn_ins.  #1
     theta_walk = np.zeros(num_valid_thetas)
     accepted_count = 0
     theta = theta_0
     theta_walk[0] = theta       #2

     j = 0
     for i in range(self.num_iterations):
        theta_star = self.proposal_dist.step(theta, self.sigma_delta) #3 

        try:
           cur_prior = self.proposal_dist.log_prior(theta)  #4
           new_prior = self.proposal_dist.log_prior(theta_star)

           if cur_prior > 0.0 and new_prior > 0.0:
             cur_log_posterior = self.proposal_dist.posterior(theta, cur_prior)  #5
             new_log_posterior = self.proposal_dist.posterior(theta_star, new_prior)

             # 6 Apply the acceptance/rejection criteria
             if MetropolisHastings.__acceptance_rule(cur_log_posterior, new_log_posterior):
               theta = theta_star
               if i > self.burn_ins:   #7
                     accepted_count += 1
                     theta_walk[j + 1] = theta_star
                     j += 1
            else:
               if i > self.burn_ins:
                  theta_walk[j + 1] = theta_walk[j]
                  j += 1

         except ArithmeticError as e:
                print(f'Error {e}')
        return theta_walk, float(accepted_count) / num_valid_thetas


Theta values to be recorded must omit those sampled during the initial burn-in phase (1). The first sample essentially serves as the starting point for Theta (2). At every step, the process involves:
  1. Calculating the subsequent Theta value with the stepping function tied to the proposal distribution (3).
  2. Determining the logarithm of the prior for both the last and the proposed, current sample (4).
  3. Calculating the posterior by applying the logarithm to the Naive Bayes formula (5).
  4. The new posterior probability undergoes the acceptance criteria test (6).
  5. Provided the new sample isn't part of the burn-in phase, it is then recorded (7).
The acceptance probability, denoted as u, is determined by calculating the logarithms of the prior, likelihood, and posterior probability. Consequently, the acceptance test outlined in equation (3) necessitates transforming the difference (proposed theta minus previous theta) into an exponential form.
@staticmethod
def __acceptance_rule(current: float, new: float) -> bool:
    residual = new - current
    return True if new > current else np.random.uniform(0, 1) < np.exp(residual)


Evaluation

The goal is to assess how different elements influence both the acceptance rate and the results produced by the Metropolis-Hastings (MH) algorithm. The elements under consideration include:
  • Num iterations: The total number of iterations performed in the Monte Carlo simulation.
  • Proposal: Utilizes the Beta distribution with shapes 4 and 6 as the proposal distribution. The evaluation focuses solely on the success rate of the Binomial distribution applied in calculating the likelihood.
  • Initial Theta: The initially sampled value for the theta variable.
  • Theta step: The increment size used when sampling the subsequent value of theta.

Num iterationsProposalInitial ThetaTheta stepAcceptance rate
50004,6,0.50.50.10           0.480 
50004,6,0.50.50.95           0.061 
200004,6,0.50.50.95           0.077 
10004,6,0.50.50.95           0.052 
120004,6,0.50.10.50           0.121 
120004,6,0.50.90.50           0.134 
120004,6,0.50.50.50           0.125 
120004,6,0.90.50.50           0.080 
120004,6,0.10.50.50           0.079 


Impact of proposal distribution
In this first test we evaluate the impact of the proposal distribution on the prediction of the output theta. We consider 5000 iterations with no burn-in, initial value of 0.8 and compare two proposal distributions
  • Beta(4, 6) with success rate 0.88
  • Beta(4, 6) with success rate 0.12
Output MH with Beta(4,6) proposal w success rate 0.88, no burn-in, initial value 0.8
Output MH with Beta(4,6) proposal w success rate 0.12, no burn-in, initial value 0.8

Impact of step size
This test evaluates the impact of the step size for computing the next sample as: \[\theta^{*} = \theta_{i-1}+ N(0, \triangle \sigma)\]
Output MH with Beta(4,6) proposal 20% burn-in phase, initial value 0.8, delta sigma 0.05
Output MH with Beta(4,6) proposal 20% burn-in phase, initial value 0.8, delta sigma 0.95


Impact of initial value
This second test evaluates the impact of the initial value on the output of the MH algorithm. We use a 10% burn-in ratio with 12,000 iteration and a Beta(4, 2) proposal distribution with a 16% success rate.
Output MH with Beta(4,2) proposal w success rate 0.16, 10% burn-in, initial value 0.1
Output MH with Beta(4,2) proposal with success rate 0.16, 10% burn-in, initial value 0.1


Limitations

The primary challenges associated with the Metropolis-Hastings algorithm are computational in nature. When proposals are generated randomly, it often requires a significant number of iterations to reach regions of higher (or more probable) posterior densities. Moreover, even the more efficient versions of the Metropolis-Hastings algorithm may accept fewer than 25% of the proposed samples.

Finally, the Metropolis-Hastings algorithm dependents heavily on the selection of the proposal distribution. An inappropriate choice for the proposal distribution can result in suboptimal performance of the algorithm.


References

  • [1] Pattern Recognition and Machine Learning [sec 11.2] - C. Bishop - Springer 2006
  • [2Bayesian Stats - Markov Chain Monte Carlo
  • [3Design Patterns: Element of Reusable  Object-Oriented Software; 151-161 - E. Gamma, R. Helm, R. Johnson, J. Vlissides - Addison-Wesley 1995
  • [4] Machine Learning: A Probabilistic Perspective [sec 24.4.1] - K. Murphy - MIT Press 2012
  • Github.com Data_Exploration/Stats


Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3

Wednesday, January 3, 2024

Optimize Apache Spark with Genetic Algorithm

Target audience: Intermediate
Estimated reading time: 5'

Struggling with the complex task of configuring your Apache Spark application? 

Consider the innovative approach of a genetic algorithm, a search heuristic inspired by Charles Darwin's theory of evolution [ref 1]. This method applies the concepts of natural selection to efficiently determine the best configuration for your Apache Spark application. It aims to achieve an ideal balance between minimizing production costs and maximizing customer satisfaction.


Table of contents
      Encoding
      Gene
      Chromosome
      Mutation
      Crossover
      Scoring
      Selection
      Mating cycle
Follow me on LinkedIn

What you will learn: How to apply genetic algorithm to optimize the configuration parameters for deployment to production.

Notes:
  • Source code available at GitHub - Patrick Nicolas - Genetic Algorithm github.com/patnicolas/streaming/scala/org/pipeline/ga
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
  • Environments: Apache Spark 3.3.1, Scala 2.13.4

Apache Spark configuration

The goal is to identify the best settings for Apache Spark [ref 2] configuration parameters that minimize deployment expenses while ensuring high-quality service, as indicated by reduced latency. This goal will be converted into a fitness function during the genetic algorithm's development.

Tunable parameters

We define an Apache Spark configuration parameter with the following attributes:
  • key: Spark parameter identifier
  • value: Default or initial value for the parameter
  • isDynamic: Is this parameter tunable?
  • paramType: Type of the parameter (Float, Int,)
  • range: Range of valid values (any values if undefined)
The subsequent portion of the Apache Spark configuration file demonstrates the characteristics of different configuration parameters. This includes parameters that are not tunable (non-dynamic).

{
"sparkParameters": [
  {
    "key": "spark.io.compression.snappy.blockSize",
    "value":"32k",
    "isDynamic":
false,
    "paramType":"String"
  },
  {
    "key": "spark.KryoSerializer.unsafe",
    "value":true,
    "isDynamic": true,
    "paramType":"Boolean"
  },
  {
    "key": "spark.KryoSerializer.buffer.max",
    "value":"64m",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "96", "128"]
  },
  {
    "key": "spark.KryoSerializer.buffer",
    "value":"64m",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "96"]
  },
  {
    "key": "spark.worker.cleanup.interval"
    "value":900,
    "isDynamic": false,
    "paramType":"Int",
    "range":["300","600", "900", "1200"]
  },

            ...

  {
    "key": "spark.shuffle.file.buffer",
    "value":"32k",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "128","256"]
  },
  {
    "key": "spark.driver.memory",
    "value":"6g",
    "isDynamic": true,
    "paramType":"Int",
    "range":["2","4", "6", "8","10"]
  },
  {
    "key": "spark.executor.driverOverheadFactor",
    "value":0.1,
    "isDynamic": true,
    "paramType":"Float",
    "range":["0.05","0.1", "0.15", "0.2","0.25"]
  },
  {
    "key": "spark.executor.memory",
    "value":"4g",
    "isDynamic": true,
    "paramType":"Int",
    "range":["2","3","4","5","6"]
  },
  {
    "key": "spark.executor.memoryOverheadFactor",
    "value":0.1,
    "isDynamic": true,
    "paramType":"Float",
    "range":["0.05","0.1","0.15","0.2","0.25"]
  },
  {
    "key": "spark.default.parallelism",
    "value":8,
    "isDynamic": true,
    "paramType":"Int",
    "range":["4","8", "16", "24","32"]
  },
             ...
  ]
}                
Example of a Spark configuration parameters

The parameter spark.executor.memory is a dynamic, tunable integer variable with 2, 3, 4, 5, & 6g valid values.
The parameter spark.io.compression.snappy.blockSize is not tunable so the value 32k is immutable.

Fitness function

The fitness function is a balance between service delivery quality and the hourly deployment cost. 
Our assumptions are as follows:
  • The deployment cost per hour rises linearly with the number of servers or containers used, denoted by N.
  • The adverse effect of latency on customer satisfaction and retention is estimated to grow exponentially with increasing latency L.
The fitness is consequently defined as... \[ fitness=\frac{1}{e^{\alpha L} + \beta N}\]

Genetic algorithm basics

Inspired by Charles Darwin's theory of natural evolution, a genetic algorithm is a search method that reflects the principles of natural selection. In this approach, the most suitable individuals are selected for reproduction to produce the next generation.
Scala's object-oriented and functional programming capabilities can be utilized to execute the computational process of an evolutionary algorithm.

Developed by John Holland in the 1970s [ref 2], Genetic Algorithms draw their characteristics from Darwin's evolution theory. A living organism is composed of cells, which contain identical chromosomes. These chromosomes, made up of DNA strings, act as a blueprint for the entire organism. A chromosome is made up of genes, which are DNA segments encoding specific proteins.
The first step in reproduction is recombination (or crossover), where genes from the parents form a completely new chromosome (offspring), which may then undergo mutation. During mutation, one or more elements, also known as individuals of the DNA strand or chromosome, are altered. These changes are primarily due to errors in copying genes from the parents. The organism's success in its environment determines its fitness.

In the field of computer science, Genetic Algorithms represent a problem-solving technique that emulates natural processes. They employ a combination of selection, recombination, and mutation to evolve a group of candidates for solving a particular problem. The fundamental computational steps are as follows:

  1. Initialize the population (search space) with a set of random chromosomes, each representing a specific Apache Spark configuration.
  2. Convert these chromosomes into a vector of real or integer values, or a string of bits.
  3. Pair chromosomes for crossover. This involves using a crossover rate to exchange a fragment or section of the "parent" chromosomes from a random point in the encoded string or vector.
  4. Mutate chromosomes by altering one or more of their elements (bits) using a randomly generated index, governed by a mutation rate.
  5. Evaluate each chromosome using a fitness function.
  6. Choose the most fit chromosomes (those that meet the minimum fitness criteria) for reproduction.
  7. Repeat the reproduction cycle (steps 2 to 6) for the newly formed population until a stop condition is reached.
Each genetic operator (selection, crossover, and mutation) depends on a specific parameter:
  • The selection rate is a random threshold value used to reduce the current chromosome population based on their fitness.
  • The crossover rate determines the index at which elements or bits of two parent chromosomes are swapped.
  • The mutation rate calculates the index of the element(s) or bit(s) in a chromosome that undergo mutation (or flipping).

Implementation

The initial phase involves converting the configuration parameters into genetic components. Every parameter is represented as a gene, corresponding to the type of that parameter. For example, a parameter with an integer attribute is linked with an instance of Gene[Int].

Illustration of association configuration parameters and genetic components


A configuration represents a distinctive, tunable collection of Spark parameters, which the genetic algorithm treats and manages as a chromosome. The goal of the genetic algorithm is to identify the most suitable chromosome and its corresponding Spark configuration for the application, thereby optimizing the objective (or fitness).

Encoding

The first step is to implement the configuration and parameters as respective classes, SparkConfiguration and ParameterDefinition.

case class SparkConfiguration(sparkParameters: Seq[ParameterDefinition])

case class ParameterDefinition(
  key: String,
  value: String,
  isDynamic: Boolean,
  paramType: String,
  range: Seq[String] = Seq.empty[String]
)

Next, we establish the genetic encoder responsible for transforming genes to and from configuration parameters [ref 4]. The GAEncoder trait encompasses two characteristics:
  • encodingLength: The number of bits required to represent a parameter's value.
  • range: A sequence of valid, adjustable values applicable to this parameter.
The sequence of bits, termed BitsRepr, is defined as a Seq[Int] consisting of either 0 or 1 values.

There are three primary methods:
  • rand: This method initializes a parameter value randomly.
  • apply: This function encodes a parameter's value into a sequence of bits.
  • unapply: This procedure decodes a bit sequence back into a parameter value.
traitGAEncoder[T]{
  val encodingLength: Int
  val range: Seq[T]

  def rand: T
  def apply(t: T): BitsRepr
  def unapply(bitsRepr: BitsRepr): T
}

Here's a genetic encoder for an Apache Spark configuration parameter of the Int type. The encoding function, 'apply', checks whether the parameter value, 't', falls within the valid range.

final class GAEncoderInt(
  override val encodingLength: Int,
  override val range: Seq[Int]) extends GAEncoder[Int]{

  private[this] val encoder = new BitsIntEncoder(encodingLength)

  override def rand: Int = Random.shuffle(range).head

  @throws(classOf[GAException])
  override def apply(t: Int): BitsRepr = {
     if(!range.contains(t))
       throw new GAException(s"Value $t violates constraint of quantizer")
     encoder(t)
  }

  override def unapply(bitsRepr: BitsRepr): Int = encoder.unapply(bitsRepr)
}

Gene

A Gene serves as the genetic representation of a configuration parameter. Consequently, its constructor requires the following:
  • The name of the parameter, referred to as 'id'.
  • The value of the parameter denoted as 't'.
  • An encoder, gaEncoder that corresponds to the parameter's type.
To minimize its memory usage and facilitate direct bit manipulation, the sequence of bits, 'bitsSequence', is transformed into a Java BitSet.

class Gene[T : Ordering] (id: String, t: T, gaEncoder: GAEncoder[T]) {
  // Encoding as a  sequence of {0, 1}
  private[this] val bitsSequence: BitsRepr = gaEncoder(t)

  // Encoding as Bit set
  private[this] val encoded: util.BitSet = {
    val bs =  new java.util.BitSet(gaEncoder.encodingLength)
    bitsSequence.indices.foreach(index => bs.set(index, bitsSequence(index) == 1))
    bs
  }


  def mutate(mutationProb: Double): Gene[T] = {
    (new MutationOp{
      override val mutationProbThreshold: Double = mutationProb
    }).mutate(this)
  }

The method mutate invokes the mutation operator, MutationOp, described in the next section.

Chromosome

A chromosome symbolizes a Spark configuration. Assuming the configuration parameters are of two types (namely Float and Int), the constructor accepts two parameters:
  • features1: This represents the features/genes of one type.
  • features2: This encompasses the features/genes of the other type.
Additionally, the attribute 'fitness' accumulates the score for the specified set of configuration parameters.

class Chromosome[T : Ordering, U : Ordering](
  features1: Seq[Gene[T]],
  features2: Seq[Gene[U]]){

  var fitness: Double = -1.0


  def xOver(
    otherChromosome: Chromosome[T, U],
    xOverThreshold: Double
  ): (Chromosome[T, U], Chromosome[T, U]) = 
     (new XOverOp{
        override val xOverProbThreshold: Double = xOverThreshold
     }).xOver(this, otherChromosome)


  
  def mutate(mutationProb: Double): Chromosome[T, U] = {
    (new MutationOp{
      override val mutationProbThreshold: Double = mutationProb
    }).xOver(this)
  }
}

The method 'xOver' executes the genetic crossover as outlined in the 'XOverOp' trait.
A crucial part of encoding a chromosome involves transforming a Spark configuration into a typed
Chromosome, featuring integer and floating point parameters/genes.

The process of encoding a Spark configuration is carried out by the '
encode' method. This involves purifying the parameter values from any units (denoted as 'cleansedParamValue'). The type of the configuration parameter, referred to as 'paramType', is utilized to create the encoder and gene of the suitable type.

def encode(sparkConfig: SparkConfiguration): Chromosome[Int, Float] = {
   val floatGenes = ListBuffer[Gene[Float]]()
   val intGenes = ListBuffer[Gene[Int]]()

   sparkConfig.sparkParameters.foreach(paramValue => {
      val value = paramValue.value
      val cleansedParamValue: String =
         if (!value.last.isDigit) value.substring(0, value.length - 1)
         else value

      paramValue.paramType match {
         case "Int" =>
            val encoder = new GAEncoderInt(encodingLength = 6, paramValue.range.map(_.toInt))
            val intGene = Gene[Int](paramValue.key, cleansedParamValue.toInt, encoder)
            intGenes.append(intGene)
            ....

The corresponding method decode is omitted in this post but is available on GitHub ConfigEncoderDecoder.scala

Mutation

The mutation process of a chromosome occurs in two stages:
  1. The genetic algorithm chooses a gene for mutation if the mutation probability falls below a specified threshold.
  2. A bit within the bit sequence is randomly selected and flipped based on a certain probability.
Illustration of mutation of a chromosome


It's important to note that mutating a bit sequence might not always result in a decoded value that is within the valid range set by the configuration parameter.

The 'MutationOp' trait encapsulates the implementation of the two-step mutation process. The first 'mutate' method applies the second mutation step to the chosen gene. Meanwhile, the second 'mutate' method carries out the first step of mutation on the chromosome.

trait MutationOp {
self =>
  protected[this] val mutationProbThreshold: Double
  private[this] val rand = new Random(42L)

   // Mutation of a gene - Step 2
  def mutate[T: Ordering](gene: Gene[T]): Gene[T] = 
     if(rand.nextDouble < mutationProbThreshold) {
         val newValue = createValidMutation(gene, implicitly[Ordering[T]])
         Gene[T](gene.getId, newValue, gene.getEncoder)
      }
      else
         gene

   
  def mutate[T: Ordering, U: Ordering](chromosomes: Seq[Chromosome[T,U]]): Seq[Chromosome[T,U]] = 
    chromosomes.map(mutate(_))      


  // Mutation of a chromosome - Step 1
  def mutate[T : Ordering, U: Ordering](chromosome: Chromosome[T, U]): Chromosome[TU]


The mutation operation initiates when a randomly generated value between [0, 1] remains below a specified low mutation rate, termed 'mutationProbThreshold'.

Afterward, the mutated bit sequence is decoded into a value of type T. This decoded value must fall within the range of valid values predetermined for this configuration parameter, a process known as 'createValidMutation'.

Regarding the mutation operator for a chromosome, it randomly selects one of the genes for mutation and then activates the mutation process with the chosen gene.

def mutate[T : Ordering, U: Ordering](chromosome: Chromosome[T, U]): Chromosome[T, U] =
  
  if(rand.nextDouble < mutationProbThreshold) {
    val features1 = chromosome.getFeatures1
    val features2 = chromosome.getFeatures2
    val chromosomeLength: Int = features1.length + features2.length

      // Select the index of the gene to be mutated, randomly
    val geneIndex = (chromosomeLength*Random.nextDouble).toInt

      // If the index of the gene to mutate is within the first set of features or
      // if there is only one set of features of same type..
    if(geneIndex < features1.length || features2.isEmpty) {
      val geneToMutate = features1(geneIndex)
      val mutatedGene: Gene[T] = apply(geneToMutate)

      features1.updated(geneIndex, mutatedGene)
    }

    // Otherwise the index of the gene to mutate is within the second set of features
else { val relativeIndex = geneIndex - features1.length val geneToMutate: Gene[U] = features2(relativeIndex) val mutatedGene: Gene[U] = apply(geneToMutate)

      features2.updated(relativeIndex, mutatedGene)
    }
    Chromosome[T, U](features1, features2)
 }
 else
   chromosome

Note: For the decoding of a bit sequence into an actual value, it's necessary to establish an implicit ordering for the types T and U associated with the parameters/genes.

Crossover

The crossover process involves dividing two 'parent' chromosomes and then merging their upper and lower segments to create two new 'offspring' chromosomes.

Illustration of cross-over 

The crossover operator is exclusively applied to chromosomes.
Exploring the implementation of crossover between two chromosomes, we find the method 'xOver', which takes the two original chromosomes as its arguments.

def xover[T : Ordering, U : Ordering](
  chromosome1: Chromosome[T, U],
  chromosome2: Chromosome[T, U]
): (Chromosome[T, U], Chromosome[T, U]) = {

  if(rand.nextDouble < xOverProbThreshold) {
    val xOverIndex = (chromosome1.size()*Random.nextDouble).toInt
    val features1Len = chromosome1.getFeatures1.length

      // The cross-over cut-off is done within the first set of genes, preserving
      // the second set of genes ..
    if(xOverIndex < features1Len)
      xOverFirstFeatures(chromosome1, chromosome2)
        // Otherwise the cross-over is performed within the second set of genes
    else
       xOverSecondFeatures(chromosome1, chromosome2)
  }
  else
    (chromosome1, chromosome2)
}


Several approaches exist for choosing candidate chromosomes for crossover [ref 5]. Three frequently used strategies include:
  • midPoint: This method involves dividing the population of ranked chromosomes into two groups and then combining these groups.
  • pairing: This strategy selects chromosome pairs that are contiguous in terms of their ranking.
  • random: This approach randomly selects two candidate chromosomes.


The crossover strategy, xOverStrategy is applied to a population of chromosomes ranked in decreasing value of their fitness as illustrated in the code snipped below.

def xover[T : Ordering, U : Ordering](
chromosomes: Seq[Chromosome[T, U]],
xOverStrategy: String
): Seq[Chromosome[T, U]] = xOverStrategy match {
 
case "midPoint" =>
val midPoint = chromosomes.length >> 1
val (topChromosomes, botChromosomes) = chromosomes.splitAt(midPoint)
val (offSprings1, offSpring2) = (0 until midPoint).map(
index => xover(topChromosomes(index), botChromosomes(index))
).unzip
offSprings1 ++ offSpring2
case "pairing" =>
val (offSprings1, offSpring2) = (chromosomes.indices by 2).map(
index => xover(chromosomes(index), chromosomes(index+1)) 
).unzip
offSprings1 ++ offSpring2
.....
}

Scoring

The 'ScoreOp' scoring operators calculate or update the fitness of a chromosome through the following steps:
  • Decoding the chromosome into a valid Spark configuration.
  • Initiating a SparkSubmit execution using the updated configuration.
  • Determining the fitness of the chromosome.
trait ScoreOp{
self =>
  protected[this] val execSparkSubmit: SparkConfiguration => (Int, Long)
  protected[this] val latencyFactor: Float
  protected[this] val serverHourlyCost: Float

  def score(population: Seq[Chromosome[Int, Float]]): Seq[Chromosome[Int, Float]] =
    population.map(score(_))

  private def score(chromosome: Chromosome[Int, Float]): Chromosome[Int, Float] = {
    val sparkConfig = ConfigEncoderDecoder.decode(chromosome)    // Step 1
    val (numServers, latency) = execSparkSubmit(sparkConfig)           // Step 2

    chromosome.fitness = 1.0/(math.exp(latencyFactor*latency) +        // Step 3
                                               serverHourlyCost*numServers)
    chromosome
  }

The scoring operator allocates the execution of each configuration to various containers by simultaneously running Spark submit tasks concurrently.

Selection

The selection process is outlined in the 'SelectionOp' trait, which includes two methods:
  • 'rand' - This method is used to initialize a population of chromosomes, up to a size of 'maxPopulationSize', with random values.
  • 'select' - This method ranks chromosomes according to their fitness, in decreasing order.
trait SelectionOp{
self =>
  protected[this] val maxPopulationSize: Int

  def rand[T : Ordering, U : Ordering](
    idsT: Seq[String],
    gaEncoder1: GAEncoder[T],
    idsU: Seq[String],
    gaEncoder2: GAEncoder[U]
  ): Seq[Chromosome[T, U]] =
    Seq.fill(maxPopulationSize)(Chromosome.rand(idsT, gaEncoder1, idsU, gaEncoder2))

  
 def select[T : Ordering, U : Ordering](chromosomes: Seq[Chromosome[T, U]]): Seq[Chromosome[T, U]] =
    chromosomes.sortWith(_.fitness > _.fitness).take(maxPopulationSize)


Mating cycle

A replication cycle involves the processes of crossing-over, mutation, scoring, and ultimately selecting chromosomes based on their fitness values. This cycle is encapsulated in the 'Reproduction' class, which has the following attributes:
  • execSparkSubmit: A function for executing Spark Submit.
  • latencyFactor: A factor used in calculating the fitness of chromosomes, based on latency.
  • serverHourlyCost: The hourly cost of an average server or container, factored into the fitness calculation of chromosomes.
  • maxPopulationSize: The upper limit on the number of chromosomes throughout the mating cycle.
  • xOverProbThreshold: A threshold value determining the likelihood of initiating a crossover.
  • mutationProbThreshold: A threshold value for the probability of triggering a mutation.
  • xOverStrategy: The strategy employed for crossover.
  • stopCondition: A function or condition that signals when to end the execution of the genetic algorithm.
class Reproduction protected (
  val execSparkSubmit: SparkConfiguration => (Int, Long),
  override val latencyFactor: Float,
  override val serverHourlyCost: Float,
  override val maxPopulationSize: Int,
  override val xOverProbThreshold: Double,
  override val mutationProbThreshold: Double,
  xOverStrategy: String,
  stopCondition: Seq[Chromosome[Int, Float]] => Boolean
) extends ScoreOp with SelectionOp with XOverOp with MutationOp {
  
  def mate(
    idsInt: Seq[String],
    gaEncoderInt: Seq[GAEncoder[Int]],
    idsFloat: Seq[String],
    gaEncoderFloat: Seq[GAEncoder[Float]]): Seq[Chromosome[Int, Float]] = {

    // Initialization of chromosomes
    val initialChromosomes = Seq.fill(maxPopulationSize)(
      Chromosome.rand(idsInt, gaEncoderInt, idsFloat, gaEncoderFloat)
    )
    // Recursive reproduction cycle
    mate(initialChromosomes, iterationCount = 0)
  }


  @tailrec 
  private def mate(
    chromosomes: Seq[Chromosome[Int, Float]], 
    iterationCount: Int): Seq[Chromosome[Int, Float]] = {

    val offSprings = xOver(chromosomes, xOverStrategy)
    val mutatedChromosomes = mutate(chromosomes ++ offSprings)
    val scoredChromosomes = score(mutatedChromosomes)
    val selectedChromosomes = select(scoredChromosomes)

    // If condition met, exit
    if (iterationCount > 16 || stopCondition(selectedChromosomes)) 
         selectedChromosomes
     // Otherwise recurse 
    else 
        mate(selectedChromosomes, iterationCount + 1)
  }

The 'mate' method executes the reproduction cycle, as outlined in the section detailing the basics of Genetic Algorithms. It requires the following inputs:
  • idsInt: A list of identifiers for configuration parameters that are of type Int.
  • gaEncoderInt: An encoder tailored for each configuration parameter of type Int.
  • idsFloat: A series of identifiers for configuration parameters that are of type Float.
  • gaEncoderFloat: An encoder specific to each of the configuration parameters of type Float.
This method is executed as tail recursion for enhanced efficiency, with a safety mechanism in the form of a stop condition based on the iteration count, set to 16. The flexible stop condition concludes the search by assessing the most recent group of chromosomes.

Examples of possible stop conditions include:
The scenario where the fittest chromosome (representing a Spark Configuration) exhibits a fitness level that is at least 20% higher than that of the next best chromosome.

val stopDiffCondition = (chromosomes: Seq[Chromosome[Int, Float]]) => 
  if(chromosomes.size == 1) true
  else 
    chromosomes.head.fitness/chromosomes(1).fitness >= 1.20

A second scenario where fitness for the best chromosome is at least 50% better that the average fitness of all chromosomes.

val stopAveCondition = (chromosomes: Seq[Chromosome[Int, Float]]) =>
  if (chromosomes.size ==1) true
  else {
    val chromosomesAveFitness = chromosomes.map(_.fitness).sum/chromosomes.size
    chromosomes.head.fitness / chromosomesAveFitness > 2.0
  }

Evaluation

The reproduction cycle applied to large set of parameters is very computationallyl intensive. Therefore, we restricted our test to 5 configuration parameters:
Integer parameters
    spark.default.parallelism
    spark.executor.memory
    spark.driver.memory
Float parameters
    spark.executor.memoryOverheadFactor 
    spark.executor.driverOverheadFactor

All other Spark configuration parameters are fixed.

We consider the ratio of fitness of the best chromosome over the average fitness of all chromosomes > 1.5 as stopping condition.
The fitness function is defined as  \[ fitness=\frac{1}{e^{0.001L} + 0.47N}\] where L is the latency in milliseconds and N the number of AWS EC2 m5d.2xlarge on-demand instances.
The test uses the Spark deployment for the storm tracking application described in  Tracking storms with Kafka/Spark streaming [ref 6]
The maximum number of iterations is set to 24.

Table of output of genetic algorithm to select two best Spark configuration

To ensure a viable and robust deployment of this Spark application in a production environment, a more extensive evaluation involving a greater number of parameters is necessary.

Alternative methods

Alternative methods can be inspired by the techniques used in tuning hyper-parameters of ML models [ref 7].

Gradient search: This technique estimates the derivative or gradient of the fitness function over the configuration parameters pj \[p_{i}^{t+1} = p_{i}^{t} + \eta \frac{\partial fitness}{\partial p_{j}}\]

Reinforcement learning: In this approach, the fitness function is converted into a reward applied to any action on the configuration parameters. The values of the configuration parameters define the state of the reinforcement learning model.  

Bayesian optimization: The Bayesian approach considers the optimization of the Spark configuration as a random entity and assigning a prior distribution to it. This prior reflects our assumptions about the function's behavior. Once we collect evaluations of the function, treated as data, this information is used to update the prior, resulting in the posterior distribution of the objective function. Subsequently, this posterior distribution is employed to create an acquisition function which is instrumental in determining the next point of query.

References



Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3