Sunday, November 26, 2023

Optimize Apache Spark with Genetic Algorithm

Target audience: Intermediate
Estimated reading time: 5'

Struggling with the complex task of configuring your Apache Spark application? 

Consider the innovative approach of a genetic algorithm, a search heuristic inspired by Charles Darwin's theory of evolution [ref 1]. This method applies the concepts of natural selection to efficiently determine the best configuration for your Apache Spark application. It aims to achieve an ideal balance between minimizing production costs and maximizing customer satisfaction.


Table of contents
      Encoding
      Gene
      Chromosome
      Mutation
      Crossover
      Scoring
      Selection
      Mating cycle

What you will learn: How to apply a genetic algorithm to optimize the configuration parameters of an Apache Spark application for deployment to production.

Notes:
  • Source code available at GitHub - Patrick Nicolas - Genetic Algorithm github.com/patnicolas/streaming/scala/org/pipeline/ga
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
  • Environments: Apache Spark 3.3.1, Scala 2.13.4

Apache Spark configuration

The goal is to identify the best settings for Apache Spark [ref 2] configuration parameters that minimize deployment expenses while ensuring high-quality service, as indicated by reduced latency. This goal will be converted into a fitness function during the genetic algorithm's development.

Tunable parameters

We define an Apache Spark configuration parameter with the following attributes:
  • key: Spark parameter identifier
  • value: Default or initial value for the parameter
  • isDynamic: Is this parameter tunable?
  • paramType: Type of the parameter (Float, Int, String, or Boolean)
  • range: Range of valid values (any values if undefined)
The subsequent portion of the Apache Spark configuration file demonstrates the characteristics of different configuration parameters. This includes parameters that are not tunable (non-dynamic).

{
"sparkParameters": [
  {
    "key": "spark.io.compression.snappy.blockSize",
    "value": "32k",
    "isDynamic": false,
    "paramType": "String"
  },
  {
    "key": "spark.KryoSerializer.unsafe",
    "value":true,
    "isDynamic": true,
    "paramType":"Boolean"
  },
  {
    "key": "spark.KryoSerializer.buffer.max",
    "value":"64m",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "96", "128"]
  },
  {
    "key": "spark.KryoSerializer.buffer",
    "value":"64m",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "96"]
  },
  {
    "key": "spark.worker.cleanup.interval"
    "value":900,
    "isDynamic": false,
    "paramType":"Int",
    "range":["300","600", "900", "1200"]
  },

            ...

  {
    "key": "spark.shuffle.file.buffer",
    "value":"32k",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "128","256"]
  },
  {
    "key": "spark.driver.memory",
    "value":"6g",
    "isDynamic": true,
    "paramType":"Int",
    "range":["2","4", "6", "8","10"]
  },
  {
    "key": "spark.executor.driverOverheadFactor",
    "value":0.1,
    "isDynamic": true,
    "paramType":"Float",
    "range":["0.05","0.1", "0.15", "0.2","0.25"]
  },
  {
    "key": "spark.executor.memory",
    "value":"4g",
    "isDynamic": true,
    "paramType":"Int",
    "range":["2","3","4","5","6"]
  },
  {
    "key": "spark.executor.memoryOverheadFactor",
    "value":0.1,
    "isDynamic": true,
    "paramType":"Float",
    "range":["0.05","0.1","0.15","0.2","0.25"]
  },
  {
    "key": "spark.default.parallelism",
    "value":8,
    "isDynamic": true,
    "paramType":"Int",
    "range":["4","8", "16", "24","32"]
  },
             ...
  ]
}                
Example of Spark configuration parameters

The parameter spark.executor.memory is a dynamic, tunable integer variable whose valid values are 2, 3, 4, 5, and 6 (GB).
The parameter spark.io.compression.snappy.blockSize is not tunable, so its value 32k is immutable.

Fitness function

The fitness function is a balance between service delivery quality and the hourly deployment cost. 
Our assumptions are as follows:
  • The deployment cost per hour rises linearly with the number of servers or containers used, denoted by N.
  • The adverse effect of latency on customer satisfaction and retention is estimated to grow exponentially with increasing latency L.
The fitness is consequently defined as \[ fitness=\frac{1}{e^{\alpha L} + \beta N}\] where \(\alpha\) weights the latency penalty and \(\beta\) the hourly cost per server.
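A direct Scala transcription of this fitness function, as a sketch; the weights alpha and beta are assumptions to be calibrated for a given deployment:

def fitness(latency: Double, numServers: Int, alpha: Double, beta: Double): Double =
  // alpha scales the exponential latency penalty, beta the linear hourly
  // cost per server; both weights are deployment-specific assumptions.
  1.0 / (math.exp(alpha * latency) + beta * numServers)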

Genetic algorithm basics

Inspired by Charles Darwin's theory of natural evolution, a genetic algorithm is a search method that reflects the principles of natural selection. In this approach, the most suitable individuals are selected for reproduction to produce the next generation.
Scala's object-oriented and functional programming capabilities can be utilized to execute the computational process of an evolutionary algorithm.

Developed by John Holland in the 1970s [ref 3], Genetic Algorithms draw their characteristics from Darwin's theory of evolution. A living organism consists of cells, which contain identical chromosomes. These chromosomes, made up of DNA strings, act as a blueprint for the entire organism. A chromosome is made up of genes, which are DNA segments encoding specific proteins.
The first step in reproduction is recombination (or crossover), where genes from the parents form a completely new chromosome (offspring), which may then undergo mutation. During mutation, one or more elements of the DNA strand or chromosome are altered, primarily due to errors in copying genes from the parents. The organism's success in its environment determines its fitness.

In the field of computer science, Genetic Algorithms represent a problem-solving technique that emulates natural processes. They employ a combination of selection, recombination, and mutation to evolve a group of candidates for solving a particular problem. The fundamental computational steps are as follows:

  1. Initialize the population (search space) with a set of random chromosomes, each representing a specific Apache Spark configuration.
  2. Convert these chromosomes into a vector of real or integer values, or a string of bits.
  3. Pair chromosomes for crossover. This involves using a crossover rate to exchange a fragment or section of the "parent" chromosomes from a random point in the encoded string or vector.
  4. Mutate chromosomes by altering one or more of their elements (bits) using a randomly generated index, governed by a mutation rate.
  5. Evaluate each chromosome using a fitness function.
  6. Choose the most fit chromosomes (those that meet the minimum fitness criteria) for reproduction.
  7. Repeat the reproduction cycle (steps 2 to 6) for the newly formed population until a stop condition is reached.
Each genetic operator (selection, crossover, and mutation) depends on a specific parameter:
  • The selection rate is a random threshold value used to reduce the current chromosome population based on their fitness.
  • The crossover rate determines the index at which elements or bits of two parent chromosomes are swapped.
  • The mutation rate calculates the index of the element(s) or bit(s) in a chromosome that undergo mutation (or flipping).

Implementation

The initial phase involves converting the configuration parameters into genetic components. Every parameter is represented as a gene, corresponding to the type of that parameter. For example, a parameter with an integer attribute is linked with an instance of Gene[Int].

Illustration of the association between configuration parameters and genetic components


A configuration represents a distinctive, tunable collection of Spark parameters, which the genetic algorithm treats and manages as a chromosome. The goal of the genetic algorithm is to identify the most suitable chromosome and its corresponding Spark configuration for the application, thereby optimizing the objective (or fitness).

Encoding

The first step is to implement the configuration and parameters as respective classes, SparkConfiguration and ParameterDefinition.

case class SparkConfiguration(sparkParameters: Seq[ParameterDefinition])

case class ParameterDefinition(
  key: String,
  value: String,
  isDynamic: Boolean,
  paramType: String,
  range: Seq[String] = Seq.empty[String]
)

Next, we establish the genetic encoder responsible for transforming genes to and from configuration parameters [ref 4]. The GAEncoder trait encompasses two characteristics:
  • encodingLength: The number of bits required to represent a parameter's value.
  • range: A sequence of valid, adjustable values applicable to this parameter.
The sequence of bits, termed BitsRepr, is defined as a Seq[Int] consisting of either 0 or 1 values.
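In Scala, this alias is a one-liner:

type BitsRepr = Seq[Int]   // sequence of 0/1 values encoding a parameter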

There are three primary methods:
  • rand: This method initializes a parameter value randomly.
  • apply: This function encodes a parameter's value into a sequence of bits.
  • unapply: This procedure decodes a bit sequence back into a parameter value.
trait GAEncoder[T] {
  val encodingLength: Int
  val range: Seq[T]

  def rand: T
  def apply(t: T): BitsRepr
  def unapply(bitsRepr: BitsRepr): T
}

Here's a genetic encoder for an Apache Spark configuration parameter of the Int type. The encoding function, 'apply', checks whether the parameter value, 't', falls within the valid range.

final class GAEncoderInt(
  override val encodingLength: Int,
  override val range: Seq[Int]) extends GAEncoder[Int]{

  private[this] val encoder = new BitsIntEncoder(encodingLength)

  override def rand: Int = Random.shuffle(range).head

  @throws(classOf[GAException])
  override def apply(t: Int): BitsRepr = {
     if(!range.contains(t))
       throw new GAException(s"Value $t violates constraint of quantizer")
     encoder(t)
  }

  override def unapply(bitsRepr: BitsRepr): Int = encoder.unapply(bitsRepr)
}
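The BitsIntEncoder used above is not listed in this post; a minimal sketch, assuming a little-endian binary encoding over encodingLength bits, could look like:

// Hypothetical sketch of the bits <-> Int codec used by GAEncoderInt
class BitsIntEncoder(encodingLength: Int) {
  // Encode an integer into 'encodingLength' bits, least significant bit first
  def apply(n: Int): BitsRepr = (0 until encodingLength).map(i => (n >> i) & 0x01)

  // Decode a bit sequence back into the integer it represents
  def unapply(bits: BitsRepr): Int =
    bits.zipWithIndex.foldLeft(0) { case (acc, (bit, index)) => acc | (bit << index) }
}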

Gene

A Gene serves as the genetic representation of a configuration parameter. Consequently, its constructor requires the following:
  • The name of the parameter, referred to as 'id'.
  • The value of the parameter denoted as 't'.
  • An encoder, gaEncoder that corresponds to the parameter's type.
To minimize its memory usage and facilitate direct bit manipulation, the sequence of bits, 'bitsSequence', is transformed into a Java BitSet.

class Gene[T : Ordering] (id: String, t: T, gaEncoder: GAEncoder[T]) {
  // Encoding as a  sequence of {0, 1}
  private[this] val bitsSequence: BitsRepr = gaEncoder(t)

  // Encoding as Bit set
  private[this] val encoded: util.BitSet = {
    val bs =  new java.util.BitSet(gaEncoder.encodingLength)
    bitsSequence.indices.foreach(index => bs.set(index, bitsSequence(index) == 1))
    bs
  }


  def mutate(mutationProb: Double): Gene[T] = {
    (new MutationOp{
      override val mutationProbThreshold: Double = mutationProb
    }).mutate(this)
  }
}

The method mutate invokes the mutation operator, MutationOp, described in the next section.
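As a usage sketch, a gene for the spark.executor.memory parameter of the configuration file above could be instantiated as follows (the encoding length and range are illustrative):

// Hypothetical gene for 'spark.executor.memory' with current value 4 (GB)
val memoryEncoder = new GAEncoderInt(encodingLength = 6, range = Seq(2, 3, 4, 5, 6))
val memoryGene = new Gene[Int]("spark.executor.memory", 4, memoryEncoder)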

Chromosome

A chromosome symbolizes a Spark configuration. Assuming the configuration parameters are of two types (namely Float and Int), the constructor accepts two parameters:
  • features1: This represents the features/genes of one type.
  • features2: This encompasses the features/genes of the other type.
Additionally, the attribute 'fitness' accumulates the score for the specified set of configuration parameters.

class Chromosome[T : Ordering, U : Ordering](
  features1: Seq[Gene[T]],
  features2: Seq[Gene[U]]){

  var fitness: Double = -1.0


  def xOver(
    otherChromosome: Chromosome[T, U],
    xOverThreshold: Double
  ): (Chromosome[T, U], Chromosome[T, U]) = 
     (new XOverOp{
        override val xOverProbThreshold: Double = xOverThreshold
     }).xOver(this, otherChromosome)


  
  def mutate(mutationProb: Double): Chromosome[T, U] = {
    (new MutationOp{
      override val mutationProbThreshold: Double = mutationProb
    }).mutate(this)
  }
}

The method 'xOver' executes the genetic crossover as outlined in the 'XOverOp' trait.
A crucial part of encoding a chromosome involves transforming a Spark configuration into a typed Chromosome, featuring integer and floating-point parameters/genes.

The process of encoding a Spark configuration is carried out by the 'encode' method. This involves stripping the parameter values of any units (denoted as 'cleansedParamValue'). The type of the configuration parameter, referred to as 'paramType', is used to create the encoder and gene of the appropriate type.

def encode(sparkConfig: SparkConfiguration): Chromosome[Int, Float] = {
   val floatGenes = ListBuffer[Gene[Float]]()
   val intGenes = ListBuffer[Gene[Int]]()

   sparkConfig.sparkParameters.foreach(paramValue => {
      val value = paramValue.value
      val cleansedParamValue: String =
         if (!value.last.isDigit) value.substring(0, value.length - 1)
         else value

      paramValue.paramType match {
         case "Int" =>
            val encoder = new GAEncoderInt(encodingLength = 6, paramValue.range.map(_.toInt))
            val intGene = Gene[Int](paramValue.key, cleansedParamValue.toInt, encoder)
            intGenes.append(intGene)
            ....

The corresponding method 'decode' is omitted in this post but is available on GitHub: ConfigEncoderDecoder.scala

Mutation

The mutation process of a chromosome occurs in two stages:
  1. The genetic algorithm chooses a gene for mutation if the mutation probability falls below a specified threshold.
  2. A bit within the bit sequence is randomly selected and flipped based on a certain probability.
Illustration of mutation of a chromosome


It's important to note that mutating a bit sequence might not always result in a decoded value that is within the valid range set by the configuration parameter.

The 'MutationOp' trait encapsulates the implementation of the two-step mutation process. The first 'mutate' method applies the second mutation step to the chosen gene. Meanwhile, the second 'mutate' method carries out the first step of mutation on the chromosome.

trait MutationOp {
self =>
  protected[this] val mutationProbThreshold: Double
  private[this] val rand = new Random(42L)

   // Mutation of a gene - Step 2
  def mutate[T: Ordering](gene: Gene[T]): Gene[T] = 
     if(rand.nextDouble < mutationProbThreshold) {
         val newValue = createValidMutation(gene, implicitly[Ordering[T]])
         Gene[T](gene.getId, newValue, gene.getEncoder)
      }
      else
         gene

   
  def mutate[T: Ordering, U: Ordering](chromosomes: Seq[Chromosome[T,U]]): Seq[Chromosome[T,U]] = 
    chromosomes.map(mutate(_))      


  // Mutation of a chromosome - Step 1
  def mutate[T : Ordering, U: Ordering](chromosome: Chromosome[T, U]): Chromosome[T, U]
}

The mutation operation initiates when a randomly generated value between [0, 1] remains below a specified low mutation rate, termed 'mutationProbThreshold'.

Afterward, the mutated bit sequence is decoded into a value of type T. This decoded value must fall within the range of valid values predefined for this configuration parameter, a constraint enforced by the method 'createValidMutation'.

Regarding the mutation operator for a chromosome, it randomly selects one of the genes for mutation and then activates the mutation process with the chosen gene.

def mutate[T : Ordering, U: Ordering](chromosome: Chromosome[T, U]): Chromosome[T, U] =
  if(rand.nextDouble < mutationProbThreshold) {
    val features1 = chromosome.getFeatures1
    val features2 = chromosome.getFeatures2
    val chromosomeLength: Int = features1.length + features2.length

      // Select the index of the gene to be mutated, randomly
    val geneIndex = (chromosomeLength*Random.nextDouble).toInt

      // If the index of the gene to mutate falls within the first set of features, or
      // if there is only one set of features of the same type..
    if(geneIndex < features1.length || features2.isEmpty) {
      val geneToMutate = features1(geneIndex)
      val mutatedGene: Gene[T] = mutate(geneToMutate)
      Chromosome[T, U](features1.updated(geneIndex, mutatedGene), features2)
    }
      // Otherwise the index of the gene to mutate falls within the second set of features
    else {
      val relativeIndex = geneIndex - features1.length
      val geneToMutate: Gene[U] = features2(relativeIndex)
      val mutatedGene: Gene[U] = mutate(geneToMutate)
      Chromosome[T, U](features1, features2.updated(relativeIndex, mutatedGene))
    }
  }
  else
    chromosome

Note: For the decoding of a bit sequence into an actual value, it's necessary to establish an implicit ordering for the types T and U associated with the parameters/genes.

Crossover

The crossover process involves dividing two 'parent' chromosomes and then merging their upper and lower segments to create two new 'offspring' chromosomes.

Illustration of cross-over 

The crossover operator is exclusively applied to chromosomes.
Exploring the implementation of crossover between two chromosomes, we find the method 'xOver', which takes the two original chromosomes as its arguments.

def xover[T : Ordering, U : Ordering](
  chromosome1: Chromosome[T, U],
  chromosome2: Chromosome[T, U]
): (Chromosome[T, U], Chromosome[T, U]) = {

  if(rand.nextDouble < xOverProbThreshold) {
    val xOverIndex = (chromosome1.size()*Random.nextDouble).toInt
    val features1Len = chromosome1.getFeatures1.length

      // The cross-over cut-off is done within the first set of genes, preserving
      // the second set of genes ..
    if(xOverIndex < features1Len)
      xOverFirstFeatures(chromosome1, chromosome2)
        // Otherwise the cross-over is performed within the second set of genes
    else
       xOverSecondFeatures(chromosome1, chromosome2)
  }
  else
    (chromosome1, chromosome2)
}
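The helper methods xOverFirstFeatures and xOverSecondFeatures are not listed in this post. A sketch of the first one, under the assumption that the cut-off index is drawn within the first set of genes, might look like:

// Sketch: swap the tails of the first gene set between the two parents at a
// random cut-off point; the second gene set of each parent is preserved.
private def xOverFirstFeatures[T : Ordering, U : Ordering](
  chromosome1: Chromosome[T, U],
  chromosome2: Chromosome[T, U]): (Chromosome[T, U], Chromosome[T, U]) = {

  val cutOff = (chromosome1.getFeatures1.length * Random.nextDouble).toInt
  val (head1, tail1) = chromosome1.getFeatures1.splitAt(cutOff)
  val (head2, tail2) = chromosome2.getFeatures1.splitAt(cutOff)

  (Chromosome[T, U](head1 ++ tail2, chromosome1.getFeatures2),
    Chromosome[T, U](head2 ++ tail1, chromosome2.getFeatures2))
}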


Several approaches exist for choosing candidate chromosomes for crossover [ref 5]. Three frequently used strategies include:
  • midPoint: This method involves dividing the population of ranked chromosomes into two groups and then combining these groups.
  • pairing: This strategy selects chromosome pairs that are contiguous in terms of their ranking.
  • random: This approach randomly selects two candidate chromosomes.


The crossover strategy, xOverStrategy, is applied to a population of chromosomes ranked in decreasing order of fitness, as illustrated in the code snippet below.

def xover[T : Ordering, U : Ordering](
  chromosomes: Seq[Chromosome[T, U]],
  xOverStrategy: String
): Seq[Chromosome[T, U]] = xOverStrategy match {

  case "midPoint" =>
    val midPoint = chromosomes.length >> 1
    val (topChromosomes, botChromosomes) = chromosomes.splitAt(midPoint)
    val (offSprings1, offSprings2) = (0 until midPoint).map(
      index => xover(topChromosomes(index), botChromosomes(index))
    ).unzip
    offSprings1 ++ offSprings2

  case "pairing" =>
    val (offSprings1, offSprings2) = (chromosomes.indices by 2).map(
      index => xover(chromosomes(index), chromosomes(index+1))
    ).unzip
    offSprings1 ++ offSprings2

  .....
}
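The random strategy is elided in the snippet above; one plausible implementation, which shuffles the ranked population before pairing contiguous chromosomes, is sketched below:

case "random" =>
  // Shuffle the ranked chromosomes, then cross over contiguous pairs
  val shuffled = Random.shuffle(chromosomes)
  val (offSprings1, offSprings2) = (0 until shuffled.length - 1 by 2).map(
    index => xover(shuffled(index), shuffled(index + 1))
  ).unzip
  offSprings1 ++ offSprings2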

Scoring

The 'ScoreOp' scoring operator computes or updates the fitness of a chromosome through the following steps:
  • Decoding the chromosome into a valid Spark configuration.
  • Initiating a SparkSubmit execution using the updated configuration.
  • Determining the fitness of the chromosome.
trait ScoreOp{
self =>
  protected[this] val execSparkSubmit: SparkConfiguration => (Int, Long)
  protected[this] val latencyFactor: Float
  protected[this] val serverHourlyCost: Float

  def score(population: Seq[Chromosome[Int, Float]]): Seq[Chromosome[Int, Float]] =
    population.map(score(_))

  private def score(chromosome: Chromosome[Int, Float]): Chromosome[Int, Float] = {
    val sparkConfig = ConfigEncoderDecoder.decode(chromosome)    // Step 1
    val (numServers, latency) = execSparkSubmit(sparkConfig)     // Step 2

    chromosome.fitness = 1.0/(math.exp(latencyFactor*latency) +  // Step 3
                              serverHourlyCost*numServers)
    chromosome
  }
}

The scoring operator distributes the execution of the candidate configurations across multiple containers by running the Spark submit tasks concurrently.

Selection

The selection process is outlined in the 'SelectionOp' trait, which includes two methods:
  • 'rand' - This method is used to initialize a population of chromosomes, up to a size of 'maxPopulationSize', with random values.
  • 'select' - This method ranks chromosomes according to their fitness, in decreasing order.
trait SelectionOp{
self =>
  protected[this] val maxPopulationSize: Int

  def rand[T : Ordering, U : Ordering](
    idsT: Seq[String],
    gaEncoder1: GAEncoder[T],
    idsU: Seq[String],
    gaEncoder2: GAEncoder[U]
  ): Seq[Chromosome[T, U]] =
    Seq.fill(maxPopulationSize)(Chromosome.rand(idsT, gaEncoder1, idsU, gaEncoder2))

  
 def select[T : Ordering, U : Ordering](chromosomes: Seq[Chromosome[T, U]]): Seq[Chromosome[T, U]] =
    chromosomes.sortWith(_.fitness > _.fitness).take(maxPopulationSize)
}


Mating cycle

A replication cycle involves the processes of crossing-over, mutation, scoring, and ultimately selecting chromosomes based on their fitness values. This cycle is encapsulated in the 'Reproduction' class, which has the following attributes:
  • execSparkSubmit: A function for executing Spark Submit.
  • latencyFactor: A factor used in calculating the fitness of chromosomes, based on latency.
  • serverHourlyCost: The hourly cost of an average server or container, factored into the fitness calculation of chromosomes.
  • maxPopulationSize: The upper limit on the number of chromosomes throughout the mating cycle.
  • xOverProbThreshold: A threshold value determining the likelihood of initiating a crossover.
  • mutationProbThreshold: A threshold value for the probability of triggering a mutation.
  • xOverStrategy: The strategy employed for crossover.
  • stopCondition: A function or condition that signals when to end the execution of the genetic algorithm.
class Reproduction protected (
  val execSparkSubmit: SparkConfiguration => (Int, Long),
  override val latencyFactor: Float,
  override val serverHourlyCost: Float,
  override val maxPopulationSize: Int,
  override val xOverProbThreshold: Double,
  override val mutationProbThreshold: Double,
  xOverStrategy: String,
  stopCondition: Seq[Chromosome[Int, Float]] => Boolean
) extends ScoreOp with SelectionOp with XOverOp with MutationOp {
  
  def mate(
    idsInt: Seq[String],
    gaEncoderInt: Seq[GAEncoder[Int]],
    idsFloat: Seq[String],
    gaEncoderFloat: Seq[GAEncoder[Float]]): Seq[Chromosome[Int, Float]] = {

    // Initialization of chromosomes
    val initialChromosomes = Seq.fill(maxPopulationSize)(
      Chromosome.rand(idsInt, gaEncoderInt, idsFloat, gaEncoderFloat)
    )
    // Recursive reproduction cycle
    mate(initialChromosomes, iterationCount = 0)
  }


  @tailrec 
  private def mate(
    chromosomes: Seq[Chromosome[Int, Float]], 
    iterationCount: Int): Seq[Chromosome[Int, Float]] = {

    val offSprings = xOver(chromosomes, xOverStrategy)
    val mutatedChromosomes = mutate(chromosomes ++ offSprings)
    val scoredChromosomes = score(mutatedChromosomes)
    val selectedChromosomes = select(scoredChromosomes)

    // If condition met, exit
    if (iterationCount > 16 || stopCondition(selectedChromosomes)) 
         selectedChromosomes
     // Otherwise recurse 
    else 
        mate(selectedChromosomes, iterationCount + 1)
  }
}

The 'mate' method executes the reproduction cycle, as outlined in the section detailing the basics of Genetic Algorithms. It requires the following inputs:
  • idsInt: A list of identifiers for configuration parameters that are of type Int.
  • gaEncoderInt: An encoder tailored for each configuration parameter of type Int.
  • idsFloat: A series of identifiers for configuration parameters that are of type Float.
  • gaEncoderFloat: An encoder specific to each of the configuration parameters of type Float.
This method is executed as tail recursion for enhanced efficiency, with a safety mechanism in the form of a stop condition based on the iteration count, set to 16. The flexible stop condition concludes the search by assessing the most recent group of chromosomes.

Examples of possible stop conditions include:
A first scenario is when the fittest chromosome (representing a Spark configuration) exhibits a fitness at least 20% higher than that of the next best chromosome.

val stopDiffCondition = (chromosomes: Seq[Chromosome[Int, Float]]) => 
  if(chromosomes.size == 1) true
  else 
    chromosomes.head.fitness/chromosomes(1).fitness >= 1.20

A second scenario is when the fitness of the best chromosome is at least twice the average fitness of all chromosomes.

val stopAveCondition = (chromosomes: Seq[Chromosome[Int, Float]]) =>
  if (chromosomes.size ==1) true
  else {
    val chromosomesAveFitness = chromosomes.map(_.fitness).sum/chromosomes.size
    chromosomes.head.fitness / chromosomesAveFitness > 2.0
  }

Evaluation

The reproduction cycle applied to a large set of parameters is very computationally intensive. Therefore, we restricted our test to 5 configuration parameters:
Integer parameters:
  • spark.default.parallelism
  • spark.executor.memory
  • spark.driver.memory
Float parameters:
  • spark.executor.memoryOverheadFactor
  • spark.executor.driverOverheadFactor

All other Spark configuration parameters are fixed.

We use a ratio of the best chromosome's fitness to the average fitness of all chromosomes greater than 1.5 as the stopping condition.
The fitness function is defined as  \[ fitness=\frac{1}{e^{0.001L} + 0.47N}\] 
where L is the latency in milliseconds and N the number of AWS EC2 m5d.2xlarge on-demand instances.
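For illustration (hypothetical numbers), a configuration that yields a latency L = 2,000 ms on N = 4 instances scores \( fitness = 1/(e^{2} + 0.47 \times 4) \approx 0.108 \), while halving the latency to 1,000 ms on the same cluster raises the fitness to \( 1/(e^{1} + 1.88) \approx 0.22 \).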
The test uses the Spark deployment for the storm-tracking application described in Tracking storms with Kafka/Spark streaming [ref 6].
The maximum number of iterations is set to 24.

Output of the genetic algorithm selecting the two best Spark configurations

To ensure a viable and robust deployment of this Spark application in a production environment, a more extensive evaluation involving a greater number of parameters is necessary.

Alternative methods

Alternative methods can be inspired by the techniques used in tuning hyper-parameters of ML models [ref 7].

Gradient search: This technique estimates the derivative or gradient of the fitness function with respect to each configuration parameter \(p_{j}\):
 \[p_{j}^{t+1} = p_{j}^{t} + \eta \frac{\partial fitness}{\partial p_{j}}\]
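Since the fitness of a Spark configuration is not analytically differentiable, the gradient would have to be approximated numerically. A sketch of one finite-difference update step, assuming a hypothetical evalFitness helper that re-deploys the configuration and measures its fitness:

// Sketch: one gradient-ascent step on a single tunable parameter using a
// central finite difference. 'evalFitness' is a hypothetical helper that
// re-runs the Spark job for a parameter value and returns its fitness.
def gradientStep(p: Double, eta: Double, eps: Double)(evalFitness: Double => Double): Double = {
  val grad = (evalFitness(p + eps) - evalFitness(p - eps)) / (2.0 * eps)
  p + eta * grad
}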


Reinforcement learning: In this approach, the fitness function is converted into a reward applied to any action on the configuration parameters. The values of the configuration parameters define the state of the reinforcement learning model.  

Bayesian optimization: The Bayesian approach treats the objective function for the Spark configuration as a random entity and assigns a prior distribution to it. This prior reflects our assumptions about the function's behavior. Once we collect evaluations of the function, treated as data, this information is used to update the prior, resulting in the posterior distribution of the objective function. This posterior distribution is then employed to build an acquisition function that determines the next query point.

References



Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3

Sunday, November 19, 2023

Explainable ML models with SHAP

Target audience: Beginner
Estimated reading time: 5'

Have you ever faced the need to rationalize the prediction made by one of your models, or to identify which features are crucial? If so, SHAP values and plots are your go-to resources, offering the fundamental structure for an explanation.


Table of contents
       Use case
       Dataset
       Models
       Metrics
       Dependency plot
       Decision plot
       Force plot

What you will learn: How to use SHAP values and plots to identify the most significant features for multi-classification models.

Introduction

SHAP (SHapley Additive exPlanations), which is based on the concepts of game theory, is employed to clarify the predictions of machine learning models [ref 1]. This approach evaluates the contribution of each feature to a model's prediction, aiding in pinpointing the key features and understanding their specific effects on the model's results.

The complete description of the theory behind SHAP [ref 2] is beyond the scope of this article but can be summarized as follows.
For a set M of players and a subset \(S \subseteq M \setminus \left \{ i \right \}\) of players excluding player i, the SHAP value of player i for the prediction model f is
\[\varphi _{i}= \sum _{S \subseteq M \setminus \left \{ i \right \} }\frac{\left | S \right |!\,(\left | M \right |-\left | S \right |-1)!}{\left | M \right |!}\left ( f(S \cup \left \{ i \right \}) - f(S) \right )\]

The prediction made by a model, denoted as f, can be expressed as the total of its SHAP values plus a constant base value, as shown in the equation \( f(x) = base\ value + \sum SHAP\ values \).
To begin a global interpretation using SHAP, one should first look at the average absolute SHAP value for every feature across the entire dataset. This approach measures the average impact (whether positive or negative) of each feature's contribution to the predicted air quality index.
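A minimal sketch of this global measure, assuming the SHAP values are available as a (num samples x num features) NumPy array:

import numpy as np

# Sketch: rank features by their mean absolute SHAP value across the dataset
def mean_absolute_shap(shap_values: np.ndarray, feature_names: list) -> dict:
    importance = np.abs(shap_values).mean(axis=0)
    return dict(sorted(zip(feature_names, importance), key=lambda kv: -kv[1]))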

Use Case

SHAP values serve various purposes, including:
  • Debugging models to spot biases or anomalies in the data.
  • Assessing feature importance to pinpoint and eliminate features with minimal impact.
  • Providing detailed explanations for individual predictions.
  • Summarizing models using SHAP value summary plots.
  • Detecting biases to determine if specific features have an undue influence on certain groups.
  • Facilitating regulatory approval by elucidating the model's decision-making process.
In this article, our aim is to calculate SHAP values and analyze the significance of each feature in three classification models. These models are used to forecast Air Quality in 138 cities across the Philippines.

Dataset

We used the Air Quality Index (AQI) dataset of weather data for 138 Philippine cities, collected from Open Weather Map and available in the Kaggle data repository [ref 3].

The 8 features are components that contribute to air pollution such as  Carbon monoxide (CO), Nitrogen monoxide (NO), Nitrogen dioxide (NO2), Ozone (O3), Sulphur dioxide (SO2), Ammonia (NH3), and particulates (PM2.5 and PM10). 
The 5 labels/classes are indexed as Good (1), Fair (2), Moderate (3), Poor (4), Very Poor (5).

SHAP values and plots

First, we implement the class SHAPEval to compute the SHAP values and generate Summary, Dependency, Force, and Decision plots, given a predictive model, model_predictor [ref 4].
class SHAPEval(object):
    def __init__(self, model_predictor, plot_type: SHAPPlotType):
        self.model_predictor = model_predictor
        self.plot_type = plot_type

    def __call__(self, validation_data: np.ndarray, column_names: List[AnyStr]) -> None:
        # 1- Compute SHAP values with a kernel explainer
        shap_descriptor = shap.KernelExplainer(self.model_predictor, validation_data)
        shap_values = shap_descriptor.shap_values(validation_data)

        # 2- Apply the selected plot to the validation data and extracted SHAP values
        match self.plot_type:
            case SHAPPlotType.SUMMARY_PLOT:
                shap.summary_plot(shap_values, validation_data, feature_names=column_names)

            case SHAPPlotType.PARTIAL_DEPENDENCY_PLOT:
                shap.dependence_plot("o3", shap_values, validation_data, feature_names=column_names)

            case SHAPPlotType.FORCE_PLOT:
                data_point_rank = 8
                shap.force_plot(
                    shap_descriptor.expected_value,
                    shap_values[data_point_rank, :],
                    validation_data[data_point_rank, :],
                    feature_names=column_names,
                    matplotlib=True)

            case SHAPPlotType.DECISION_PLOT:
                shap.decision_plot(
                    shap_descriptor.expected_value,
                    shap_values,
                    feature_names=column_names,
                    link='logit')

            case _:
                raise Exception(f'Plot type {self.plot_type} is not supported')
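A hypothetical invocation, assuming a trained scikit-learn classifier model and a validation array X_val with feature names columns:

# Hypothetical usage: summary plot for a trained classifier
shap_eval = SHAPEval(model.predict, SHAPPlotType.SUMMARY_PLOT)
shap_eval(X_val, columns)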


The special method __call__ accepts a test dataset, validation_data, and a list of feature names, column_names, for the following purposes:
  1. To calculate SHAP values using a Kernel Explainer.
  2. To create various SHAP visualizations.
Different types of explainers exist for various models, such as the TreeExplainer for random forests, the SamplingExplainer for models with independent features, or the DeepExplainer for differentiable models [ref 5].

For our purposes, we have chosen the Kernel Explainer. Its approach of employing weighted linear regression to determine the significance of each feature is particularly well-suited for models like logistic regression, support vector machines, and neural networks.
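For reference, swapping in one of the alternative explainers mentioned above would only change step 1 of __call__; the shap package exposes them as follows (illustrative, with hypothetical model and background-data variables):

# Alternative shap explainers (each substitutes for KernelExplainer in step 1):
# explainer = shap.TreeExplainer(tree_model)               # tree ensembles
# explainer = shap.SamplingExplainer(model.predict, X_bg)  # independent features
# explainer = shap.DeepExplainer(deep_model, X_bg)         # differentiable models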

Models

Following this, we apply SHAPEval to each of the three models. The ModelEval class, designed for evaluating models, has a constructor with four parameters:
  • filename: This refers to the location of the CSV file that holds the Air Quality Index data.
  • dropped_features: A list of features deemed irrelevant, which will be omitted from the training dataset.
  • label: The column that serves as the target for the classification model.
  • val_train_split: This denotes the proportion of samples allocated for validation compared to training.
@dataclass
class TestMetric:
  accuracy: float
  f1: float
  mean_squared_error: float



class ModelEval(object):
  random_state = 5713
   
  def __init__(self,
                 filename: AnyStr,
                 dropped_features: List[AnyStr],
                 label: AnyStr,
                 val_train_split: float):

     def set_label(x: float) -> int:
        return int(x) - 1

     df = pd.read_csv(filename)
        # Drop non features and label columns
     dropped_features.append(label)
     X = df.drop(dropped_features, axis=1)
        
        # Apply standard normalization
     X_scaled = StandardScaler().fit(X).transform(X)
        # Select column containing label
     y = df[label].apply(set_label)
        
         # Train - validation split
     self.feature_names = X.columns.values.tolist()
     self.X_train, self.X_val, self.y_train, self.y_val = \
            train_test_split(X_scaled, y, test_size=val_train_split, random_state=ModelEval.random_state)



   def __call__(self, model_type: ModelType, plot_type: SHAPPlotType) -> TestMetric:
          # Initialize the classification model
      match model_type:
        case ModelType.LOGISTIC_REGRESSION:
            model = LogisticRegression(
                    solver='lbfgs', 
                    max_iter=1000, 
                    penalty='l2', 
                    multi_class='multinomial')

        case ModelType.SVM:
            model = SVC(
                   kernel="rbf", 
                   decision_function_shape='ovo', 
                   random_state=ModelEval.random_state)

        case ModelType.MLP:
            model = MLPClassifier(
                    hidden_layer_sizes=(32, 16),
                    max_iter=500,
                    alpha=0.0001,
                    solver='adam',
                    random_state=ModelEval.random_state)
        case _:
            raise Exception(f'Model name {model_type} is not supported')
             
             # Train the model
      model.fit(self.X_train, self.y_train)
             # Compute SHAP values and selected plots
      shap_eval = SHAPEval(model.predict, plot_type)
      shap_eval(self.X_val,  self.feature_names)
             
             # prediction and quality metrics
      y_predicted = model.predict(self.X_val)
      return TestMetric(
            accuracy_score(self.y_val, y_predicted),
            f1_score(self.y_val, y_predicted, average='weighted'),
            mean_squared_error(self.y_val, y_predicted)
        )


The following code snippet instantiates the ModelEval class to generate a decision plot (SHAPPlotType.DECISION_PLOT) for the logistic regression model (ModelType.LOGISTIC_REGRESSION).

test_filename = '../../data/Philippine_Air_Quality.csv'
test_drop_features = ['datetime', 'coord.lon', 'coord.lat', 'extraction_date_time', 'city_name']
test_label = 'main.aqi'
test_size = 0.01

try:
   model_eval = ModelEval(test_filename, test_drop_features, test_label, test_size)
   test_metrics = model_eval(ModelType.LOGISTIC_REGRESSION, SHAPPlotType.DECISION_PLOT)
 
except SHAPException as e:
    print(str(e))
except Exception as e:
    print(str(e))


Evaluation

The three models being evaluated are:
  • Logistic regression with the L-BFGS solver and L2 regularization
  • Support vector machine with a radial basis function (RBF) kernel and 'ovo' decision function shape
  • Multi-layer perceptron with two hidden layers of sizes 32 and 16, and the Adam solver

Metrics

The quality metrics output for the three models are:
Model                     Accuracy   F1-Score   MSE
Logistic Regression       0.928      0.924      0.119
Support Vector Machine    0.974      0.953      0.025
Multi-Layer Perceptron    0.992      0.989      0.002


Comparative summary plots

API: shap.summary_plot(shap_values, data, feature_names)

Initially, we calculate and present a summary report detailing the SHAP values for all three models: logistic regression, support vector machine, and multi-layer perceptron. This plot illustrates the positive and negative correlations between the predictors and the target variable. 
The 'dotty' appearance of the plot arises from the inclusion of each data point from the training dataset. By examining the distribution and positioning of the dots across various features, we can assess which features exert the most influence. Some features may demonstrate a uniform effect (indicated by closely grouped dots), whereas others may show more diverse impacts (evidenced by dots that are more widely scattered).

SHAP summary plot for Logistic Regression with 156 samples

SHAP summary plot for Support Vector Machine with 96 samples

SHAP summary plot for Multi-layer Perceptron with 780 samples

The data points in the plot are arranged along the X-axis based on their SHAP values, ranging from -0.6 to 2.2. The thickness of the stack at each SHAP value indicates how many data points have that particular value, representing the density or concentration of the SHAP value. Additionally, the vertical 'feature value' bar is colored to show the raw feature values.

In these plots, the features like o3, pm2_5, and others are ordered from top to bottom according to their average absolute SHAP value.

The consistency of SHAP values across the three models—logistic regression, support vector machine, and multi-layer perceptron—emphasizes the significance of the o3 and pm2_5 components in influencing the predictions. Notably, the Multi-layer perceptron model displays one or two predominant SHAP values for each feature, aligning with its high f1 score as a classifier.

Dependency plot

API:  shap.dependence_plot('o3', shap_values, data, feature_names)

The dependency plot illustrates the impact that one or two variables exert on the predicted result, revealing the nature of the relationship—whether it's linear, monotonic, or more intricate—between the target and the variables. This type of plot is especially useful for understanding models based on ensemble methods and deep learning.

We will proceed to create a SHAP dependence plot for the neural network model, utilizing a dataset of 780 samples.

SHAP dependency between o3 and pm10 components plot for MLP with 780 samples

The x-axis represents the numerical values of the feature o3. The y-axis shows the SHAP values for both o3 and pm10 features. The higher the value, the greater the impact on the prediction.
The high dispersion along the y-axis indicates that there is some dependency between the targeted feature o3 and other features, primarily pm10.

Decision plot

API: shap.decision_plot(expected_value, shap_values, feature_names, link='logit')

SHAP decision plots reveal the process by which complex models make their predictions, essentially illustrating the decision-making mechanism of these models. In these plots, features are ranked in order of their importance, which is calculated based on the observations being plotted.

Each observation's predicted outcome is depicted by a line of a specific color. These lines intersect the x-axis at the top of the plot, at points that correspond to the predicted values for the observations. The predicted value is what determines the color of the line, typically represented on a spectrum.

The plot effectively demonstrates how the contribution of each feature adds up to the final prediction made by the model.


SHAP Decision plot on 156 samples for logistic regression


The dataset's average prediction, also known as the base value, is set at 0.64. The features, such as o3 and others, are organized in a descending order based on their significance. Each line in the plot represents either a test or validation sample and shows the cumulative effect of each feature. A movement towards the right of the base value (0.64) signifies that the feature positively influences the prediction. Conversely, a shift towards the left indicates that the feature negatively affects the prediction.

In the plot, 156 validation samples are illustrated, culminating in four distinct final probability values: 0.43, 0.73, 0.88, and 0.98.

Force Plot

API: shap.force_plot(expected_value, shap_values[index,:], data[index,:], feature_names, matplotlib=True)

For each observation, you can create a sophisticated visualization known as the force plot. In these plots, features are arranged from left to right, with those making a positive impact positioned on the left and those with a negative impact on the right. For the 8th observation, the key features influencing the model's prediction are highlighted in red and blue. Red indicates the features that increased the model's score, while blue denotes the features that decreased the score.

SHAP observation force plot for 8th sample with logistic regression


Each feature's contribution is represented by an arrow, colored to reflect its impact. The size and orientation of these arrows demonstrate both the strength and the nature (positive indicated by red, negative by blue) of each feature's influence on the prediction.

As highlighted in the summary plot, the o3 component emerges as a primary feature, exerting a negative effect on the prediction with a score of -0.746. Conversely, the pm2_5 feature makes a positive contribution, impacting the prediction with a score of 0.246.

Limitations

Despite its usefulness, SHAP comes with certain constraints, including:
  • It demands substantial computational resources, especially for intricate multi-label or multi-class models that use extensive datasets.
  • The computation relies on the assumption of feature independence, particularly in the case of Kernel or Linear SHAP.
  • While SHAP reveals the extent to which a feature influences a prediction, it does not explain how these features collectively contribute to the target variable.
Thank you for reading this article.

References


