Loading [MathJax]/extensions/TeX/AMSsymbols.js

Wednesday, December 13, 2023

Distributed Recursive Kalman Filter on Large Datasets

Target audience: Expert
Estimated reading time: 7'

Have you ever pondered the best way to efficiently operate a Kalman filter with a significantly large set of measurements? The predict and update/correct cycle of Kalman filtering can be computationally demanding when dealing with millions of records. Utilizing recursion and Apache Spark might be the solution you're looking for.



       Overview
       Noise
       Prediction
       Tail recursion
       Kalman predictor
       Recursive method


What you will learn: How to implement the Kalman Filter on a vast set of measurements utilizing multiple sampling techniques, recursion, and Apache Spark.

Notes:
  • Environments: Apache Spark 3.4.0, Scala 2.12.11
  • Source code available at GitHub github.com/patnicolas/kalman
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
  • A simple Scala implementation of Kalman Filter is described in Discrete Kalman Predictor in Scala


The Challenge

The Internet of Things (IoT) produces an immense volume of data points. Implementing Kalman filtering on these measurements can require significant computational resources. A potential approach to manage this is by sampling the data to approximate its distribution. However, it's important to note that there's no assurance that the chosen sampling technique will maintain the original distribution of the raw data. 


Employing a combination of different types of samplers could help mitigate the effects of a reduced dataset on the precision of the Kalman filter's predictions.

Illustration of sampling and distribution of Kalman predictions using Spark

Kalman filter

First let's look at the Kalman optimal filter, and its implementation on Spark using fast recursion. Renowned in the realms of signal processing and statistical analysis, the Kalman filter serves as a potent tool to measure or estimate noise arising from processes and the disturbances introduced by measurement instruments [ref 1]. 

Overview

Kalman filter serves as an ideal estimator, determining parameters from imprecise and indirect measurements. Its goal is to reduce the mean square error associated with the model's parameters. Being recursive in nature, this algorithm is suited for real-time signal processing. However, one notable constraint of the Kalman filter is its need for the process to be linear, represented as y = a.f(x) + b.g(x) + .... 

Illustration of the Kalman prediction process

Noise

The state of a deterministic time linear dynamic system is the smallest vector that summarizes the past of the system in full and allow a theoretical prediction of the future behavior, in the absence of noise.
There are two source of noise:
  • Noise generated by the process following a normal distribution with zero mean and a Q variance, N(0,Q)
  • Noise generated by the measurement devices that also follows a Normal distribution N(0, R)
Based on an observation or measurement zn, the true state xn is forecasted using the previous state xn1 and the prior measurement zn1 through a process that alternates between prediction and updating, as illustrated in the diagram below:

Illustration of the sequence of operations in Kalman Filter

Prediction

After initialization, the Kalman Filter forecasts the system's state for the upcoming step and estimates the uncertainty associated with this prediction.

Considering An as the state transition model applied to the state xn1, Bn as the control input model applied to the control vector un if it exists, Qn as the covariance of the process noise, and Pn as the error covariance matrix, the forecasted state xis \[\begin{matrix} \widetilde{x}_{n/n-1}=A_{n}.\widetilde{x}_{n-1/n-1} + B_{n}.u_{n} \ \ (1)\\ P_{n/n-1}=A_{n}.P_{n-1/n-1}.A_{n}^{T}+Q_{n} \ \ (2) \end{matrix}\]

Measurement update & optimal gain

Upon receiving a measurement, the Kalman Filter adjusts or corrects the forecast and uncertainty of the current state. It then proceeds to predict future states, continuing this process. 
Thus, with a measurement zn1, a state xn1, and the innovation Sn, the Kalman Gain  and the error covariance are calculated according. \[\begin{matrix} S_{n}=H.P_{n/n-1}.H^{T} +R_{n} \ \ \ \ \  (3)\ \ \ \ \ \ \ \ \ \ \ \ \ \\ G_{n} = \frac{1}{S_{n}}.P_{n/n-1}.H^{T}\ \ \ \ \  (4) \ \ \ \ \ \ \ \ \ \ \ \ \ \ \  \ \ \ \ \ \ \\ \widetilde{x}_{n/n} = \widetilde{x}_{n/n-1}+G_{n}(z_{n}-H.\widetilde{x}_{n/n-1}) \ \ \ (5) \\ g_{n}=I - G_{n}.H \ \ \ \ \ (6) \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \\ P_{n,n}= g_{n}.P_{n/n-1}.g_{n}^{T}+G_{n}.R_{n}.G_{n}^{T} \ \ \ \ (7) \ \ \end{matrix}\]

Apache Spark

Apache Spark is a free, open-source framework for cluster computing, specifically designed to process data in real time via distributed computing [ref 2]. Its primary applications include:
  • Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
  • Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
  • Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
  • Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.

Implementation

Tail recursion

The Kalman filter operates as a recursive estimator, implying that it computes the current state's estimate using just the previous time step's estimated state and the current measurement. Consequently, its reliance on recursion for implementation is highly logical.
A recursive function is said to be tail-recursive if the recursive call is the last thing done in the function before returning. A tail-recursive is significantly faster than non-tail recursion because it does not require stack frames [ref 3].

For measurements zn the recursion process one measurement at the time:
    execute(zn, predictions):
        IF zn last measurement:
            return predictions
        ELSE:
           predict
           predicted_z = update
           predictions.add(predicted_z )
           execute(zn+1, prediction)


Process & measurement noises

We use Spark linear algebra types, DenseVector and DenseMatrix to represent vector and matrices associated with the Kalman parameters [ref 4].

case class KalmanNoise(
  qNoise: Double,          // Standard deviation of process noise
  rNoise: Double,          // Standard deviation of the measurement noise
  length: Int,                  // Number of features or rows associated with the noise
  noiseGen: Double => Double) {  // Distribution function for generating noise
 
  final def processNoise: DenseMatrix =
    new DenseMatrix(length, length, randMatrix(length, qNoise, noiseGen).flatten)
  
  final def measureNoise: DenseMatrix =
    new DenseMatrix(length, length, Array.fill(length*length)(noiseGen(qNoise)))
}

The KalmanNoise class is designed to not specifically assume that the process and measurement noises are Gaussian (white noise) to allow for a more generalized approach. However, in practical applications, these noise components often adhere to a normal distribution with a mean of 0 and standard deviations denoted by qNoise for process noise and rNoise for measurement noise, respectively.

Kalman parameters

To enhance clarity, the source code adopts the same naming convention for variables as those found in the equations for Kalman prediction and update. Adhering to object-oriented design principles, the KalmanParameters class specifies how to calculate the residuals, innovation, and the optimal Kalman gain using these parameters.

case class KalmanParameters(
  A: DenseMatrix,     // State transition dense matrix
  B: DenseMatrix,     // Optional Control dense matrix
  H: DenseMatrix,     // Measurement dense matrix
  P: DenseMatrix,     // Error covariance dense matrix
  x: DenseVector) {   // Estimated value dense vector

  private def HTranspose: DenseMatrix = H.transpose
  def ATranspose: DenseMatrix = A.transpose

  /**. Compute the difference residual = z - H.x. */
  def residuals(z: DenseVector): DenseVector = subtract(z, H.multiply(x))

  /** Compute S = H.P.H_transpose + measurement noise.  equation 3 */
  def innovation(measureNoise: DenseMatrix): DenseMatrix =
     add(H.multiply(P).multiply(HTranspose), measureNoise)

  /**. Compute the Kalman gain G = P * H_transpose/S.  equation 4*/
  def gain(S: DenseMatrix): DenseMatrix = {
     val invStateMatrix = inv(S)
     P.multiply(HTranspose).multiply(invStateMatrix)
  }
}


Kalman predictor

The RKalman class, designed for implementing the recursive sequence of predict-update, accepts two parameters:
  • Initial parameters for the Kalman filter, named initialParams.
  • The implicit process and measurement noises, referred to as kalmanNoise.

The predict method executes the two predictive equations, labeled (1) and (2). It allows for an optional control input variable U as its argument. The update method then proceeds to refresh the Kalman parameters, specifically x and the error covariance matrix P, before calculating the optimal Kalman gain.


class RKalman(initialParams: KalmanParameters)(implicit kalmanNoise: KalmanNoise){
  private[this] var kalmanParams: KalmanParameters = initialParams


  def apply(z: Array[DenseVector]): List[DenseVector] = { .. }

  /**   x(t+1) = A.x(t) + B.u(t) + Q
   *    P(t+1) = A.P(t)A^T^ + Q    */
  def predict(U: Option[DenseVector] = None): DenseVector = {
    // Compute the first part of the state equation S = A.x
    val newX = kalmanParams.A.multiply(kalmanParams.x) // Equation (1)
    
    // Add the control matrix if u is provided  S += B.u
    val correctedX = U.map(u => kalmanParams.B.multiply(u)).getOrElse(newX) 
    
    // Update the error covariance matrix P as P(t+1) = A.P(t).A_transpose + Q
    val newP = add(             // Equation (2)
      kalmanParams.A.multiply(kalmanParams.P).multiply(kalmanParams.ATranspose),
      kalmanNoise.processNoise
    )
    // Update the kalman parameters
    kalmanParams = kalmanParams.copy(x = correctedX, P = newP)
    kalmanParams.x
  }

  /** Implement the update of the state x and error covariance P given the 
   *  measurement z and compute the Kalman gain */
  def update(z: DenseVector): DenseMatrix = {
    val y = kalmanParams.residuals(z)
    val S = kalmanParams.innovation(kalmanNoise.measureNoise) // Equation (3)
    val kalmanGain: DenseMatrix = kalmanParams.gain(S)          // Equation (4)
    
    val nextX = add(kalmanParams.x, kalmanGain.multiply(y))     // Equation (5)
    kalmanParams = kalmanParams.copy(x = nextX)
    
    val nextP = updateErrorCovariance(kalmanGain)               // Equation (7)
    kalmanParams = kalmanParams.copy(P = nextP)
    kalmanGain
  }
}

Recursive method

Let's apply tail recursion to an array of measurements specified as a DenseVector type. The recursion, wrapped in method recurse, stops after processing the final measurement, zlast. If not, the Kalman filter's current parameters are subjected to a predict-update cycle. This cycle is executed for the measurement at zindex, after which the method recursively invokes itself for the subsequent measurement at zindex+1.

def recurse(z: Array[DenseVector]): List[DenseVector] = {

  @tailrec
  def execute(
    z: Array[DenseVector],
    index: Int,
    predictions: ListBuffer[DenseVector]): List[DenseVector] = {
      if (index >= z.length)  // Criteria to end recursion
        predictions.toList
      else {
        val nextX = predict()
        val estimatedZ: DenseVector = kalmanParams.H.multiply(nextX)
        predictions.append(estimatedZ)
        update(z(index))
            
         // Execute the next measurement points
        execute(z, index + 1, predictions)
     }
  }

  execute(z, 0, ListBuffer[DenseVector]())
}

Sampling-based estimator

We are prepared to utilize the recursive method, apply(), to sample the raw measurement set z using various sampling algorithms and process each resulting sample in parallel with Apache Spark, where each sample is allocated to a partition. In our approach, we employ a random uniform sampler with varying parameters. 

For the current measurement zn, the subsequent value to be gathered is \[z_{next} \leftarrow z_{n+a+rand[0, b]}\]After creation, the samples are processed in parallel using Spark's mapPartitions methods. Each worker node creates an instance of RKalman and makes a recursive call using the method, recurse.

def apply(
  kalmanParams: KalmanParameters,// Kalman parameters used by the filter/predictor
  z: Array[DenseVector],         // Series of observed measurements as dense vector
  numSamplingMethods: Int,  // Number of samples to be processed concurrently
  minSamplingInterval: Int,     // Minimum number of samples to be ignored between sampling
  samplingInterval: Int            // Range of random sampling
)(implicit sparkSession: SparkSession): Seq[Seq[DenseVector]] = {

  // Generate the various samples from the large set of raw measurements
  val samples: Seq[Seq[DenseVector]] = (0 until numSamplingMethods).map(
   _ => sampling(z, minSamplingInterval, samplingInterval)
  )

  // Distribute the Kalman prediction-correction cycle over Spark workers
  // by assigning a partition to a Kalman process and sampled measurement.
  val samplesDS = samples.toDS()
    val predictionsDS = samplesDS.mapPartitions(
     (sampleIterator: Iterator[Seq[DenseVector]]) => {
       val acc = ListBuffer[Seq[DenseVector]]()

       while(sampleIterator.hasNext) {
         implicit val kalmanNoise: KalmanNoise = KalmanNoise(kalmanParams.A.numRows)
          
         val rKalman = new RKalman(kalmanParams)
         val z = sampleIterator.next()
         acc.append(rKalman.recurse(z))
      }
      acc.iterator
    }
  ).persist()
 
  predictionsDS.collect()
}

The code for our sampling method is listed in the appendix.


Use case

Our implementation is evaluated with measurement of the velocity of a rocket given a constant acceleration. The error covariance, P is initialized with a mean value of 0.5.

def velocity(x: Array[Double]): KalmanParameters = {
   val acceleration = 0.0167

   val A = Array[Array[Double]](            //  State transition dense matrix
     Array[Double](1.0, acceleration), Array[Double](0.0, 1.0)
   )
   val H = Array[Array[Double]](          // Measurement dense matrix
     Array[Double](1.0, 0.0), Array[Double](0.0, 0.0)
   )
   val P = Array[Array[Double]](         // Error covariance dense matrix
     Array[Double](0.5, 0.0), Array[Double](0.0, 0.5)
   )
   KalmanParameters(A, None, H, Some(P), x)
}

We simulate the raw measurements for the velocity using the simple formula \[\begin{vmatrix} v(x) \\ 1 \end{vmatrix} \ \ with\ \ v(x)=0.01.x^{2}+\frac{0.002}{x+2}+N(0, 0.2)\].
 // Simulated velocity measurements
val z = Array.tabulate(2000)(n =>
  Array[Double](n * n * 0.01 + 0.002 / (n + 2) + normalRandomValue(0.2), 1.0)
)
val zVec = z.map(new DenseVector(_))
    
  // Initial velocity and acceleration
val xInitial = Array[Double](0.001, acceleration)
val recursiveKalman = new RKalman(velocity(xInitial))
val predictedStates = recursiveKalman.apply(zVec)

The output of the Kalman predictor 𝑥 ̃is compared with the original measurements of velocity z and the output of the moving average with a window of 5 seconds, 𝑧 ̃.
The Kalman predictor uses a uniform random sampling rate over a window [4, 8].
Measurement sample:  ..,zn, zn+4+rand[0, 4], ....

The following plot illustrates the behavior of the Kalman predictor and non-weighted moving average for the first 95 seconds.

Raw measurements, 5 sec window Moving average, and Kalman predictions plot

The following plot compares the output of the Kalman predictor using a uniform random 
sampling rate over a window [4, 8] and [6, 12].

Impact of sampling method/interval on the output of Kalman predictor

References

[1] Introduction to Kalman Filter University of North Carolina G. Welsh, G. Bishop
[2] Apache Spark

-------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3

Appendix

This method updates the error covariance matrix P using the Kalman filter as defined in the equation 7.

 def updateErrorCovariance(kalmanGain: DenseMatrix): DenseMatrix = {
    val identity = DenseMatrix.eye(kalmanGain.numRows)
    val kHP = subtract(identity, kalmanGain.multiply(kalmanParams.H))
                                 .multiply(kalmanParams.P)
    val kH = subtract(identity, 
                                kalmanGain.multiply(kalmanParams.H).transpose)
    val kR = (kalmanGain.multiply(kalmanNoise.measureNoise))
                     .multiply(kalmanGain.transpose)
    
    add(kHP.multiply(kH), kR)
  }
}

The sampling method extract a subset of the original raw measurement using the formula \[z_{newIndex} \leftarrow z_{currentIndex+a+rand[0, b]}\]
def sampling(
  measurements: Array[DenseVector],    // Raw measurements (z)
  minSamplingInterval: Int,                      // Minimum sampling interval
  samplingInterval: Int): Seq[DenseVector] = {

  val rand = new Random(42L)

   // Next data: z(n+1) = z(n + minSamplingInterval + rand[0, sampling interval]
  val interval = rand.nextInt(samplingInterval) + minSamplingInterval
  
  measurements.indices.foldLeft(ListBuffer[DenseVector]())(
   (acc, index) => {
     if (index % interval == 0)
       acc.append(measurements(index))
     acc
   }
 )
}


Friday, December 1, 2023

Modular Markov Chain Monte Carlo in Python

Target audience: Intermediate
Estimated reading time: 5'

Sampling sits at the core of data science and analysis. This article explores a category of numerical sampling techniques, known as Markov Chain Monte Carlo, and how they can be implemented via reusable design patterns, using the Metropolis-Hastings model as an example.


       Theory
What you will learn: How to build modular Python Classes for MCMC with Bridge Pattern: A Metropolis-Hastings Implementation.

Notes:
  • Environments: Python 3.10.10, Numpy 1.25.1, Matplotlib 3.7.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
  • Source code available at Github.com Data_Exploration/Stats

Markov Chain Monte Carlo 

Markov Chain Monte Carlo (MCMC) is a class of algorithms used in computational statistics to sample from probability distributions based on constructing a Markov chain that has the desired distribution as its equilibrium distribution. The core idea is to use a Markov chain to model the complex probability distribution you're interested in, where the chain's states represent the possible outcomes in the distribution. Over time, as the chain progresses, the states it visits are used to generate samples that are representative of the desired distribution.

MCMC methods are particularly useful in situations where direct sampling is difficult due to the complexity of the distribution or high-dimensional spaces. They are widely used in Bayesian statistics, where one needs to compute posterior distributions that are not analytically tractable [ref 1].


The result of a Markov Chain Monte Carlo (MCMC) procedure is a joint posterior probability distribution, or the combined probability of parameters theta, given certain data and a model/hypothesis, denoted as p(theta | Data, Model). 
The generic Naive Bayes rule is defined as \[posterior = \frac{likelihood\ .\ prior}{evidence}\]
In the case of MCMC, the log of the posterior is computed as
\[log(p(\theta |\mathfrak{D},\mathfrak{M}))=log(p(\mathfrak{D} |\theta,\mathfrak{M})) + log(p(\theta | \mathfrak{M})) - log(p(\mathfrak{D} |\mathfrak{M}))\]
\[for\ model\ \mathfrak{M},\ a\ sampled\ value\ \theta\ and\ data \ \mathfrak{D}\]
Essentially, MCMC can be understood as a method for estimating a credible or plausible range of model parameters that align with the observed data. The process of fitting a probability distribution emerges as a secondary outcome.

Metropolis-Hastings algorithm

The Metropolis-Hastings (MH) algorithm is one of the most famous MCMC techniques. It allows for sampling from a distribution by generating a sequence of sample points in a way that, even if you can't directly sample from the distribution itself, you can still obtain a set of samples that approximates the distribution after a sufficient number of iterations [ref 2]. Other popular MCMC methods include the Gibbs sampler and Hamiltonian Monte Carlo.

Theory

For this study, we consider a single variable probability distribution.

Generic algorithm:
The procedure at step i is as follows:
1 Sample the proposal distribution q \[(1)\ \  x^{*} \leftarrow q(x_{i}|x_{i-1})\] Compute the probability u of accepting the sample \[(2)\ \  u(\theta^{*} | \theta_{i})=min\left ( 1, \frac{\widetilde{p}(\theta^{*} ).q\left ( \theta_{i} | \theta^{*}\right )}{\widetilde{p}(\theta_{i} ).q\left ( \theta^{*} | \theta_{i}\right )}\right )\]
3 Select the next value \[(3)\ \  IF\ [ urand_{[0, 1]} < u\left (\theta ^{*} | \theta_{i} \right )]\ \theta_{i+1} = \theta^{*}\ \ \ ELSE\ \theta_{i+1} = \theta_{i}\]

Random walk:
This is the case where the proposal distribution is symmetric. Therefore the probability of accepting the sample is simplified as \[u(\theta^{*} | \theta_{i})=min\left ( 1, \frac{\widetilde{p}(\theta^{*} )}{\widetilde{p}(\theta_{i} )}\right )\]

Reusable design pattern

Reusable software design patterns are standardized solutions to common problems in software design and architecture. They represent best practices evolved through collective experience, providing a template or blueprint for tackling recurring design challenges. Design patterns help in making the software development process more efficient and the software itself more maintainable and scalable by offering proven solutions to typical issues [ref 3].
  • Creational Patterns deal with object creation mechanism.
  • Structural Patterns concern class and object composition to ensure a flexible architecture.
  • Behavioral Patterns are all about class's objects interactions. 

The bridge pattern is a structural pattern. It aims to separate an abstraction from its implementation, making them independent of each other. In this context, the structure of the proposal distribution is kept separate from the structure of the Markov Chain Monte Carlo, allowing any algorithm to choose any distribution freely.

Bridge pattern for Markov Chain Monte Carlo algorithms

The two abstract classes, ProposalDistribution and MCMC define the signature of method to be overridden by each specialized class.

Proposal distribution implementation

First let's define the signature of the proposal distribution

class ProposalDistribution(object):
    from abc import abstractmethod

    @abstractmethod
    def log_prior(self, theta: float) -> float:
        pass

    @abstractmethod
    def log_likelihood(self, theta: float) -> float:
        pass

    @abstractmethod
    def step(self, theta: float, sigma: float) -> float:
        pass

    def log_posterior(self, theta: float, prior: float) -> float:
        pass


Beta distribution:
The beta distribution is most appropriate for discrete distributions
The log likelihood for the Beta distribution is \[log(\mathfrak{L})=log\left ( \binom{N}{x} h^{\theta} h^{N-\theta} \right )\]
class ProposalBeta(ProposalDistribution):
    pi_2_inv = np.sqrt(2 * np.pi)

    def __init__(self, alpha: int, beta: int, num_trials: int, h: int):
        self.alpha = alpha
        self.beta = beta
        self.num_trials = num_trials
        self.h = h

    def log_prior(self, theta: float) -> float:
        x = stats.beta(self.alpha, self.beta).pdf(theta)
        return x if x > 0.0 else 1e-5

    def log_likelihood(self, theta: float) -> float:
        return math.log(stats.binom(self.num_trials, theta).pmf(self.h))

    def step(self, theta: float, sigma: float) -> float:
        return theta + stats.norm(0.0, sigma).rvs()

    def log_posterior(self, theta: float, prior: float) -> float:
        return self.log_likelihood(theta) + np.log(prior)


Normal distribution:
The log likelihood for the normal distribution is \[log(\mathfrak{L})=-\frac{N}{2}log(2\pi)-\frac{1}{2}\sum_{i=1}^{N}\left [ 2.log(\sigma_i) -{\frac{\theta_i^2}{\sigma_i^2}} \right ]\]

Metropolis-Hastings implementation

First, let's outline the signature or interface common, MCMC, to all Markov Chain Monte Carlo algorithms. The method 'sample' accepts the most recently sampled value, and produces a new sample.
class MCMC(object):

    @abstractmethod
    def sample(self, theta: float) -> float:
        pass

The MetropolisHastings class encapsulates the implementation of the Metropolis-Hastings (MH) algorithm. Its constructor sets up the algorithm's parameters:
  • proposal: The proposal distribution.
  • num_iterations: The total number of iterations or samples to be gathered.
  • burn_in_ratio: The proportion of samples allocated to the burn-in phase.
  • sigma_delta: Variance of Normal distribution used as step size for computing next sample.
Note:  It's important to note that the initial or seed value for the samples may not be precise. In fact, it's anticipated that the early samples generated before the algorithm stabilizes are unreliable and ought to be disregarded [ref 4]. These discarded samples are commonly known as the burn-in phase.

class MetropolisHastings(MCMC):

    def __init__(self,
                 proposal: ProposalDistribution,
                 num_iterations: int,
                 burn_in_ratio: float,
                 sigma_delta: float = 0.2):
       
        self.proposed_distribution = proposed_distribution
        self.num_iterations = num_iterations
        self.sigma_delta = sigma_delta
        self.burn_ins = int(num_iterations*burn_in_ratio)

    def sample(self, theta_0: float) -> (np.array, float):

The sampling method, sample, initiates the process with an initial, approximate sample. It outputs a tuple consisting of the samples gathered post-burn-in phase and the acceptance rate.
def sample(self, theta_0: float) -> (np.array, float):
     num_valid_thetas = self.num_iterations - self.burn_ins.  #1
     theta_walk = np.zeros(num_valid_thetas)
     accepted_count = 0
     theta = theta_0
     theta_walk[0] = theta       #2

     j = 0
     for i in range(self.num_iterations):
        theta_star = self.proposal_dist.step(theta, self.sigma_delta) #3 

        try:
           cur_prior = self.proposal_dist.log_prior(theta)  #4
           new_prior = self.proposal_dist.log_prior(theta_star)

           if cur_prior > 0.0 and new_prior > 0.0:
             cur_log_posterior = self.proposal_dist.posterior(theta, cur_prior)  #5
             new_log_posterior = self.proposal_dist.posterior(theta_star, new_prior)

             # 6 Apply the acceptance/rejection criteria
             if MetropolisHastings.__acceptance_rule(cur_log_posterior, new_log_posterior):
               theta = theta_star
               if i > self.burn_ins:   #7
                     accepted_count += 1
                     theta_walk[j + 1] = theta_star
                     j += 1
            else:
               if i > self.burn_ins:
                  theta_walk[j + 1] = theta_walk[j]
                  j += 1

         except ArithmeticError as e:
                print(f'Error {e}')
        return theta_walk, float(accepted_count) / num_valid_thetas


Theta values to be recorded must omit those sampled during the initial burn-in phase (1). The first sample essentially serves as the starting point for Theta (2). At every step, the process involves:
  1. Calculating the subsequent Theta value with the stepping function tied to the proposal distribution (3).
  2. Determining the logarithm of the prior for both the last and the proposed, current sample (4).
  3. Calculating the posterior by applying the logarithm to the Naive Bayes formula (5).
  4. The new posterior probability undergoes the acceptance criteria test (6).
  5. Provided the new sample isn't part of the burn-in phase, it is then recorded (7).
The acceptance probability, denoted as u, is determined by calculating the logarithms of the prior, likelihood, and posterior probability. Consequently, the acceptance test outlined in equation (3) necessitates transforming the difference (proposed theta minus previous theta) into an exponential form.
@staticmethod
def __acceptance_rule(current: float, new: float) -> bool:
    residual = new - current
    return True if new > current else np.random.uniform(0, 1) < np.exp(residual)


Evaluation

The goal is to assess how different elements influence both the acceptance rate and the results produced by the Metropolis-Hastings (MH) algorithm. The elements under consideration include:
  • Num iterations: The total number of iterations performed in the Monte Carlo simulation.
  • Proposal: Utilizes the Beta distribution with shapes 4 and 6 as the proposal distribution. The evaluation focuses solely on the success rate of the Binomial distribution applied in calculating the likelihood.
  • Initial Theta: The initially sampled value for the theta variable.
  • Theta step: The increment size used when sampling the subsequent value of theta.

Num iterationsProposalInitial ThetaTheta stepAcceptance rate
50004,6,0.50.50.10           0.480 
50004,6,0.50.50.95           0.061 
200004,6,0.50.50.95           0.077 
10004,6,0.50.50.95           0.052 
120004,6,0.50.10.50           0.121 
120004,6,0.50.90.50           0.134 
120004,6,0.50.50.50           0.125 
120004,6,0.90.50.50           0.080 
120004,6,0.10.50.50           0.079 


Impact of proposal distribution
In this first test we evaluate the impact of the proposal distribution on the prediction of the output theta. We consider 5000 iterations with no burn-in, initial value of 0.8 and compare two proposal distributions
  • Beta(4, 6) with success rate 0.88
  • Beta(4, 6) with success rate 0.12
Output MH with Beta(4,6) proposal w success rate 0.88, no burn-in, initial value 0.8
Output MH with Beta(4,6) proposal w success rate 0.12, no burn-in, initial value 0.8

Impact of step size
This test evaluates the impact of the step size for computing the next sample as: \[\theta^{*} = \theta_{i-1}+ N(0, \triangle \sigma)\]
Output MH with Beta(4,6) proposal 20% burn-in phase, initial value 0.8, delta sigma 0.05
Output MH with Beta(4,6) proposal 20% burn-in phase, initial value 0.8, delta sigma 0.95


Impact of initial value
This second test evaluates the impact of the initial value on the output of the MH algorithm. We use a 10% burn-in ratio with 12,000 iteration and a Beta(4, 2) proposal distribution with a 16% success rate.
Output MH with Beta(4,2) proposal w success rate 0.16, 10% burn-in, initial value 0.1
Output MH with Beta(4,2) proposal with success rate 0.16, 10% burn-in, initial value 0.1


Limitations

The primary challenges associated with the Metropolis-Hastings algorithm are computational in nature. When proposals are generated randomly, it often requires a significant number of iterations to reach regions of higher (or more probable) posterior densities. Moreover, even the more efficient versions of the Metropolis-Hastings algorithm may accept fewer than 25% of the proposed samples.

Finally, the Metropolis-Hastings algorithm dependents heavily on the selection of the proposal distribution. An inappropriate choice for the proposal distribution can result in suboptimal performance of the algorithm.