Monday, June 21, 2021

Open Source Lambda architecture for Deep Learning

Target audience: Beginner
Estimated reading time: 4'

Data scientists familiar with Python's scientific libraries have found their landscape transformed with the rise of 'big data' frameworks like Apache Hadoop, Spark, and Kafka.

This article introduces a modified Lambda architecture and briefly describes how its open-source components fit together. It provides a high-level view of the key services found in a typical deployment.



Conceptual data flow

The concept and architecture are versatile enough to accommodate a variety of open source and commercial solutions and services besides the frameworks described in this presentation. The open source framework PyTorch is used to illustrate the integration of big data frameworks such as Apache Kafka and Spark with a deep learning library to train, validate and test deep learning models.
Alternative libraries such as Keras or TensorFlow could also be used.

Let's consider the use case of training and validating a deep learning model, using Apache Spark to load, parallelize and pre-process the data. Apache Spark takes advantage of a large number of servers and CPU cores.



In this simple design, the workflow is broken down into 6 steps (a minimal Spark sketch of the first two steps follows the list):
  1. Apache Spark loads and parallelizes the training data from AWS S3
  2. Spark distributes the data pre-processing, cleansing and normalization across multiple worker nodes
  3. Spark forwards the processed data to the PyTorch cluster
  4. Flask converts REST requests into prediction queries for the PyTorch model
  5. The PyTorch model generates a prediction
  6. Run-time metrics are broadcast through Kafka
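
As an illustration of steps 1 and 2, here is a minimal Spark (Scala) sketch that loads training data from S3 and distributes a simple cleansing and normalization pass across worker nodes. The bucket path, column name and normalization logic are illustrative assumptions, not part of the reference architecture.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
    .appName("TrainingDataPreprocessing")
    .getOrCreate()

  // Step 1: load and parallelize the training data stored in S3 (hypothetical bucket and file)
val rawDf = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3a://my-training-bucket/training/features.csv")

  // Step 2: distributed cleansing and normalization across the worker nodes
  // (z-score normalization of a hypothetical 'feature' column)
val stats = rawDf.select(mean("feature"), stddev("feature")).first()
val (mu, sigma) = (stats.getDouble(0), stats.getDouble(1))

val preProcessedDf = rawDf
    .na.drop()                                              // cleansing: drop incomplete records
    .withColumn("feature", (col("feature") - mu) / sigma)   // normalization

  // The pre-processed data set is then handed over to the deep learning cluster (step 3)
preProcessedDf.write.mode("overwrite").parquet("s3a://my-training-bucket/preprocessed/")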

Key services


PyTorch is an optimized tensor library for deep learning using GPUs and CPUs. It extends the functionality of NumPy and scikit-learn to support the training, evaluation and commercialization of complex machine learning models.



Apache Spark is an open source cluster computing framework for fast real-time processing. 
It supports Scala, Java, Python and R programming languages and includes streaming, graph and machine learning libraries.


Apache Kafka is an open-source distributed event streaming framework for large-scale, real-time data processing and analytics. 
It captures data from various sources in real time as a continuous flow and routes it to the appropriate processor. 
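
To illustrate step 6 of the workflow, run-time metrics can be broadcast to a Kafka topic through the standard producer API. The broker address, topic name and metric payload below are illustrative assumptions.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  // Minimal producer configuration (broker address is an assumption)
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

  // Broadcast a run-time metric (e.g., prediction latency) to a hypothetical 'model-metrics' topic
val metric = """{"model": "my-model", "latency_ms": 42, "status": "ok"}"""
producer.send(new ProducerRecord[String, String]("model-metrics", "latency", metric))
producer.close()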



Ray Tune is a distributed hyperparameter tuning framework particularly well suited to deep learning models. It significantly reduces the cost of optimizing the configuration of a model and acts as a wrapper around other open-source optimization libraries.



Apache Hive is an open source data warehouse platform that facilitates reading, writing, and managing large datasets residing in distributed storage and accessed through frameworks such as Hadoop and Apache Spark.


Flask is a Python-based web development platform built as a micro-framework with support for REST. Its minimalist approach to the web interface makes it a very intuitive tool for building micro-services.


Amazon Simple Storage Service (S3) is a highly available, secure object storage service with very high durability (eleven 9's), strong scalability and support for versioning. It is versatile enough to accommodate any kind of data format.



Note: This informational post introduced the high-level components of a Lambda architecture. Such an orchestration of services is the foundation of the iterative machine learning modeling concept known as MLOps, which will be discussed in a future post.


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3



Sunday, May 30, 2021

Discrete Kalman Predictor in Scala

Target audience: Advanced
Estimated reading time: 5'

In this article, we embark on an exploration of the Kalman optimal filter, employing the Scala programming language for its implementation. Renowned in the realms of signal processing and statistical analysis, the Kalman filter serves as a potent tool to measure or estimate noise arising from processes and the disturbances introduced by measurement instruments.


Notes
  • The implementation relies on Scala 2.12.4
  • For the sake of readability of the implementation of the algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers and imports is omitted.

Overview  

A Kalman filter serves as an ideal estimator, determining parameters from imprecise and indirect measurements. Its goal is to reduce the mean square error associated with the model's parameters. Being recursive in nature, this algorithm is suited for real-time signal processing. However, one notable constraint of the Kalman filter is its need for the process to be linear, represented as y = a.f(x) + b.g(x) + .... 

Both the process and the measurements introduce Gaussian noise that affects the state.

Illustration of the Kalman filter as an estimator

Covariance 

The Kalman filter represents the estimated state and the error state as covariance matrices. The covariance of a random vector x = (x1, .., xn) is an n by n positive definite, symmetric matrix with the variance of each variable as diagonal elements \[cov(\mathbf{x})= E[(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}] = \int_{-\infty }^{\infty} \cdots \int_{-\infty }^{\infty}(\mathbf{x} - \overline{\mathbf{x}})(\mathbf{x} - \overline{\mathbf{x}} )^{t}\,p(\mathbf{x})\,dx_{1} \cdots dx_{n}\] Such a matrix can be diagonalized by computing its eigenvectors (or basis vectors) in order to decouple the contributions of the noise or errors.
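
As a quick illustration, a covariance matrix can be computed with the Apache Commons Math library used later in this post; the sample observations below are made up.

import org.apache.commons.math3.stat.correlation.Covariance

  // Each row is an observation, each column a random variable (made-up values)
val observations = Array(
    Array(1.0, 2.0),
    Array(2.0, 4.1),
    Array(3.0, 5.9),
    Array(4.0, 8.2)
)
  // 2 x 2 symmetric covariance matrix; the diagonal elements are the variances
val covMatrix = new Covariance(observations).getCovarianceMatrix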


Process and measurement noises

The state of a deterministic, time-linear dynamic system is the smallest vector that summarizes the past of the system in full and allows a theoretical prediction of its future behavior in the absence of noise. Let x(k) be the n-dimensional state vector at step k, u(k) the input vector, w(k) the unknown process noise vector with zero mean, v(k) the measurement noise vector, and z(k) the actual measurement of the state at step k. Then \[\mathbf{x}_{k+1} = A_{k}\mathbf{x}_{k} + B_{k}\mathbf{u}_{k} + \mathbf{w}_{k}\,\,\,(1)\,\,with\,\,Q_{k} = E[\mathbf{w}_{k}.\mathbf{w}_{k}^T]\\\mathbf{z}_{k} = H_{k}.\mathbf{x}_{k} + \mathbf{v}_{k}\,\,\,(2)\,\,\,\,with\,\,\,R_{k} = E[\mathbf{v}_{k}.\mathbf{v}_{k}^T]\] where A(k) is the state transition matrix, H(k) the measurement (observation) matrix, Q(k) the covariance matrix of the process noise and R(k) the covariance matrix of the measurement noise at step k. We assume that both the process noise and the measurement noise follow a Gaussian probability distribution.

We use the matrix classes and methods defined in the Apache Commons Math open source library (Commons Math 3.3).

Filter

We leverage the support for functional constructs provided in Scala. Validation of method arguments, exceptions and non-essential supporting methods are omitted for the sake of readability of the code snippets.

First we need to define the noise q generated by the linear process and the noise r generated by the measurement device. These functions are defined as members of the QRNoise class. The white noise values are drawn from Gaussian random distributions for the process and the measurement. The validation that the noise distribution actually follows a normal distribution is omitted.

type XY = (Double, Double)

case class QRNoise(qr: XY, white: Double => Double) {
    // White noise for the process Q
   private def q = white(qr._1)

    // White noise for the measurement R
   private def r = white(qr._2)

    // Noise vector added to the process state
   def noisyQ: Array[Double] = Array[Double](q, q)

    // Noise vector added to the measurement
   def noisyR: Array[Double] = Array[Double](r, r)
}
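
As used in the test case at the end of this post, an instance of QRNoise can be created with a Gaussian white noise generator scaled by the process and measurement noise parameters:

import scala.util.Random

val qrNoise = QRNoise((0.7, 0.3), (m: Double) => m * Random.nextGaussian)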

Next, we need to define some internal types. The discrete Kalman class, DKalman, has two objectives: 
  • Encapsulate the types and methods defined in Apache Commons Math used in the generation of the state and error equations
  • Manage the state of the process

The class DKalman takes 5 arguments
  • A: State transition matrix (or model) defined in the first state equation
  • B: Control/input matrix (model) of the state equation
  • H: Observation matrix (model) of the prediction equation
  • P: Noise covariance matrix
  • qrNoise: Implicit process and measurement (observation) noise

These model input parameters are used to compute the following values
  • Q: Process noise covariance matrix
  • R: Measurement noise covariance matrix
  • filter: Instance of the Apache Commons Math KalmanFilter class
  • x: Current state vector

type DblMatrix = Array[Array[Double]]
type DblVector = Array[Double]

final protected class DKalman(
     A: DblMatrix,  
     B: DblMatrix,  
     H: DblMatrix, 
     P: DblMatrix)(implicit qrNoise: QRNoise) {
 
  type XYSeries = Array[(Double, Double)]

       // Process noise covariance matrix (white noise with mean = 0.0)
  private[this] val Q = A.map(row => Array.fill(row.length)(qrNoise.qr._1))

       // Measurement noise covariance matrix (white noise with mean = 0.0)
  private[this] val R = A.map(row => Array.fill(row.length)(qrNoise.qr._2))

  private var filter: KalmanFilter = _
  private var x: RealVector = _
  
   // Main filtering routine (implemented below)
  def filter(xt: XYSeries): XYSeries = {}

   // Initialize the process and measurement models (implemented below)
  private def initialize(input: DblVector): Unit = {}
   
   // Compute the new state of the Kalman iterative computation (implemented below)
  private def newState: DblVector = {}

  // Compute the least squared error of two vectors of type 'RealVector' (implemented below)
  def lsError(x: RealVector, z: RealVector): Double = {}
}

The method filter below implements the basic sequence of operations executed at each iteration of the update of the state of the process:
  1. Predict the state of the process at the next step (x, A)
  2. Extract or generate the noise for the process and the measurement devices (w, v)
  3. Update the state of the process (x)
  4. Compute the error on the measurement (z)
  5. Adjust the covariance matrices

Note: The control input u and the initial state x0 are defined as arguments of the main method because they are independent of the model.

The two main stages of the Kalman filter are
- Initialization of the components used in the prediction and correction equations (method initialize)
- Execution of the prediction/correction cycle (method newState)

import org.apache.commons.math3.linear._
import org.apache.commons.math3.filter._

def filter(xt: XYSeries): XYSeries = xt.map{ 
  case (x, y) => {

     // Initialize the filter with the current observation
     initialize(Array[Double](x, y))  

     // Extract the new state as a two-value vector
     val nState = newState
     (nState(0), nState(1))
  }
}

State equation

The method newState implements the iterative state equations of the model,
x = A.x + B.u + w (with process noise w)
z = H.x + v (with measurement noise v)
described in the introduction.


The prediction phase is executed by invoking the KalmanFilter.predict method of the Apache Commons Math library, and the correction phase relies on the KalmanFilter.correct method of the same Java library.
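
For reference, the prediction and correction steps follow the standard Kalman filter equations, using the notation introduced earlier (P is the error covariance and K the Kalman gain):
\[\hat{\mathbf{x}}_{k|k-1} = A_{k}\hat{\mathbf{x}}_{k-1} + B_{k}\mathbf{u}_{k}\,\,\,;\,\,\,P_{k|k-1} = A_{k}P_{k-1}A_{k}^{T} + Q_{k}\\ K_{k} = P_{k|k-1}H_{k}^{T}(H_{k}P_{k|k-1}H_{k}^{T} + R_{k})^{-1}\\ \hat{\mathbf{x}}_{k} = \hat{\mathbf{x}}_{k|k-1} + K_{k}(\mathbf{z}_{k} - H_{k}\hat{\mathbf{x}}_{k|k-1})\,\,\,;\,\,\,P_{k} = (I - K_{k}H_{k})P_{k|k-1}\]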

private def newState: DblVector = {

    // Prediction phase: update the filter estimate using
    // the state transition matrix A
    filter.predict

    // Apply the state equation x = A.x + w with the process noise
    // (the implicit conversions to Commons Math types are omitted, per the note above)
    x = A.operate(x).add(qrNoise.noisyQ) 

    // Compute the measured value z = H.x + v with the newly updated state x
    // using the measurement/observation matrix H
    val z = H.operate(x).add(qrNoise.noisyR)
  
    // Correction phase: update the filter with the new estimated measurement z
    filter.correct(z)
    filter.getStateEstimation 
}

The method initialize creates two instances of classes defined in the Apache Commons Math library:
  • pModel to model the process with the relevant matrices
  • mModel to model the measurement
private def initialize(input: DblVector): Unit = {   
    val pModel = new DefaultProcessModel(A, B, Q, input, P)
    val mModel = new DefaultMeasurementModel(H, R)
   
    // Create a Kalman filter with a model pModel for 
    // the process and a model mModel for the measurement
    filter = new KalmanFilter(pModel, mModel)

    // Conversion to the Apache Commons Math internal vector type
    x = new ArrayRealVector(input)
}

Lastly, we need to compute the least squares error of the predicted states compared to the actual state values. The type RealVector is defined in the Apache Commons Math library.

import scala.math.sqrt

def lsError(x: RealVector, z: RealVector): Double = {
   // Sum of the squared differences between the two vectors
   val sumSqr = x.toArray
             .zip(z.toArray)
             .map { case (xi, zi) => xi - zi }
             .map(diff => diff * diff)
             .sum
   sqrt(sumSqr)
}
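
A hypothetical invocation of lsError, with the predicted and observed states wrapped into Commons Math vectors (the values are made up):

import org.apache.commons.math3.linear.ArrayRealVector

  // Made-up predicted vs. observed state (height, vertical speed)
val predicted = new ArrayRealVector(Array(99.2, -0.49))
val observed = new ArrayRealVector(Array(99.5, -0.51))
val error = lsError(predicted, observed)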


Use case

We will use the simple example of Newton's law of gravity. If y is the height of an object in free fall, the differential equation can be integrated with a time step dt as follows \[\frac{\mathrm{d}^2 y_{t}}{\mathrm{d} t^{2}} + g = 0\,\,\Rightarrow\\ y_{t+dt} = y_{t}+ \dot{y}_{t}\,dt - \frac{g}{2}\,dt^{2}\,\,\,;\,\,\dot{y}_{t+dt} = \dot{y}_{t} - g\,dt\,\,\,(\mathbf{3})\] The state vector x(k) for the object at time k is defined by \[\mathbf{x}_{k} = [y_{k},\frac{\mathrm{d} y_{k}}{\mathrm{d} t}]^{T}\] and the state equation is \[\mathbf{x}_{k+1} = A_{k}.\mathbf{x}_{k} + B_{k}.\mathbf{u}_{k}\,\,\,with\,\,\,A_{k} =\begin{vmatrix} 1 & dt\\ 0 & 1 \end{vmatrix}\,\,,\,B_{k}=\begin{vmatrix}0.5.dt.dt \\ dt \end{vmatrix}\] We use the Apache Commons Math library version 3.0 (see the Apache Commons Math User Guide) to filter and predict the motion of a body subjected to gravity.

import scala.math.pow
import scala.util.Random

val dt = 0.05
val g = 9.81
val initialHeight = 100.0
val initialSpeed = 0.0
val variance = 0.014

  // State transition matrix A
val A: DblMatrix = Array(
   Array(1.0, dt),
   Array(0.0, 1.0)
)
  // Control input matrix B (2 x 1)
val B: DblMatrix = Array(
    Array(0.5*g*dt*dt), 
    Array(g*dt)
)
  // Measurement matrix H: only the position is observed
val H: DblMatrix = Array(Array(1.0, 0.0))

  // Process noise covariance matrix Q
val Q: DblMatrix = Array(
   Array(0.25*variance*pow(dt, 4), variance*pow(dt, 3)/2), 
   Array(variance*pow(dt, 3)/2,    variance*dt*dt)
)
  // Initial error covariance matrix P0
val P0: DblMatrix = Array(
   Array(0.02, 0.0), 
   Array(0.0, 0.03)
)

  // Initialize the drop at a height of 100 with no initial speed
val x0 = Array[Double](initialHeight, initialSpeed)
  
  // Process and measurement noise, passed as the implicit argument of DKalman
implicit val qrNoise = QRNoise((0.7, 0.3), (m: Double) => m*Random.nextGaussian)   

  // Create the filter model; 'trajectory' is the observed (height, speed) series
val model = new DKalman(A, B, H, P0)
val output = model.filter(trajectory)


The state transition matrix A is initialized with diagonal elements of 1 (for y and dy/dt) and the off-diagonal term dt. The control matrix B implements the coefficients of the first equation (3) with the values 0.5*g*dt*dt and g*dt. The trajectory is the only variable that is actually observed (speed and acceleration are not measured), therefore the measurement matrix H has only one non-zero entry. 
The noise on the trajectory and speed for the model and the measurement is defined by the process noise covariance matrix Q. The error covariance matrix P0 completes the initialization of the process and measurement models.
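
The observed series trajectory is not listed in the post. For experimentation it could be simulated, before invoking model.filter, by integrating the free fall and corrupting the height and speed with Gaussian measurement noise; the number of samples and the noise amplitude below are arbitrary assumptions.

import scala.util.Random

  // Simulated noisy observations (height, vertical speed) sampled every dt seconds
val trajectory: Array[(Double, Double)] = Array.tabulate(100)(n => {
    val t = n * dt
    val height = initialHeight - 0.5 * g * t * t + 0.3 * Random.nextGaussian
    val speed = -g * t + 0.3 * Random.nextGaussian
    (height, speed)
})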


Sunday, March 28, 2021

MLOps for Data Scientists

Target audience: Beginner
Estimated reading time: 4'

A few of my colleagues in data science are hesitant about embracing MLOps. Why should it matter to them? Actually, a lot!

This article presents a comprehensive overview of MLOps, especially from a data scientist's perspective. Essentially, MLOps aims to address common issues of reliability and clarity that frequently arise during the development and deployment of machine learning models.



AI productization

MLOps encompasses a suite of tools that facilitate the lifecycle of data-centric AI. This includes training models, performing error analysis to pinpoint data types where the algorithm underperforms, expanding the dataset through data augmentation, resolving discrepancies in data label definitions, and leveraging production data for ongoing model enhancement.

MLOps aims to streamline and automate the training and validation of machine learning models, enhancing their quality and ensuring they meet business and regulatory standards. It merges the roles of data engineering, data science, and dev-ops into a cohesive and predictable process across the following domains:
  • Deployment and automation
  • Reproducibility of models and predictions
  • Diagnostics
  • Governance and regulatory compliance (SOC 2, HIPAA)
  • Scalability and latency
  • Collaboration
  • Business use cases & metrics
  • Monitoring and management
  • Technical support

Predictable ML lifecycle

MLOps outlines the management of the entire machine learning lifecycle. This includes integrating model generation with software development processes (e.g., Jira, GitHub), ensuring continuous testing and delivery, orchestrating and deploying models, as well as monitoring their health, diagnostics, performance governance, and aligning with business metrics. From a data science standpoint, MLOps involves a consistent and cyclical process of gathering and preprocessing data, training and assessing models, and deploying them in a production environment.

Data-centric AI

Andrew Ng pioneered the idea of data-centric AI, advocating for AI professionals to prioritize the quality of their training data rather than concentrating mainly on model or algorithm development. Unlike the conventional model-centric AI approach, where data is gathered with minimal focus on its quality to train and validate a model, data-centric AI emphasizes improving data quality. This approach enhances the likelihood of success for AI projects and machine learning models in practical applications.

MLOps, on the other hand, involves a continuous and iterative process encompassing data collection and pre-processing, model training and evaluation, and deployment in a production environment.
Fig 1. Overview of continuous development in data-centric AI - courtesy Andrew Ng


There are several differences between the traditional model-centric AI and data-centric AI approaches.

Model-centric AI
  • The goal is to collect all the data you can and develop a model good enough to deal with noise and avoid overfitting.
  • Hold the data fixed and iteratively improve the model and code.

Data-centric AI
  • The goal is to select a subset of the training data with the highest consistency and reliability so that multiple models perform well.
  • Hold the model and code fixed and iteratively improve the data.


Repeatable processes
The objective is to implement established and reliable software development management techniques (such as Scrum, Kanban, etc.) and DevOps best practices in the training and validation of machine learning models. By operationalizing the training, tuning, and validation processes, the automation of data pipelines becomes more manageable and predictable.

The diagram below showcases how data acquisition, analysis, training, and validation tasks transition into operational data pipelines:
Fig 2. Productization in Model-centric AI

As shown in Figure 2, the deployment procedure in a model-centric AI framework offers limited scope for integrating model training and validation with fresh data. 

Fig 3. Productization in Data-centric AI

Conversely, in a data-centric AI approach, Figure 3, the model is put into action early in the development cycle. This early deployment facilitates ongoing integration and updates to the model(s), utilizing feedback and newly acquired data.

AI lifecycle management tools

While the development tools traditionally used by software engineers are largely applicable to MLOps, there has been an introduction of specialized tools for the ML lifecycle in recent years. Several open-source tools have emerged in the past three years to facilitate the adoption and implementation of MLOps across engineering teams.
  • DVC (Data Version Control) is tailored for version control in ML projects.
  • Polyaxon offers lifecycle automation for data scientists within a collaborative workspace.
  • MLflow oversees the complete ML lifecycle, from experimentation to deployment, and features a model registry for managing different model versions.
  • Kubeflow streamlines workflow automation and deployment in Kubernetes containers.
  • Metaflow focuses on automating the pipeline and deployment processes.
Additionally, AutoML frameworks are increasingly popular for swift ML development, offering a user experience akin to GUI development.

Canary, frictionless release

A strong testing and deployment strategy is essential for the success of any AI initiative. Implementing a canary release smooths the transition of a model from a development or staging environment to production. This method involves directing a percentage of user requests to a new version or a sandbox environment, based on criteria set by the product manager (such as modality, customer type, metrics, etc.). 

This strategy minimizes the risk of deployment failures since it eliminates the need for rollbacks. If issues arise, it's simply a matter of ceasing traffic to the new version.
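
A minimal sketch of the routing principle, with a hypothetical ModelService interface and a configurable traffic split; this illustrates the idea rather than a production gateway.

import scala.util.Random

  // Hypothetical prediction endpoint shared by the current and candidate model versions
trait ModelService { def predict(request: String): String }

final class CanaryRouter(
    stable: ModelService,     // current production model
    canary: ModelService,     // new model version under evaluation
    canaryTraffic: Double     // e.g. 0.10 routes 10% of the requests to the canary
) {
  def predict(request: String): String =
    if (Random.nextDouble < canaryTraffic) canary.predict(request)
    else stable.predict(request)
}
  // Halting the canary simply means setting canaryTraffic to 0.0 - no rollback required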






---------------------------

Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3