Tuesday, February 27, 2024

Geometric Learning in Python: Basics

Target audience: Beginner
Estimated reading time: 4'

Facing challenges with high-dimensional, densely packed but limited data, and complex distributions? Differential geometry offers a solution by enabling data scientists to grasp the true shape and distribution of data.

Table of contents
      Deep learning
Follow me on LinkedIn

What you will learn: You'll discover how differential geometry tackles the challenges of scarce data, high dimensionality, and the demand for independent representation in creating advanced machine learning models, such as graph or physics-informed neural networks.

Note
This article does not deal with the mathematical formalism of differential geometry or its implementation in Python.


Challenges 

Deep learning

Data scientists face challenges when building deep learning models that can be addressed by differential geometry. Those challenges are:
  • High dimensionality: Models related to computer vision or images deal with high-dimensional data, such as images or videos, which can make training more difficult due to the curse of dimensionality.
  • Availability of quality data: The quality and quantity of training data significantly affect the model's ability to generate realistic samples. Insufficient or biased data can lead to overfitting or poor generalization.
  • Underfitting or overfitting: Balancing the model's ability to generalize well while avoiding overfitting to the training data is a critical challenge. Models that overfit may generate high-quality outputs that are too similar to the training data, lacking novelty.
  • Embedding physics law or geometric constraints: Incorporating domain constraints such as boundary conditions or differential equations model s very challenging for high-dimensional data.
  • Representation dependence: The performance of many learning algorithms is very sensitive to the choice of representation (i.e. z-normalization impact on predictors).

Generative modeling

Generative modeling includes techniques such as auto-encoders, generative adversarial networks (GANs), Markov chains, transformers, and their various derivatives.

Creating generative models presents several specific challenges beyond plain vanilla deep learning models for data scientists and engineers, primarily due to the complexity of modeling and generating data that accurately reflects real-world distributions. The challenges that can be addressed with differential geometry include:
  • Performance evaluation: Unlike supervised learning models, assessing the performance of generative models is not straightforward. Traditional metrics like accuracy do not apply, leading to the development of alternative metrics such as the Frechet Inception Distance (FID) or Inception Score, which have their limitations.
  • Latent space interpretability: Understanding and interpreting the latent space of generative models, where the model learns a compressed representation of the data, can be challenging but is crucial for controlling and improving the generation process.


What is differential geometry

Differential geometry is a branch of mathematics that uses techniques from calculus, algebra and topology to study the properties of curves, surfaces, and higher-dimensional objects in space. It focuses on concepts such as curvature, angles, and distances, examining how these properties vary as one moves along different paths on a geometric object [ref 1]. 
Differential geometry is crucial in understanding the shapes and structures of objects that can be continuously altered, and it has applications in many fields including
physics (I.e., general relativity and quantum mechanics), engineering, computer science, and data exploration and analysis.

Moreover, it is important to differentiate between differential topology and differential geometry, as both disciplines examine the characteristics of differentiable (or smooth) manifolds but aim for different goals. Differential topology is concerned with the overarching structure or global aspects of a manifold, whereas differential geometry investigates the manifold's local and differential attributes, including aspects like connection and metric [ref 2].

In summary differential geometry provides data scientists with a mathematical framework facilitates the creation of models that are accurate and complex by leveraging geometric and topological insights [ref 3].


Applicability of differential geometry

Why differential geometry?

The following highlights the advantages of utilizing differential geometry to tackle the difficulties encountered by researchers in the creation and validation of generative models.

Understanding data manifolds: Data in high-dimensional spaces often lie on lower-dimensional manifolds. Differential geometry provides tools to understand the shape and structure of these manifolds, enabling generative models to learn more efficient and accurate representations of data.

Improving latent space interpolation: In generative models, navigating the latent space smoothly is crucial for generating realistic samples. Differential geometry offers methods to interpolate more effectively within these spaces, ensuring smoother transitions and better quality of generated samples.

Optimization on manifolds: The optimization processes used in training generative models can be enhanced by applying differential geometric concepts. This includes optimizing parameters directly on the manifold structure of the data or model, potentially leading to faster convergence and better local minima.

Geometric regularization: Incorporating geometric priors or constraints based on differential geometry can help in regularizing the model, guiding the learning process towards more realistic or physically plausible solutions, and avoiding overfitting.

Advanced sampling techniques: Differential geometry provides sophisticated techniques for sampling from complex distributions (important for both training and generating new data points), improving upon traditional methods by considering the underlying geometric properties of the data space.

Enhanced model interpretability: By leveraging the geometric structure of the data and model, differential geometry can offer new insights into how generative models work and how their outputs relate to the input data, potentially improving interpretability.

Physics-Informed Neural Networks:  Projecting physics law and boundary conditions such as set of partial differential equations on a surface manifold improves the optimization of deep learning models.

Innovative architectures: Insights from differential geometry can lead to the development of novel neural network architectures that are inherently more suited to capturing the complexities of data manifolds, leading to more powerful and efficient generative models. 

In summary, differential geometry equips researchers and practitioners with a deep toolkit for addressing the intrinsic challenges of generative AI, from better understanding and exploring complex data landscapes to developing more sophisticated and effective models [ref 3].

Representation independence

The effectiveness of many learning models greatly depends on how the data is represented, such as the impact of z-normalization on predictors. Representation Learning is the technique in machine learning that identifies and utilizes meaningful patterns from raw data, creating more accessible and manageable representations. Deep neural networks, as models of representation learning, typically transform and encode information into a different subspace. 
In contrast, differential geometry focuses on developing constructs that remain consistent regardless of the data representation method. It gives us a way to construct objects which are intrinsic to the manifold itself [ref 4].

Manifold and latent space

A manifold is essentially a space that, around every point, looks like Euclidean space, created from a collection of maps (or charts) called an atlas, which belongs to Euclidean space. Differential manifolds have a tangent space at each point, consisting of vectors. Riemannian manifolds are a type of differential manifold equipped with a metric to measure curvature, gradient, and divergence. 
In deep learning, the manifolds of interest are typically Riemannian due to these properties.

It is important to keep in mind that the goal of any machine learning or deep learning model is to predict p(y) from p(y|x) for observed features y given latent features x.\[p(y)=\int_{\Omega }^{} p(y|x).p(x)dx\].The latent space x can be defined as a differential manifold embedding in the data space (number of features of the input data).
Given a differentiable function f on a domain  a manifold of dimension d is defined by:
\[\mathit{M}=f(\Omega) \ \ \ with \ f: \Omega \subset \mathbb{R}^{d}\rightarrow \mathbb{R}^{d}\]
In a Riemannian manifold, the metric can be used to 
  • Estimate kernel density
  • Approximate the encoder function of an auto-encoder
  • Represent the vector space defined by classes/labels in a classifier
A manifold is usually visualized with a tangent space at give point/coordinates.

Illustration of a manifold and its tangent space


The manifold hypothesis states that real-world high-dimensional data lie on low-dimensional manifolds embedded within the high-dimensional space.

Studying data that reside on manifolds can often be done without the need for Riemannian Geometry, yet opting to perform data analysis on manifolds presents three key advantages [ref 5]:
  • By analyzing data directly on its residing manifold, you can simplify the system by reducing its degrees of freedom. This simplification not only makes calculations easier but also results in findings that are more straightforward to understand and interpret.
  • Understanding the specific manifold to which a dataset belongs enhances your comprehension of how the data evolves over time.
  • Being aware of the manifold on which a dataset exists enhances your ability to predict future data points. This knowledge allows for more effective signal extraction from datasets that are either noisy or contain limited data points.

Graph Neural Networks

Graph Neural Networks (GNNs) are a type of deep learning models designed to perform inference on data represented as graphs. They are particularly effective for tasks where the data is structured in a non-Euclidean manner, capturing the relationships and interactions between nodes in a graph.

Graph Neural Networks operate by conducting message passing across a graph, in which features are transmitted from one node to another through the connecting edges (diffusion process). For instance, the concept of Ricci curvature from differential geometry helps to alleviate congestion in the flow of messages [ref 6].

Physics-Informed Neural Networks

Physics-informed neural networks (PINNs) are versatile models capable of integrating physical principles, governed by partial differential equations, into the learning mechanism. They utilize these physical laws as a form of soft constraint or regularization during training, effectively addressing the challenge of limited data in certain engineering applications [ref 7].

Information geometry

Information geometry is a field that combines ideas from differential geometry and information theory to study the geometric structure of probability distributions and statistical models. It focuses on the way information can be quantified, manipulated, and interpreted geometrically, exploring concepts like distance and curvature within the space of probability distributions
This approach provides a powerful framework for understanding complex statistical models and the relationships between them, making it applicable in areas such as machine learning, signal processing, and more [ref 8].


Python libraries for differential geometry

There are numerous open-source Python libraries available, with a variety of focuses not exclusively tied to machine learning or generative modeling:

References

[8] Information Geometry: Near Randomness and Near Independence - 
      K. Arvin, CT Dodson - Springer-Verlag 2008



-------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3


Monday, February 12, 2024

Distributed Recursive Kalman Filter on Large Datasets

Target audience: Expert
Estimated reading time: 7'

Have you ever pondered the best way to efficiently operate a Kalman filter with a significantly large set of measurements? The predict and update/correct cycle of Kalman filtering can be computationally demanding when dealing with millions of records. Utilizing recursion and Apache Spark might be the solution you're looking for.

Table of contents

       Overview
       Noise
       Prediction
       Tail recursion
       Kalman predictor
       Recursive method
Follow me on LinkedIn

What you will learn: How to implement the Kalman Filter on a vast set of measurements utilizing multiple sampling techniques, recursion, and Apache Spark.

Notes:
  • Environments: Apache Spark 3.4.0, Scala 2.12.11
  • Source code available at GitHub github.com/patnicolas/kalman
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
  • A simple Scala implementation of Kalman Filter is described in Discrete Kalman Predictor in Scala


The Challenge

The Internet of Things (IoT) produces an immense volume of data points. Implementing Kalman filtering on these measurements can require significant computational resources. A potential approach to manage this is by sampling the data to approximate its distribution. However, it's important to note that there's no assurance that the chosen sampling technique will maintain the original distribution of the raw data. 


Employing a combination of different types of samplers could help mitigate the effects of a reduced dataset on the precision of the Kalman filter's predictions.

Illustration of sampling and distribution of Kalman predictions using Spark

Kalman filter

First let's look at the Kalman optimal filter, and its implementation on Spark using fast recursion. Renowned in the realms of signal processing and statistical analysis, the Kalman filter serves as a potent tool to measure or estimate noise arising from processes and the disturbances introduced by measurement instruments [ref 1]. 

Overview

Kalman filter serves as an ideal estimator, determining parameters from imprecise and indirect measurements. Its goal is to reduce the mean square error associated with the model's parameters. Being recursive in nature, this algorithm is suited for real-time signal processing. However, one notable constraint of the Kalman filter is its need for the process to be linear, represented as y = a.f(x) + b.g(x) + .... 

Illustration of the Kalman prediction process

Noise

The state of a deterministic time linear dynamic system is the smallest vector that summarizes the past of the system in full and allow a theoretical prediction of the future behavior, in the absence of noise.
There are two source of noise:
  • Noise generated by the process following a normal distribution with zero mean and a Q variance, N(0,Q)
  • Noise generated by the measurement devices that also follows a Normal distribution N(0, R)
Based on an observation or measurement zn, the true state xn is forecasted using the previous state xn1 and the prior measurement zn1 through a process that alternates between prediction and updating, as illustrated in the diagram below:

Illustration of the sequence of operations in Kalman Filter

Prediction

After initialization, the Kalman Filter forecasts the system's state for the upcoming step and estimates the uncertainty associated with this prediction.

Considering An as the state transition model applied to the state xn1, Bn as the control input model applied to the control vector un if it exists, Qn as the covariance of the process noise, and Pn as the error covariance matrix, the forecasted state xis \[\begin{matrix} \widetilde{x}_{n/n-1}=A_{n}.\widetilde{x}_{n-1/n-1} + B_{n}.u_{n} \ \ (1)\\ P_{n/n-1}=A_{n}.P_{n-1/n-1}.A_{n}^{T}+Q_{n} \ \ (2) \end{matrix}\]

Measurement update & optimal gain

Upon receiving a measurement, the Kalman Filter adjusts or corrects the forecast and uncertainty of the current state. It then proceeds to predict future states, continuing this process. 
Thus, with a measurement zn1, a state xn1, and the innovation Sn, the Kalman Gain  and the error covariance are calculated according. \[\begin{matrix} S_{n}=H.P_{n/n-1}.H^{T} +R_{n} \ \ \ \ \  (3)\ \ \ \ \ \ \ \ \ \ \ \ \ \\ G_{n} = \frac{1}{S_{n}}.P_{n/n-1}.H^{T}\ \ \ \ \  (4) \ \ \ \ \ \ \ \ \ \ \ \ \ \ \  \ \ \ \ \ \ \\ \widetilde{x}_{n/n} = \widetilde{x}_{n/n-1}+G_{n}(z_{n}-H.\widetilde{x}_{n/n-1}) \ \ \ (5) \\ g_{n}=I - G_{n}.H \ \ \ \ \ (6) \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \\ P_{n,n}= g_{n}.P_{n/n-1}.g_{n}^{T}+G_{n}.R_{n}.G_{n}^{T} \ \ \ \ (7) \ \ \end{matrix}\]

Apache Spark

Apache Spark is a free, open-source framework for cluster computing, specifically designed to process data in real time via distributed computing [ref 2]. Its primary applications include:
  • Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
  • Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
  • Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
  • Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.

Implementation

Tail recursion

The Kalman filter operates as a recursive estimator, implying that it computes the current state's estimate using just the previous time step's estimated state and the current measurement. Consequently, its reliance on recursion for implementation is highly logical.
A recursive function is said to be tail-recursive if the recursive call is the last thing done in the function before returning. A tail-recursive is significantly faster than non-tail recursion because it does not require stack frames [ref 3].

For measurements zn the recursion process one measurement at the time:
    execute(zn, predictions):
        IF zn last measurement:
            return predictions
        ELSE:
           predict
           predicted_z = update
           predictions.add(predicted_z )
           execute(zn+1, prediction)


Process & measurement noises

We use Spark linear algebra types, DenseVector and DenseMatrix to represent vector and matrices associated with the Kalman parameters [ref 4].

case class KalmanNoise(
  qNoise: Double,          // Standard deviation of process noise
  rNoise: Double,          // Standard deviation of the measurement noise
  length: Int,                  // Number of features or rows associated with the noise
  noiseGen: Double => Double) {  // Distribution function for generating noise
 
  final def processNoise: DenseMatrix =
    new DenseMatrix(length, length, randMatrix(length, qNoise, noiseGen).flatten)
  
  final def measureNoise: DenseMatrix =
    new DenseMatrix(length, length, Array.fill(length*length)(noiseGen(qNoise)))
}

The KalmanNoise class is designed to not specifically assume that the process and measurement noises are Gaussian (white noise) to allow for a more generalized approach. However, in practical applications, these noise components often adhere to a normal distribution with a mean of 0 and standard deviations denoted by qNoise for process noise and rNoise for measurement noise, respectively.

Kalman parameters

To enhance clarity, the source code adopts the same naming convention for variables as those found in the equations for Kalman prediction and update. Adhering to object-oriented design principles, the KalmanParameters class specifies how to calculate the residuals, innovation, and the optimal Kalman gain using these parameters.

case class KalmanParameters(
  A: DenseMatrix,     // State transition dense matrix
  B: DenseMatrix,     // Optional Control dense matrix
  H: DenseMatrix,     // Measurement dense matrix
  P: DenseMatrix,     // Error covariance dense matrix
  x: DenseVector) {   // Estimated value dense vector

  private def HTranspose: DenseMatrix = H.transpose
  def ATranspose: DenseMatrix = A.transpose

  /**. Compute the difference residual = z - H.x. */
  def residuals(z: DenseVector): DenseVector = subtract(z, H.multiply(x))

  /** Compute S = H.P.H_transpose + measurement noise.  equation 3 */
  def innovation(measureNoise: DenseMatrix): DenseMatrix =
     add(H.multiply(P).multiply(HTranspose), measureNoise)

  /**. Compute the Kalman gain G = P * H_transpose/S.  equation 4*/
  def gain(S: DenseMatrix): DenseMatrix = {
     val invStateMatrix = inv(S)
     P.multiply(HTranspose).multiply(invStateMatrix)
  }
}


Kalman predictor

The RKalman class, designed for implementing the recursive sequence of predict-update, accepts two parameters:
  • Initial parameters for the Kalman filter, named initialParams.
  • The implicit process and measurement noises, referred to as kalmanNoise.

The predict method executes the two predictive equations, labeled (1) and (2). It allows for an optional control input variable U as its argument. The update method then proceeds to refresh the Kalman parameters, specifically x and the error covariance matrix P, before calculating the optimal Kalman gain.


class RKalman(initialParams: KalmanParameters)(implicit kalmanNoise: KalmanNoise){
  private[this] var kalmanParams: KalmanParameters = initialParams


  def apply(z: Array[DenseVector]): List[DenseVector] = { .. }

  /**   x(t+1) = A.x(t) + B.u(t) + Q
   *    P(t+1) = A.P(t)A^T^ + Q    */
  def predict(U: Option[DenseVector] = None): DenseVector = {
    // Compute the first part of the state equation S = A.x
    val newX = kalmanParams.A.multiply(kalmanParams.x) // Equation (1)
    
    // Add the control matrix if u is provided  S += B.u
    val correctedX = U.map(u => kalmanParams.B.multiply(u)).getOrElse(newX) 
    
    // Update the error covariance matrix P as P(t+1) = A.P(t).A_transpose + Q
    val newP = add(             // Equation (2)
      kalmanParams.A.multiply(kalmanParams.P).multiply(kalmanParams.ATranspose),
      kalmanNoise.processNoise
    )
    // Update the kalman parameters
    kalmanParams = kalmanParams.copy(x = correctedX, P = newP)
    kalmanParams.x
  }

  /** Implement the update of the state x and error covariance P given the 
   *  measurement z and compute the Kalman gain */
  def update(z: DenseVector): DenseMatrix = {
    val y = kalmanParams.residuals(z)
    val S = kalmanParams.innovation(kalmanNoise.measureNoise) // Equation (3)
    val kalmanGain: DenseMatrix = kalmanParams.gain(S)          // Equation (4)
    
    val nextX = add(kalmanParams.x, kalmanGain.multiply(y))     // Equation (5)
    kalmanParams = kalmanParams.copy(x = nextX)
    
    val nextP = updateErrorCovariance(kalmanGain)               // Equation (7)
    kalmanParams = kalmanParams.copy(P = nextP)
    kalmanGain
  }
}

Recursive method

Let's apply tail recursion to an array of measurements specified as a DenseVector type. The recursion, wrapped in method recurse, stops after processing the final measurement, zlast. If not, the Kalman filter's current parameters are subjected to a predict-update cycle. This cycle is executed for the measurement at zindex, after which the method recursively invokes itself for the subsequent measurement at zindex+1.

def recurse(z: Array[DenseVector]): List[DenseVector] = {

  @tailrec
  def execute(
    z: Array[DenseVector],
    index: Int,
    predictions: ListBuffer[DenseVector]): List[DenseVector] = {
      if (index >= z.length)  // Criteria to end recursion
        predictions.toList
      else {
        val nextX = predict()
        val estimatedZ: DenseVector = kalmanParams.H.multiply(nextX)
        predictions.append(estimatedZ)
        update(z(index))
            
         // Execute the next measurement points
        execute(z, index + 1, predictions)
     }
  }

  execute(z, 0, ListBuffer[DenseVector]())
}

Sampling-based estimator

We are prepared to utilize the recursive method, apply(), to sample the raw measurement set z using various sampling algorithms and process each resulting sample in parallel with Apache Spark, where each sample is allocated to a partition. In our approach, we employ a random uniform sampler with varying parameters. 

For the current measurement zn, the subsequent value to be gathered is \[z_{next} \leftarrow z_{n+a+rand[0, b]}\]After creation, the samples are processed in parallel using Spark's mapPartitions methods. Each worker node creates an instance of RKalman and makes a recursive call using the method, recurse.

def apply(
  kalmanParams: KalmanParameters,// Kalman parameters used by the filter/predictor
  z: Array[DenseVector],         // Series of observed measurements as dense vector
  numSamplingMethods: Int,  // Number of samples to be processed concurrently
  minSamplingInterval: Int,     // Minimum number of samples to be ignored between sampling
  samplingInterval: Int            // Range of random sampling
)(implicit sparkSession: SparkSession): Seq[Seq[DenseVector]] = {

  // Generate the various samples from the large set of raw measurements
  val samples: Seq[Seq[DenseVector]] = (0 until numSamplingMethods).map(
   _ => sampling(z, minSamplingInterval, samplingInterval)
  )

  // Distribute the Kalman prediction-correction cycle over Spark workers
  // by assigning a partition to a Kalman process and sampled measurement.
  val samplesDS = samples.toDS()
    val predictionsDS = samplesDS.mapPartitions(
     (sampleIterator: Iterator[Seq[DenseVector]]) => {
       val acc = ListBuffer[Seq[DenseVector]]()

       while(sampleIterator.hasNext) {
         implicit val kalmanNoise: KalmanNoise = KalmanNoise(kalmanParams.A.numRows)
          
         val rKalman = new RKalman(kalmanParams)
         val z = sampleIterator.next()
         acc.append(rKalman.recurse(z))
      }
      acc.iterator
    }
  ).persist()
 
  predictionsDS.collect()
}

The code for our sampling method is listed in the appendix.


Use case

Our implementation is evaluated with measurement of the velocity of a rocket given a constant acceleration. The error covariance, P is initialized with a mean value of 0.5.

def velocity(x: Array[Double]): KalmanParameters = {
   val acceleration = 0.0167

   val A = Array[Array[Double]](            //  State transition dense matrix
     Array[Double](1.0, acceleration), Array[Double](0.0, 1.0)
   )
   val H = Array[Array[Double]](          // Measurement dense matrix
     Array[Double](1.0, 0.0), Array[Double](0.0, 0.0)
   )
   val P = Array[Array[Double]](         // Error covariance dense matrix
     Array[Double](0.5, 0.0), Array[Double](0.0, 0.5)
   )
   KalmanParameters(A, None, H, Some(P), x)
}

We simulate the raw measurements for the velocity using the simple formula \[\begin{vmatrix} v(x) \\ 1 \end{vmatrix} \ \ with\ \ v(x)=0.01.x^{2}+\frac{0.002}{x+2}+N(0, 0.2)\].
 // Simulated velocity measurements
val z = Array.tabulate(2000)(n =>
  Array[Double](n * n * 0.01 + 0.002 / (n + 2) + normalRandomValue(0.2), 1.0)
)
val zVec = z.map(new DenseVector(_))
    
  // Initial velocity and acceleration
val xInitial = Array[Double](0.001, acceleration)
val recursiveKalman = new RKalman(velocity(xInitial))
val predictedStates = recursiveKalman.apply(zVec)

The output of the Kalman predictor 𝑥 ̃is compared with the original measurements of velocity z and the output of the moving average with a window of 5 seconds, 𝑧 ̃.
The Kalman predictor uses a uniform random sampling rate over a window [4, 8].
Measurement sample:  ..,zn, zn+4+rand[0, 4], ....

The following plot illustrates the behavior of the Kalman predictor and non-weighted moving average for the first 95 seconds.

Raw measurements, 5 sec window Moving average, and Kalman predictions plot

The following plot compares the output of the Kalman predictor using a uniform random 
sampling rate over a window [4, 8] and [6, 12].

Impact of sampling method/interval on the output of Kalman predictor

References

[1] Introduction to Kalman Filter University of North Carolina G. Welsh, G. Bishop
[2] Apache Spark

-------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3

Appendix

This method updates the error covariance matrix P using the Kalman filter as defined in the equation 7.

 def updateErrorCovariance(kalmanGain: DenseMatrix): DenseMatrix = {
    val identity = DenseMatrix.eye(kalmanGain.numRows)
    val kHP = subtract(identity, kalmanGain.multiply(kalmanParams.H))
                                 .multiply(kalmanParams.P)
    val kH = subtract(identity, 
                                kalmanGain.multiply(kalmanParams.H).transpose)
    val kR = (kalmanGain.multiply(kalmanNoise.measureNoise))
                     .multiply(kalmanGain.transpose)
    
    add(kHP.multiply(kH), kR)
  }
}

The sampling method extract a subset of the original raw measurement using the formula \[z_{newIndex} \leftarrow z_{currentIndex+a+rand[0, b]}\]
def sampling(
  measurements: Array[DenseVector],    // Raw measurements (z)
  minSamplingInterval: Int,                      // Minimum sampling interval
  samplingInterval: Int): Seq[DenseVector] = {

  val rand = new Random(42L)

   // Next data: z(n+1) = z(n + minSamplingInterval + rand[0, sampling interval]
  val interval = rand.nextInt(samplingInterval) + minSamplingInterval
  
  measurements.indices.foldLeft(ListBuffer[DenseVector]())(
   (acc, index) => {
     if (index % interval == 0)
       acc.append(measurements(index))
     acc
   }
 )
}