Thursday, September 26, 2024

Introduction to SE3 Lie Groups in Python

Target audience: Advanced
Estimated reading time: 5'

After years of feeling daunted by Lie groups and algebras, I finally took the plunge into exploring these fascinating smooth manifolds. This article offers an introduction to the widely used 3D Special Euclidean group (SE3).
Contents
       Lie manifolds
       Geomstats
       Components
       Inversion
       Composition
References
Appendix
Follow me on LinkedIn

What you will learn:  How to calculate an element of the 3D Special Euclidean group (SE3) from a given rotation matrix and translation vector in the tangent space, including the implementation of operations for computing the inverse and composition of SE3 group elements.

Notes

  • This post is a follow-up to articles related to differential geometry and geometric learning [ref 1, 2, 3 and 4] and to the introduction to the 3-dimensional Special Orthogonal group [ref 5].
  • Environments: Python 3.11, Matplotlib 3.9, Geomstats 2.8.0, NumPy 2.1.2
  • Source code is available at Github.com/patnicolas/Data_Exploration/Lie
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Disclaimer: A thorough tutorial and explanation of Lie groups, Lie algebras, and geometric priors for deep learning models is beyond the scope of this article. Instead, the following sections concentrate on experiments involving key elements and operations on Lie groups using the Geomstats Python library [ref 6].

Overview

Lie manifolds

A smooth manifold is a topological space that locally resembles Euclidean space and allows for smooth (infinitely differentiable) transitions between local coordinate systems. This structure allows for the use of calculus on the manifold.

The tangent space at a point on a manifold is the set of tangent vectors at that point, like a line tangent to a circle or a plane tangent to a surface.
Tangent vectors can act as directional derivatives, where you can apply specific formulas to characterize these derivatives.

Fig. 1 Manifold with tangent space and exponential/logarithm maps
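
To make the exponential and logarithm maps concrete, here is a minimal sketch of my own, using SciPy's matrix exponential rather than Geomstats: exponentiating a skew-symmetric 3x3 matrix (a tangent vector at the identity of the rotation group) yields a rotation matrix, and the matrix logarithm maps it back.

import numpy as np
from scipy.linalg import expm, logm

# Tangent-space element: a skew-symmetric matrix (Lie algebra so(3))
omega = np.array([[0.0, -1.0, 0.0],
                  [1.0,  0.0, 0.0],
                  [0.0,  0.0, 0.0]])    # 1 radian rotation about the z-axis

rotation = expm(omega)     # Exponential map: tangent space -> manifold SO(3)
assert np.allclose(rotation @ rotation.T, np.eye(3))   # R is orthogonal
log_rot = logm(rotation)   # Logarithm map: manifold -> tangent space
assert np.allclose(log_rot, omega, atol=1e-6)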

In differential geometry, a Lie group is a mathematical structure that combines the properties of both a group and a smooth manifold. It allows for the application of both algebraic and geometric techniques. As a group, it has an operation (like multiplication) that satisfies certain axioms (closure, associativity, identity, and invertibility) [ref 7].
A 'real' Lie group is a set G with two structures: G is a group and G is a (smooth, real) manifold. These structures agree in the following sense: multiplication (a.k.a. product or composition) and inversion are smooth maps.
A morphism of Lie groups is a smooth map which also preserves the group operation: f(gh) = f(g)f(h) and f(1) = 1.
Fig. 2 Manifold with tangent space and identity and group element 
(Courtesy A. Kirillov Jr., Department of Mathematics, SUNY at Stony Brook)
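
To make the morphism property concrete, here is a small NumPy check of my own (not from the original post): the determinant map det: GL(3) -> R* preserves products and maps the identity matrix to 1, which is exactly the morphism condition above.

import numpy as np

# Two invertible matrices (diagonally dominant, hence well-conditioned)
g = np.random.rand(3, 3) + 3 * np.eye(3)
h = np.random.rand(3, 3) + 3 * np.eye(3)

# det(gh) = det(g)det(h) and det(I) = 1
assert np.isclose(np.linalg.det(g @ h), np.linalg.det(g) * np.linalg.det(h))
assert np.isclose(np.linalg.det(np.eye(3)), 1.0)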

Special Euclidean Group

The Euclidean group is a subset of the broader affine transformation group. It contains the translational and orthogonal groups as subgroups. Any element of SE(n) can be represented as a combination of a translation and an orthogonal transformation, where the translation B can either precede or follow the orthogonal transformation A.
An element of the 3-dimensional Special Euclidean group (SE3) is represented by a 4x4 homogeneous matrix:

M = [ R   t ]
    [ 0   1 ]

where R is a 3x3 rotation matrix (R in SO(3)) and t is a 3x1 translation vector.

A previous article, Special Orthogonal Lie group SO3, introduced the 3-dimensional Special Orthogonal Lie group (SO3). How does the SE3 group differ from SO3? SO3 describes pure rotations, while SE3 combines a rotation with a translation: its elements are rigid-body motions represented as 4x4 homogeneous matrices rather than 3x3 rotation matrices.
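
The difference is easy to see numerically. In this short NumPy sketch of my own, the same rotation applied through an SE(3) matrix both rotates and translates a 3D point, whereas an SO(3) matrix only rotates it:

import numpy as np

rotation = np.array([[1.0, 0.0, 0.0],
                     [0.0, 0.0, -1.0],
                     [0.0, 1.0, 0.0]])    # 90-degree rotation about the x-axis
translation = np.array([0.5, 0.8, 0.6])

# Homogeneous 4x4 SE(3) matrix: rotation block plus translation column
se3 = np.eye(4)
se3[:3, :3] = rotation
se3[:3, 3] = translation

point = np.array([1.0, 2.0, 3.0, 1.0])    # Homogeneous coordinates
print(rotation @ point[:3])               # SO(3): [ 1. -3.  2.]
print((se3 @ point)[:3])                  # SE(3): [ 1.5 -2.2  2.6]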


Geomstats

Geomstats is a free, open-source Python library designed for conducting machine learning on data situated on nonlinear manifolds, an area known as Geometric Learning. This library offers object-oriented, thoroughly unit-tested features for fundamental manifolds, operations, and learning algorithms, compatible with various execution environments, including NumPy, PyTorch, and TensorFlow (Overview Geomstats library).

The library is structured into two principal components:
  • geometry: This part provides an object-oriented framework for crucial concepts in differential geometry, such as exponential and logarithm maps, parallel transport, tangent vectors, geodesics, and Riemannian metrics (see the sketch after this list).
  • learning: This section includes statistics and machine learning algorithms tailored for manifold data, building upon the scikit-learn framework.
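
A minimal sketch of my own showing the geometry module in action; the constructor arguments mirror those used later in this post:

from geomstats.geometry.special_euclidean import SpecialEuclidean

# SE(3) as a matrix Lie group, not yet equipped with a Riemannian metric
se3_group = SpecialEuclidean(n=3, point_type='matrix', equip=False)
print(se3_group.identity)    # The 4x4 identity matrix, the group identity element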


Evaluation

The purpose of this section is to demonstrate that the inverse of an SE3 element and the composition of two SE3 elements belong to the SE3 manifold.

Components

We adopt the same object-oriented approach as used with the Special Orthogonal Group to describe the components and operations on the SE(3) manifold. The LieSE3Group class encapsulates the definition of the Special Euclidean group and its associated operations.
We specify three constructors:
  • __init__: Default constructor that creates a new element of the SE3 manifold, group_element, from a 4x4 rotation+translation matrix on the tangent space.
  • build_from_numpy: Alternative constructor with a 3x3 NumPy array for the rotation matrix and a 1x3 NumPy array for the translation vector on the tangent space.
  • build_from_vec: Alternative constructor with a 9-element list for the rotation matrix and a 3-element list for the translation vector on the tangent space.
import numpy as np
from typing import List, Self

import geomstats.backend as gs
from geomstats.geometry.special_euclidean import SpecialEuclidean



class LieSE3Group(object):
   dim = 3
   # Lie group as defined in Geomstats library
   lie_group = SpecialEuclidean(n=dim, point_type='matrix', epsilon=0.15, equip=False)
  
   # Support conversion of rotation matrix and translation vector to 4x4 matrix
   extend_rotation = np.array([[0.0, 0.0, 0.0]])
   extend_translation = np.array([[1.0]])

   
   # Default constructor with a 4x4 matrix on the tangent space as argument
   def __init__(self, se3_element: np.ndarray) -> None:
      self.se3_element = se3_element
      # Apply the exponential map to generate a point on the SE3 manifold
      self.group_element = LieSE3Group.lie_group.exp(self.se3_element)


  # Constructor with 3x3 numpy array for rotation matrix and  
  # 1x3 numpy array as translation vector on the tangent space 
  @classmethod
  def build_from_numpy(cls, rot_matrix: np.ndarray, trans_matrix: np.ndarray) -> Self:
      rotation_matrix = gs.array(rot_matrix)
      translation_matrix = gs.array(trans_matrix)
      se3_element = LieSE3Group.__build_se3_matrix(rotation_matrix, translation_matrix)
      
      return cls(se3_element)



  # Constructor with a 9 elements list for rotation matrix and  a
  # 3 elements list as translation vector on the tangent space 
  @classmethod
  def build_from_vec(cls, rot_matrix: List[float], trans_vector: List[float]) -> Self:
      np_rotation_matrix = np.reshape(rot_matrix, (3, 3))
      np_translation_matrix = LieSE3Group.__convert_to_matrix(trans_vector)
      return LieSE3Group.build_from_numpy(np_rotation_matrix, np_translation_matrix)

The method build_from_numpy invokes the private static method __build_se3_matrix to build the 4x4 NumPy array from the 3x3 rotation matrix and the 1x3 translation vector. Its implementation is included in the appendix. The helper __convert_to_matrix used by build_from_vec is not listed in this post; a possible implementation is sketched below.
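
A plausible one-line implementation of the helper __convert_to_matrix, reshaping the 3-element list into the 1x3 row vector expected by build_from_numpy (an assumption on my part, since the original listing omits it):

  @staticmethod
  def __convert_to_matrix(trans_vector: List[float]) -> np.ndarray:
      # Assumed helper (not shown in the original listing):
      # 3-element list -> 1x3 NumPy row vector
      return np.expand_dims(np.array(trans_vector), axis=0)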


The generation of a point on the SE3 manifold uses a rotation around the X axis (rot_matrix) and a translation along each of the three axes (trans_vector).

rot_matrix = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0, 0.0, 1.0, 0.0]
trans_vector = [0.5, 0.8, 0.6]
print(f'\nRotation matrix:\n{np.reshape(rot_matrix, (3, 3))}')
print(f'Translation vector: {trans_vector}')

lie_se3_group = LieSE3Group.build_from_vec(rot_matrix, trans_vector)
print(lie_se3_group)
lie_se3_group.visualize_all(rot_matrix, trans_vector)

Output:
Rotation matrix:
[[ 1.  0.  0.]
 [ 0.  0. -1.]
 [ 0.  1.  0.]]

Translation vector: [0.5, 0.8, 0.6]
SE3 tangent space:
[[ 1.0  0.0  0.0  0.5]
 [ 0.0  0.0 -1.0  0.8]
 [ 0.0  1.0  0.0  0.6]
 [ 0.0  0.0  0.0  1.0]]
SE3 point:
[[ 2.718   0.000   0.000   1.359]
 [ 0.000   0.540  -0.841   0.806]
 [ 0.000   0.841   0.540   1.440]
 [ 0.000   0.000   0.000   2.718]]

The following plots illustrate the two inputs (rotation matrix and translation vector) on the tangent space and the resulting point on the SE3 manifold.



Fig 3. Visualization of the 3x3 rotation matrix and the 1x3 translation vector on the SE3 tangent space

Fig 4. Visualization of the 4x4 matrix (point) on the SE3 manifold


Inversion

Let's validate that the inverse of an element of the SE3 Lie group also belongs to the SE3 group. The LieSE3Group.inverse method relies on the SpecialEuclidean.inverse method of the Geomstats library.


def inverse(self) -> Self:
    # Delegate the inversion to the Geomstats SpecialEuclidean group
    inverse_group_point = LieSE3Group.lie_group.inverse(self.group_element)
    return LieSE3Group(inverse_group_point)
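
For intuition, the inverse of an SE(3) homogeneous matrix also has a simple closed form: the rotation block is transposed and the translation becomes -R^T t. A standalone NumPy sketch of my own, independent of Geomstats:

import numpy as np

def se3_inverse(matrix: np.ndarray) -> np.ndarray:
    # Closed-form inverse of a 4x4 SE(3) homogeneous matrix
    rotation = matrix[:3, :3]
    translation = matrix[:3, 3]
    inverse = np.eye(4)
    inverse[:3, :3] = rotation.T                   # R^-1 = R^T for a rotation
    inverse[:3, 3] = -rotation.T @ translation     # -R^T t
    return inverse

se3 = np.eye(4)
se3[:3, :3] = [[1.0, 0.0, 0.0], [0.0, 0.0, -1.0], [0.0, 1.0, 0.0]]
se3[:3, 3] = [0.5, 0.8, 0.6]
assert np.allclose(se3 @ se3_inverse(se3), np.eye(4))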


We reuse the 3x3 orthogonal rotation around the X axis (rot_matrix) with a new translation vector [0.5, 0.8, 0.6] on the SE3 tangent space to generate a point on the manifold and its inverse.

rot_matrix = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0, 0.0, 1.0, 0.0]
trans_vector = [0.5, 0.8, 0.6]
lie_se3_group = LieSE3Group.build_from_vec(rot_matrix, trans_vector)
   
inv_lie_se3_group = lie_se3_group.inverse()
print(f'\nSE3 element\n{lie_se3_group}\nInverse\n{inv_lie_se3_group}')
lie_se3_group.visualize(inv_lie_se3_group.group_element, 'Inverse')

Input
SE3 tangent space:
[[ 1.0  0.0  0.0  0.5]
 [ 0.0  0.0 -1.0  0.8]
 [ 0.0  1.0  0.0  0.6]
 [ 0.0  0.0  0.0  1.0]]

SE3 point:
[[ 2.718   0.000   0.000   1.359]
 [ 0.000   0.540  -0.841   0.806]
 [ 0.000   0.841   0.540   1.440]
 [ 0.000   0.000   0.000   2.718]]

Inverse SE3 point
[[ 15.154    0.000   0.000  -26.738]
 [   0.000    1.143   1.279    -3.307]
 [   0.000   -1.279   1.143     1.125]
 [   0.000    0.000   0.000     2.718]]

The inverse on the SE3 manifold is visualized in the following heatmap.

Fig 5. Visualization of the inverse of a 4x4 matrix (point) on the SE3 manifold

Composition

The second key property of a Lie group on a manifold is that the composition of two group elements also belongs to the group. The product method in the LieSE3Group class performs this operation by composing the current 4x4 SE(3) matrix with another SE(3) element (denoted as lie_se3_group).


def product(self, lie_se3_group: Self) -> Self:
    # Compose the two group elements using the Geomstats compose method
    composed_group_point = LieSE3Group.lie_group.compose(
        self.group_element,
        lie_se3_group.group_element)
    return LieSE3Group(composed_group_point)
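
The closure property itself can be checked outside Geomstats: the matrix product of two 4x4 SE(3) matrices again has an orthogonal rotation block with determinant 1 and [0, 0, 0, 1] as its last row. A NumPy-only sketch of my own:

import numpy as np

def is_se3(matrix: np.ndarray, atol: float = 1e-9) -> bool:
    # An SE(3) matrix has an orthogonal 3x3 block with determinant 1
    # and [0, 0, 0, 1] as its last row
    rotation = matrix[:3, :3]
    return (np.allclose(rotation @ rotation.T, np.eye(3), atol=atol)
            and np.isclose(np.linalg.det(rotation), 1.0, atol=atol)
            and np.allclose(matrix[3], [0.0, 0.0, 0.0, 1.0], atol=atol))

a, b = np.eye(4), np.eye(4)
a[:3, :3] = [[1.0, 0.0, 0.0], [0.0, 0.0, -1.0], [0.0, 1.0, 0.0]]
a[:3, 3] = [0.5, 0.8, 0.6]
b[:3, :3] = [[0.0, -1.0, 0.0], [1.0, 0.0, 0.0], [0.0, 0.0, 1.0]]
b[:3, 3] = [0.1, -0.3, 0.3]
assert is_se3(a @ b)    # Closure: the composition is again an SE(3) element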


First test:
Let's compose this SE3 element with itself.

rot_matrix = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0, 0.0, 1.0, 0.0]
trans_vector = [0.5, 0.8, 0.6]
se3_group = LieSE3Group.build_from_vec(rot_matrix, trans_vector)

# Composition of the same matrix
se3_group_product = se3_group.product(se3_group)
print(f'\nComposed SE3 point:\n{se3_group_product}')

SE3 tangent space:
[[ 7.389   0.000   0.000   7.389]
 [ 0.000  -0.416  -0.909   1.417]
 [ 0.000   0.909  -0.416   5.372]
 [ 0.000   0.000   0.000   7.389]]
Composed SE3 point:
[[ 1618.174      0.000      0.000  119568.075]
 [    0.000     40.518    -52.045     162.141]
 [    0.000     52.045     40.518     113.238]
 [    0.000      0.000      0.000    1618.174]]

Fig 6. Visualization of the composition of a 4x4 matrix (SE3 manifold point) with itself


Second test:
We compose a 3x3 rotation matrix around the X axis, rot1_matrix, with translation vector trans1_vector = [0.5, 0.8, 0.6], and a 3x3 rotation matrix around the Z axis, rot2_matrix, with translation vector trans2_vector = [0.1, -0.3, 0.3].

rot1_matrix = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0, 0.0, 1.0, 0.0]
trans1_vector = [0.5, 0.8, 0.6]
se3_group1 = LieSE3Group.build_from_vec(rot1_matrix, trans1_vector)
print(f'\nFirst SE3 matrix:{se3_group1}')

rot2_matrix = [0.0, -1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0]
trans2_vector = [0.1, -0.3, 0.3]
se3_group2 = LieSE3Group.build_from_vec(rot2_matrix, trans2_vector)
print(f'\nSecond SE3 matrix:{se3_group2}')

se3_composed_group = se3_group1.product(se3_group2)
print(f'\nComposed SE3 matrix:{se3_composed_group}')

First SE3 matrix:
SE3 tangent space:
[[ 1.   0.   0.   0.5]
 [ 0.   0.  -1.   0.8]
 [ 0.   1.   0.   0.6]
 [ 0.   0.   0.   1. ]]
SE3 point:
[[ 2.718   0.000   0.000  1.359 ]
 [ 0.000   0.540  -0.841  0.806 ]
 [ 0.000   0.841   0.540  1.440 ]
 [ 0.000   0.000   0.000   2.718 ]]

Second SE3 matrix:
SE3 tangent space:
[[ 0.  -1.   0.   0.1]
 [ 1.   0.   0.  -0.3]
 [ 0.   0.   1.   0.3]
 [ 0.   0.   0.   1. ]]
SE3 point:
[[ 0.540  -0.841   0.000   0.351 ]
 [ 0.841   0.540   0.000  -0.386 ]
 [ 0.000   0.000   2.718   0.815 ]
 [ 0.000   0.000   0.000   2.718 ]]

Composed SE3 matrix:
SE3 tangent space:
[[ 7.389   0.000   0.000   7.389]
 [ 0.000  -0.416  -0.909   1.417]
 [ 0.000   0.909  -0.416   5.372]
 [ 0.000   0.000   0.000   7.389]]
SE3 point:
[[ 1618.177      0.000     0.000  11956.808]
 [    0.000      0.405    -0.520    162.141]
 [    0.000      0.520     0.405   1132.388]
 [    0.000      0.000     0.000   1618.177]]

The following diagram visualizes the two input SE3 group elements used in the composition, se3_group1 and se3_group2, and the resulting SE3 element, se3_composed_group.

Fig 7. Visualization of the two SE3 4x4 matrices input to the composition

Fig 8. Visualization of the composition of two SE3 4x4 matrices

References




----------------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3 and Geometric Learning in Python Newsletter on LinkedIn.



Appendix

A simple static method to build a 4x4 matrix on the SE3 tangent space from a 3x3 rotation matrix and a 1x3 translation vector.

@staticmethod
def __build_se3_matrix(rot_matrix: np.ndarray, trans_matrix: np.ndarray) -> np.ndarray:
   # Append the row [0, 0, 0] below the 3x3 rotation matrix -> 4x3
   extended_rot = np.concatenate([rot_matrix, LieSE3Group.extend_rotation], axis=0)
   # Append the scalar 1 below the 3x1 translation column -> 4x1
   extended_trans = np.concatenate([trans_matrix.T, LieSE3Group.extend_translation], axis=0)

   return np.concatenate([extended_rot, extended_trans], axis=1)
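
A quick standalone check of my own of the same concatenation logic, outside the class:

import numpy as np

rot = np.array([[1.0, 0.0, 0.0], [0.0, 0.0, -1.0], [0.0, 1.0, 0.0]])
trans = np.array([[0.5, 0.8, 0.6]])                                   # 1x3 row vector
extended_rot = np.concatenate([rot, [[0.0, 0.0, 0.0]]], axis=0)       # 4x3
extended_trans = np.concatenate([trans.T, [[1.0]]], axis=0)           # 4x1
se3_matrix = np.concatenate([extended_rot, extended_trans], axis=1)   # 4x4
print(se3_matrix[3])    # Last row: [0. 0. 0. 1.]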



Wednesday, September 18, 2024

Generative AI with Kafka & Spark

Target audience: Advanced
Estimated reading time: 6'

While Python is predominantly recognized as the go-to programming language for data science, leveraging Java-based frameworks can offer substantial advantages, especially for rapid distributed inference.

In this article, we will outline the structure of a swift, distributed inference system for Bidirectional Encoder Representations from Transformers (BERT) models [ref 1], harnessing the capabilities of Apache Spark, Kafka, and the Deep Java Library (DJL).


Table of contents
     Architecture
Follow me on LinkedIn

What you will learn: How to leverage Apache Kafka, Spark & Deep Java Library for faster inference on transformer models.

Notes:
  • This article doesn't delve into the specifics of Apache Spark, Kafka, Deep Java Library, or BERT individually. Instead, it focuses on how these components are integrated to create an efficient solution for inference tasks.
  • Development environments: JDK 11, Scala 2.12.15, Apache Spark 3.3.1, Apache Kafka 2.8.0, Deep Java Library 0.20.0
  • Comments and ancillary code are omitted for the sake of clarity.
  • Source code available at https://github.com/patnicolas/bertspark

Combining the best of both worlds

Python is a popular environment for developing and training deep learning models with frameworks such as TensorFlow, PyTorch, and MXNet. As an interpreted language, it offers data scientists the flexibility of notebooks for interactive development, evaluation, and refinement of neural models. Python boasts an extensive ecosystem encompassing natural language processing, machine learning models, statistical algorithms, and data management tools.

However, this dynamic development environment faces two significant challenges in runtime inference:
  1. Python's limited capacity for task parallelization, whether through concurrent threads or distributing tasks across a network.
  2. Commercial applications often depend on web services running on Java Virtual Machine (JVM) and make extensive use of Apache's open-source libraries.
This raises the question: Can we use Python to define, train, and evaluate deep learning models, and then employ JVM-based languages for real-time inference?

The solution hinges on the fact that deep learning frameworks like PyTorch and TensorFlow are fundamentally binary executables written in C++. The binary versions of these deep learning libraries can be accessed by both Python and Java through their respective interfaces.

Java/Scala for inference

As mentioned earlier, Python frameworks are frequently employed for training deep learning models. However, when it comes to deployment into production (specifically for inference purposes), these models need to be merged with current applications that are written in Java or Scala. This integration often involves utilizing data processing frameworks like Flink, Presto, or Spark.

This particular study concentrates on the integration of a BERT model into an existing Spark application. Apache Spark is renowned for its ability to rapidly process large datasets concurrently across multiple distributed services, as noted [ref 1]. Meanwhile, the Deep Java Library serves as a Java library that implements the most popular deep learning models, providing access via a Java native interface.

Training (Python) and Inference (Java/Scala) stack


Apache Spark and Amazon's Deep Java Library (DJL) tackle the two main challenges associated with deploying machine learning models in production that were developed using Python.

Typically, the process involves creating models in a Python environment like Jupyter, an IDE, or Anaconda, and then saving the model parameters. DJL then takes over by loading these saved parameters and initializing the inference model, which is then ready to handle runtime requests.
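
As a minimal sketch of this hand-off (my own illustration: the toy model, layer sizes, and file name are assumptions, not the article's actual BERT artifact), a trained PyTorch model can be traced to TorchScript in Python, producing the serialized artifact that DJL's PyTorch engine loads:

import torch

# Toy stand-in for the trained classifier (illustrative sizes)
model = torch.nn.Sequential(torch.nn.Linear(768, 2))
model.eval()

# Trace to TorchScript: the serialized format DJL's PyTorch engine loads
example_input = torch.randn(1, 768)
traced = torch.jit.trace(model, example_input)
traced.save("classifier.pt")   # Later referenced via optModelUrls in the Scala code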

Distributed inference pipeline

The goal is to utilize Apache Spark for distributed computation and Kafka for asynchronous, or non-blocking, data queuing.
By integrating these two technologies, we can enhance the scalability of predictions by parallelizing the execution of deep learning models. The critical components of this distributed inference pipeline include:
  • Apache Spark: This tool segments runtime requests for predictions into batches. These batches are then processed concurrently across remote worker nodes.
  • Apache Kafka: This acts as an asynchronous messaging queue, effectively separating the client application from the inference pipeline, ensuring smooth data flow without bottlenecks.
  • Deep Java Library (DJL): It connects with the binary executables of the deep learning models.
  • Kubernetes: This system containerizes the instances of the inference pipelines, facilitating scalable and automated deployment. Notably, Spark version 3.2 and later versions offer direct integration with Kubernetes.
  • Deep Learning Frameworks: This includes well-known frameworks like TensorFlow, MXNet, and PyTorch, which are part of the overall architecture.
Through this combination, we achieve a robust and scalable system for managing and executing deep learning model inferences efficiently.

Generic data flow for Inference of deep learning models with DJL

The two main benefits of such a pipeline are simplicity (all tasks and processes run on the JVM) and low latency.

Note: Spark and DJL can also be used in the training phase to distribute the training of a mini batch.

Apache Kafka

Apache Kafka is an open-source distributed event streaming platform for high-volume data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka's support for event streaming ensures a continuous flow of data through a pipeline or sequence of transformations such as Extract, Transform and Load (ETL) [ref 2].

First, we construct the handler class, KafkaPrediction, that:

  1. consumes requests from the Kafka topic consumeTopic
  2. invokes the prediction model and transformation, predictionPipeline
  3. produces predictions into the Kafka topic produceTopic
The actual request is wrapped into the consumed message, RequestMessage; the same applies to the prediction produced back to the Kafka queue.

class KafkaPrediction(
 consumeTopic: String,
 produceTopic: String,
 predictionPipeline: Seq[Request] => Seq[Prediction])  {
     
      // 1 - Constructs the transform of Kafka messages for prediction
  val transform = (requestMsg: Seq[RequestMessage]) => {
      // 2- Invoke the execution of the pipeline
      val predictions = predictionPipeline(requestMsg.map(_.requestPayload))
      predictions.map(ResponseMessage(_))
  } 
    
    // 3- Build the Kafka consumer for prediction request
  val consumer = new KafkaConsumer[RequestMessage](
    RequestSerDe.deserializingClass,
    consumeTopic
  )
    // 4- Build the Kafka producer for prediction response
  val producer = new KafkaProducer[ResponseMessage](
     ResponseSerDe.serializingClass, 
     produceTopic
  )
  .....
}

  1. We first create a wrapper function, transform, to generate a prediction. The function converts a request message of type RequestMessage into a prediction of type ResponseMessage.
  2. The wrapper transform invokes the prediction pipeline predictionPipeline after converting the messages of type RequestMessage consumed from Kafka into actual requests (Request). The predictions are converted into messages of type ResponseMessage produced to Kafka.
  3. The consumer is fully defined by the deserialization of data consumed from Kafka and its associated topic.
  4. The producer serializes the response back to the Kafka service.
def executeBatch(
  consumeTopic: String, 
  produceTopic: String, 
  maxNumResponses: Int): Unit = { 
 
   // 1 - Initialize the prediction pipeline
 val kafkaHandler = new KafkaPrediction(
    consumeTopic, 
    produceTopic, 
    predictionPipeline
  )

  while(running)  {
       // 2 - Poll the request topic (has its own specific Kafka exception handler)
   val consumerRecords = kafkaHandler.consumer.receive
 
   if(consumerRecords.nonEmpty) {
        // 3 - Generate and apply transform to the batch
     val input: Seq[RequestMessage] = consumerRecords.map(_._2)
     val responses = kafkaHandler.predict(input) 
 
     if(responses.nonEmpty) {
         // 4 - Produce to the output topic
         val respMessages = responses.map(
             response =>(response.payload.id, response)
         ) 
 
         // 5- Produce the batch of response messages to Kafka
        kafkaHandler.producer.send(respMessages)
             
         // 6 - Confirm that Kafka has indeed processed the responses
        kafkaHandler.consumer.asyncCommit
     }
     else
        logger.error("No response is produced to Kafka")
   }
   }
   kafkaHandler.close
}
  1. First, we instantiate the Kafka message handler class, KafkaPrediction, created earlier.
  2. At regular intervals, we poll a batch of new requests from Kafka.
  3. If the batch is not empty, we invoke the handler's predict method to apply the prediction models.
  4. Once done, we encapsulate the predictions into ResponseMessage instances.
  5. The messages are produced to the producer topic in the Kafka queue.
  6. Finally, Kafka acknowledges the correct reception of the responses, asynchronously.
Next, we leverage Spark to distribute the batch of requests across multiple computation nodes (workers).


Apache Spark

Apache Spark, an open-source distributed processing system, is adept at handling large-scale data sets. It leverages in-memory caching and refined query execution strategies for real-time analytics [ref 3].

In our specific use case, we employ Spark to distribute a batch of requests, which are sourced from Kafka, across a network. This setup enables the simultaneous execution of multiple BERT models. Such an architecture not only prevents a single point of failure, ensuring fault tolerance, but also permits the use of generic, cost-efficient hardware.

Leveraging Spark data set and partitioning is surprisingly simple.

def predict(
   requests: Seq[Request]
)(implicit sparkSession: SparkSession): Seq[Prediction] = {
  import sparkSession.implicits._

    // 1 - Convert request into a Spark data set
  val requestDataset = requests.toDS()

    // 2 - Execute the prediction by invoking the DJL model
  val responseDataset: Dataset[Prediction] = requestDataset.map(predict(_))

    // 3 - Convert Spark data set response 
  responseDataset.collect() 
}
  1. Once the Spark session (context) is initiated, the batch of requests is converted into a dataset, requestDataset
  2. Spark applies the prediction model (DJL) to each request on the partitioned data
  3. Finally, the predictions are collected from the Spark worker nodes before being returned to the Kafka handler

Note: The Spark context is assumed to be created and passed as an implicit parameter to the prediction method.

 

Deep Java Library

This component is crucial as it connects the flow of incoming and outgoing data with the deep learning models. The Deep Java Library (DJL) is an open-source Java framework that accommodates popular deep learning frameworks like MXNet, PyTorch, and TensorFlow.

DJL's capability to adapt to any hardware setup (be it CPU or GPU) and its integration with big data frameworks position it as an ideal choice for a high-performance distributed inference engine [ref 4]. The library is particularly well-suited for constructing transformer encoders such as BERT, as well as decoders such as GPT and ChatGPT.

In this setup, the input tensors are processed by the deep learning models on a GPU. Importantly, the data is allocated in the native memory space, which is external to the JVM and its garbage collector. The DJL library supports native tensor types such as NDArray and lists of tensors like NDList, along with a straightforward memory management tool, NDManager.

The classifier operates on the Spark worker node. The following code snippet, though a simplified version, illustrates the steps involved in invoking a BERT-based classifier using the DJL framework. 

class BERTClassifier(
   minTermFrequency: Int, 
   path: Path)(implicit sparkSession: SparkSession) {

  // 1 - Manage tensor allocation as NDArray
  val ndManager = NDManager.newManager()
 
  // 2 - Define the configuration of the classifier
  val classifyCriteria: Criteria[NDList, NDList] = Criteria.builder()
     .optApplication(Application.UNDEFINED)
     .setTypes(classOf[NDList], classOf[NDList])
     .optOptions(options)
     .optModelUrls(s"file://${path.toAbsolutePath}")
     .optBlock(classificationBlock)
     .optEngine(Engine.getDefaultEngineName())
     .optProgress(new ProgressBar())
     .build()
 
  // 3- Load the model from a local file
  val thisModel = classifyCriteria.loadModel()

  // 4 - Instantiate a new predictor
  val predictor = thisModel.newPredictor()

  // 5 - Execute this request on this worker node
  def predict(requests: Request): Prediction = {
    predictor.predict(ndManager, requests)
  }

  // 6- Close resources
  def close(): Unit = {
    thisModel.close()
    predictor.close()
    ndManager.close()
  }
}  
  1. Set the manager for tensors in native memory.
  2. Configure the classifier with its related neural block (classificationBlock).
  3. Load the model (MXNet, PyTorch, or TensorFlow) from a local file.
  4. Instantiate a predictor from the model.
  5. Submit the request to the deep learning model and return a prediction.
  6. Close all the resources allocated in native memory at the end of the run.
Note: DJL can optionally be used for training.


Use case: BERT

To illustrate the application of Spark and DJL to BERT, we consider a model that predicts a topic given a document.

Architecture

Our model has 3 components:
  • Text processor (Tokenizer, Document segmentation,...)
  • Pre-trained BERT
  • Fully-connected neural network classifier (supervised)

A transformer model consists of two main components: an encoder and a decoder. The encoder's role is to convert sentences and paragraphs into an internal format, typically a numerical matrix, that captures the context of the input. Conversely, the decoder interprets and reverses this process. When combined, the encoder and decoder enable the transformer to execute sequence-to-sequence tasks like translation. Interestingly, isolating the encoder part of the transformer provides insights into the context, enabling various intriguing applications.

BERT particularly capitalizes on the attention mechanism to gain a more nuanced understanding of language context. BERT is composed of several layers of encoder blocks. In this model, the input text is divided into tokens, akin to the traditional transformer model, and each token is subsequently converted into a vector at BERT's output.

BERT has been applied to various problems, including the automation of medical coding [ref 5].

Neural blocks

The practice of arranging components of neural networks, such as layers and activation functions, into modular, reusable blocks is a common strategy to simplify and deconstruct complex models [ref 6].

In DJL, a block is a composable function that forms a neural network. It can represent a single operation, parts of a neural network, or even the whole neural network. What makes blocks special is that they contain a number of parameters that are used in their function and are trained during deep learning. As these parameters are trained, the functions represented by the blocks become more and more accurate.

The core purpose of a block is to perform an operation on the inputs and return an output. It is defined in the forward method. The forward function can be defined explicitly in terms of parameters or implicitly, and can be a combination of the functions of the child blocks.

The following code snippet illustrates the composition of blocks for a transformer encoder using Deep Java Library blocks. The three main components are:
  • Transformer, self-attention block with token, position and sentence order embeddings
  • Masked Language Model (MLM) block
  • Next Sentence Prediction (NSP) block
class CustomPretrainingBlock(
  bertModelType: String,
  activationType: String,
  vocabularySize: Long) extends BaseNetBlock {
 
  // First block: BERT transformer
  val bertBlock = getBertConfig(bertModelType)
        .setTokenDictionarySize(Math.toIntExact(vocabularySize))
        .build
  val activationFunc: java.util.function.Function[NDArray, NDArray] = 
         ActivationConfig.getNDActivationFunc(activationType)

    // Second block: Masked Language Model
  val bertMLMBlock = new BertMaskedLanguageModelBlock(bertBlock, activationFunc)

   // Third block: Next Sentence Predictor
  val bertNSPBlock = new BertNextSentenceBlock
  val pretrainingBlocks = new BERTPretrainingBlocks(
      ("transformer", bertBlock),
      ("mlm", bertMLMBlock),
      ("nsp", bertNSPBlock)
   )

  override protected def forwardInternal(
    parameterStore: ParameterStore,
    inputNDList: NDList,
    training: Boolean,
    params: PairList[String, java.lang.Object]): NDList  // implementation shown in the appendix
}

BERT comes in several model sizes, with varying numbers of encoder blocks, attention heads, embedding sizes, and dimensions.

def getBertConfig(bertModelType: String): BertBlock.Builder = bertModelType match {
  case `nanoBertLbl` =>
      // 4 encoders, 4 attention heads, embedding size: 256, dimension 256x4
    BertBlock.builder().nano()

  case `microBertLbl` =>
      // 12 encoders, 8 attention heads, embedding size: 512, dimension 512x4
    BertBlock.builder().micro()

  case `baseBertLbl` =>
      // 12 encoders, 12 attention heads, embedding size: 768, dimension 768x4
    BertBlock.builder().base()

  case `largeBertLbl` =>
      // 24 encoders, 16 attention heads, embedding size: 1024, dimension 1024x4
    BertBlock.builder().large()

  case _ =>
    throw new IllegalArgumentException(s"Unsupported BERT model type: $bertModelType")
}

The appendix provides, for reference, a detailed Scala implementation of the forwardInternal method used in pre-training.



Thank you for reading this article. For more information ...

References

[1] Bidirectional Encoder Representations from Transformers (BERT)

Appendix

You can implement your own variant of BERT by overriding the method forwardInternal.

override protected def forwardInternal(
  parameterStore: ParameterStore,
  inputNDList: NDList,
  training : Boolean,
  params: PairList[String, java.lang.Object]): NDList = {

    // Dimension batch_size x max_sentence_size
  val tokenIds = inputNDList.get(0)
  val typeIds = inputNDList.get(1)
  val inputMasks = inputNDList.get(2)

    // Dimension batch_size x num_masked_token
  val maskedIndices = inputNDList.get(3)

  try {
    val ndChildManager = NDManager.subManagerOf(tokenIds)
    ndChildManager.tempAttachAll(inputNDList)

      // Step 1: Process the transformer block for Bert
    val bertBlockNDInput = new NDList(tokenIds, typeIds, inputMasks)
    val ndBertResult = transformerBlock.forward(parameterStore, bertBlockNDInput, training)

      // Step 2 Process the Next Sentence Predictor block
      // Embedding sequence dimensions are batch_size x max_sentence_size x embedding_size
    val embeddedSequence = ndBertResult.get(0)
    val pooledOutput = ndBertResult.get(1)

      // Need to un-squeeze for batch size =1,   (embedding_vector) => (1, embedding_vector)
    val unSqueezePooledOutput =
      if(pooledOutput.getShape.dimension() == 1) {
         val expanded = pooledOutput.expandDims(0)
         ndChildManager.tempAttachAll(expanded)
         expanded
      }
      else
         pooledOutput

      // Compute the NSP probabilities when there is more than a single sentence
    val logNSPProbabilities: NDArray =
       bertNSPBlock.forward(parameterStore, new NDList(unSqueezePooledOutput), training)
                 .singletonOrThrow

        // Step 3: Process the Masked Language Model block
        // Embedding table dimensions: vocabulary_size x embedding_size
    val embeddingTable = transformerBlock
            .getTokenEmbedding
            .getValue(parameterStore, embeddedSequence.getDevice, training)

        // Dimension:  (batch_size x maskSize) x Vocabulary_size
    val logMLMProbabilities: NDArray = bertMLMBlock
        .forward(
           parameterStore,
           new NDList(embeddedSequence, maskedIndices, embeddingTable),
           training)
        .singletonOrThrow

      // Finally build the output
    val ndOutput = new NDList(logNSPProbabilities, logMLMProbabilities)
    ndChildManager.ret(ndOutput)
  }
  catch { ... }
}
  



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3