Wednesday, December 13, 2023
Distributed Recursive Kalman Filter on Large Datasets
Estimated reading time: 7'
- Environments: Apache Spark 3.4.0, Scala 2.12.11
- Source code available on GitHub at github.com/patnicolas/kalman
- To keep the implementations readable, non-essential code elements such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements have been omitted.
- A simple Scala implementation of the Kalman filter is described in Discrete Kalman Predictor in Scala
The Challenge
The Internet of Things (IoT) produces an immense volume of data points. Applying Kalman filtering to all of these measurements can require significant computational resources. One approach to managing the load is to sample the data so that the sample approximates the distribution of the raw measurements. However, there is no assurance that any single sampling technique preserves the original distribution of the raw data.
Employing a combination of different types of samplers can mitigate the impact of a reduced dataset on the accuracy of the Kalman filter's predictions.
Kalman filter
Overview
The Kalman filter is a recursive optimal estimator that infers the state of a linear dynamic system from a series of noisy measurements.
Noise
The model accounts for two sources of noise:
- Noise generated by the process, following a normal distribution with zero mean and variance Q: N(0, Q)
- Noise generated by the measurement devices, also following a normal distribution with zero mean and variance R: N(0, R)
Prediction
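The prediction (time update) step estimates the next state x and error covariance P from the state transition matrix A, the optional control input u, and the process noise Q. The equations are numbered to match the references in the implementation below:
x(t+1) = A.x(t) + B.u(t)        (1)
P(t+1) = A.P(t).A^T + Q         (2)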
Measurement update & optimal gain
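The correction step computes the residual against the new measurement z, the innovation covariance S, and the optimal Kalman gain G, then updates the state and the error covariance. The numbering matches the references in the implementation below; the covariance update uses the Joseph form implemented in the appendix:
y = z - H.x                               (residual)
S = H.P.H^T + R                           (3)
G = P.H^T.S^-1                            (4)
x' = x + G.y                              (5)
P' = (I - G.H).P.(I - G.H)^T + G.R.G^T    (7)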
Apache Spark
Apache Spark is an open-source distributed computing framework for large-scale data processing. Its key capabilities include:
- Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
- Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
- Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
- Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.
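The sampling-based estimator described later expects an implicit SparkSession. A minimal sketch for a local run follows; the application name and master URL are illustrative, not part of the original code.

import org.apache.spark.sql.SparkSession

// Local SparkSession for the sampling-based estimator (illustrative settings)
implicit val sparkSession: SparkSession = SparkSession.builder()
  .appName("recursive-kalman")
  .master("local[*]")
  .getOrCreate()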
Implementation
Tail recursion
The prediction-correction cycle is implemented as a tail recursion over the sequence of measurements, as shown in the recurse method below.
Process & measurement noises
case class KalmanNoise(
  qNoise: Double,             // Standard deviation of the process noise
  rNoise: Double,             // Standard deviation of the measurement noise
  length: Int,                // Number of features or rows associated with the noise
  noiseGen: Double => Double  // Distribution function for generating noise
) {
  final def processNoise: DenseMatrix =
    new DenseMatrix(length, length, randMatrix(length, qNoise, noiseGen).flatten)

  final def measureNoise: DenseMatrix =
    new DenseMatrix(length, length, Array.fill(length*length)(noiseGen(rNoise)))
}
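For illustration, the noise generator can be a zero-mean Gaussian scaled by the standard deviation. The generator and the values below are assumptions, not part of the original code:

import scala.util.Random

// Hypothetical generator: stdDev => a sample from N(0, stdDev^2)
val gaussianGen: Double => Double = stdDev => stdDev * Random.nextGaussian()

implicit val kalmanNoise: KalmanNoise = KalmanNoise(
  qNoise = 0.03,  // Illustrative process noise standard deviation
  rNoise = 0.1,   // Illustrative measurement noise standard deviation
  length = 2,     // Two features: velocity and acceleration
  noiseGen = gaussianGen
)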
Kalman parameters
case class KalmanParameters(
  A: DenseMatrix,  // State transition dense matrix
  B: DenseMatrix,  // Optional control dense matrix
  H: DenseMatrix,  // Measurement dense matrix
  P: DenseMatrix,  // Error covariance dense matrix
  x: DenseVector   // Estimated state dense vector
) {
  private def HTranspose: DenseMatrix = H.transpose
  def ATranspose: DenseMatrix = A.transpose

  /** Compute the residual y = z - H.x */
  def residuals(z: DenseVector): DenseVector = subtract(z, H.multiply(x))

  /** Compute the innovation S = H.P.H^T + measurement noise, equation (3) */
  def innovation(measureNoise: DenseMatrix): DenseMatrix =
    add(H.multiply(P).multiply(HTranspose), measureNoise)

  /** Compute the Kalman gain G = P.H^T.S^-1, equation (4) */
  def gain(S: DenseMatrix): DenseMatrix = {
    val invInnovation = inv(S)
    P.multiply(HTranspose).multiply(invInnovation)
  }
}
Kalman predictor
The recursive Kalman predictor, RKalman, takes two arguments:
- The initial parameters for the Kalman filter, named initialParams.
- The implicit process and measurement noises, referred to as kalmanNoise.
The predict method executes the two prediction equations, (1) and (2), and accepts an optional control input variable U as its argument. The update method computes the optimal Kalman gain, then refreshes the Kalman parameters, specifically the state x and the error covariance matrix P.
class RKalman(initialParams: KalmanParameters)(implicit kalmanNoise: KalmanNoise) {
  private[this] var kalmanParams: KalmanParameters = initialParams

  def apply(z: Array[DenseVector]): List[DenseVector] = { .. }

  /** x(t+1) = A.x(t) + B.u(t) with process noise Q
    * P(t+1) = A.P(t).A^T + Q */
  def predict(U: Option[DenseVector] = None): DenseVector = {
    // Compute the first part of the state equation x' = A.x
    val newX = kalmanParams.A.multiply(kalmanParams.x)      // Equation (1)
    // Add the control input if u is provided: x' += B.u
    val correctedX = U.map(u => add(newX, kalmanParams.B.multiply(u))).getOrElse(newX)
    // Update the error covariance matrix P(t+1) = A.P(t).A^T + Q
    val newP = add(                                         // Equation (2)
      kalmanParams.A.multiply(kalmanParams.P).multiply(kalmanParams.ATranspose),
      kalmanNoise.processNoise
    )
    // Update the Kalman parameters
    kalmanParams = kalmanParams.copy(x = correctedX, P = newP)
    kalmanParams.x
  }

  /** Update the state x and error covariance P given the measurement z
    * and compute the Kalman gain */
  def update(z: DenseVector): DenseMatrix = {
    val y = kalmanParams.residuals(z)
    val S = kalmanParams.innovation(kalmanNoise.measureNoise) // Equation (3)
    val kalmanGain: DenseMatrix = kalmanParams.gain(S)        // Equation (4)
    val nextX = add(kalmanParams.x, kalmanGain.multiply(y))   // Equation (5)
    kalmanParams = kalmanParams.copy(x = nextX)
    val nextP = updateErrorCovariance(kalmanGain)             // Equation (7)
    kalmanParams = kalmanParams.copy(P = nextP)
    kalmanGain
  }
}
Recursive method
def recurse(z: Array[DenseVector]): List[DenseVector] = {
  @tailrec
  def execute(
    z: Array[DenseVector],
    index: Int,
    predictions: ListBuffer[DenseVector]): List[DenseVector] = {
    if (index >= z.length)  // Criterion to end the recursion
      predictions.toList
    else {
      val nextX = predict()
      val estimatedZ: DenseVector = kalmanParams.H.multiply(nextX)
      predictions.append(estimatedZ)
      update(z(index))
      // Process the next measurement point
      execute(z, index + 1, predictions)
    }
  }
  execute(z, 0, ListBuffer[DenseVector]())
}
Sampling-based estimator
def apply(
  kalmanParams: KalmanParameters, // Kalman parameters used by the filter/predictor
  z: Array[DenseVector],          // Series of observed measurements as dense vectors
  numSamplingMethods: Int,        // Number of samplers to be processed concurrently
  minSamplingInterval: Int,       // Minimum number of measurements skipped between samples
  samplingInterval: Int           // Range of the random sampling interval
)(implicit sparkSession: SparkSession): Seq[Seq[DenseVector]] = {
  // Generate the various samples from the large set of raw measurements
  val samples: Seq[Seq[DenseVector]] = (0 until numSamplingMethods).map(
    _ => sampling(z, minSamplingInterval, samplingInterval)
  )
  // Distribute the Kalman prediction-correction cycle over the Spark workers
  // by assigning each partition a Kalman process and a sampled set of measurements
  val samplesDS = samples.toDS()
  val predictionsDS = samplesDS.mapPartitions(
    (sampleIterator: Iterator[Seq[DenseVector]]) => {
      val acc = ListBuffer[Seq[DenseVector]]()
      while (sampleIterator.hasNext) {
        implicit val kalmanNoise: KalmanNoise = KalmanNoise(kalmanParams.A.numRows)
        val rKalman = new RKalman(kalmanParams)
        val z = sampleIterator.next()
        acc.append(rKalman.recurse(z.toArray))
      }
      acc.iterator
    }
  ).persist()
  predictionsDS.collect()
}
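A usage sketch follows, reusing the velocity parameters and simulated measurements from the use case below; the sampling arguments are illustrative:

// Run 4 concurrent Kalman processes, each over an independently sampled subset
val predictions: Seq[Seq[DenseVector]] = apply(
  kalmanParams = velocity(xInitial),
  z = zVec,
  numSamplingMethods = 4,   // Number of sampled subsets processed concurrently
  minSamplingInterval = 4,  // Skip at least 4 raw measurements between samples
  samplingInterval = 8      // Random component added to the minimum interval
)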
Use case
Our implementation is evaluated with measurements of the velocity of a rocket under constant acceleration. The error covariance matrix P is initialized with diagonal values of 0.5.
def velocity(x: Array[Double]): KalmanParameters = {
  val acceleration = 0.0167
  val A = Array[Array[Double]](  // State transition dense matrix
    Array[Double](1.0, acceleration), Array[Double](0.0, 1.0)
  )
  val H = Array[Array[Double]](  // Measurement dense matrix
    Array[Double](1.0, 0.0), Array[Double](0.0, 0.0)
  )
  val P = Array[Array[Double]](  // Error covariance dense matrix
    Array[Double](0.5, 0.0), Array[Double](0.0, 0.5)
  )
  KalmanParameters(A, None, H, Some(P), x)  // Constructor with optional B and P
}
// Simulated velocity measurements
val z = Array.tabulate(2000)(n =>
  Array[Double](n*n*0.01 + 0.002/(n + 2) + normalRandomValue(0.2), 1.0)
)
val zVec = z.map(new DenseVector(_))

// Initial velocity and acceleration
val acceleration = 0.0167
val xInitial = Array[Double](0.001, acceleration)
val recursiveKalman = new RKalman(velocity(xInitial))
val predictedStates = recursiveKalman.apply(zVec)
Appendix
def updateErrorCovariance(kalmanGain: DenseMatrix): DenseMatrix = {
  // Joseph form: P' = (I - G.H).P.(I - G.H)^T + G.R.G^T, equation (7)
  val identity = DenseMatrix.eye(kalmanGain.numRows)
  val kHP = subtract(identity, kalmanGain.multiply(kalmanParams.H))
    .multiply(kalmanParams.P)
  val kH = subtract(identity, kalmanGain.multiply(kalmanParams.H)).transpose
  val kR = kalmanGain.multiply(kalmanNoise.measureNoise)
    .multiply(kalmanGain.transpose)
  add(kHP.multiply(kH), kR)
}
def sampling(
  measurements: Array[DenseVector], // Raw measurements (z)
  minSamplingInterval: Int,         // Minimum sampling interval
  samplingInterval: Int): Seq[DenseVector] = {
  val rand = new Random(42L)
  // Next data point: z(n+1) = z(n + minSamplingInterval + rand[0, samplingInterval])
  val interval = rand.nextInt(samplingInterval) + minSamplingInterval
  measurements.indices.foldLeft(ListBuffer[DenseVector]())(
    (acc, index) => {
      if (index % interval == 0)
        acc.append(measurements(index))
      acc
    }
  )
}
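For example, the following call retains roughly one measurement out of every 4 to 11 raw data points; the interval values are illustrative:

val sampled: Seq[DenseVector] = sampling(zVec, minSamplingInterval = 4, samplingInterval = 8)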