Wednesday, February 15, 2023

Improve Python with Pydantic 2.x

Target audience: Advanced
Estimated reading time: 3'

In my tenure as a Scala developer, I frequently underscored the importance of robust type checking mechanisms. When transitioning to Python, it was the incorporation of the Pydantic library and the native typing package that significantly underscored Python's capabilities as a mature programming language, bolstering its appeal to developers like me.

This article delves into the contemporary features of the Pydantic library, elucidating concepts such as serialization, annotated validators, and discriminated unions.


Table of contents
Follow me on LinkedIn
Notes:
  • Environments: Python 3.10, Pydantic 2.4.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Why pydantic?

Pydantic [ref 1] is a Python library designed for data modeling and parsing, equipping Python developers with customizable tools for error handling and validation. This library seamlessly integrates built-in features for JSON encoding and decoding, combined with validation. It offers a declarative approach to data representation.

Mirroring the typing module, Pydantic aims to emulate the type checking seen in static languages like Java and C++ through IDE-enforced hints. Moreover, it enhances the Python dataclasses module by integrating agile error handling and data validation capabilities.

Pydantic has several benefits:
  • Dedicated, specific types such as FilePath or HttpUrl for validation.
  • Custom validators.
  • Options (Optional type) similar to Java and Scala.
  • Integration with lint functionality in most common IDE.
  • Declarative syntax for modeling data.
  • Wrapper (Field type) for data validation.

To install the Pydantic library (MacOS)
       pip install pydantic

Notes: 
  • This is a sample of features available in version 2.2.1 [ref 2]. The reader is invited to explore the entire functionality of pedantic.
  • A migration from Pydantic version 1.x is available to Python developers [ref 3].
  • FastAPI web framework leverages Pydantic library [ref 4].

Serialization validators

One basic function is validation of JSON representation of an object. The following example combines Pydantic data class Algorithm as inherited from BaseModel and data type from typing module.
An algorithm instance if completely defined by its name (i.e., Binary tree), time complexity (i.e., O(logN), asymptotic search time complexity [ref 5] and supported programming languages (i.e., C++, Python, Go, ...). For sake of clarity, time and space complexity is defined as an enumerator.

from pydantic import BaseModel, ValidationError
from typing import AnyStr, Tuple, Dict, Optional
from enum import Enum

""
    # Time complexity for average case for search operation
class AlgorithmComplexity(Enum): O1 = 0 # Hash table OlogN = 1 # Binary search, red-black tree, ... ON = 2 # Linked list, stack, queue, ... ONlogN = 3 # Quick, merge, shell sort, ... ON2 = 4 # Insertion sort, ... class Algorithm(BaseModel): name: AnyStr time_complexity: AlgorithmComplexity space_complexity: AlgorithmComplexity languages: Tuple
# Tuple of supported programming languages


In the following code snippet, the Pydantic constructor Algorithm.__init__ validates the textual/JSON representation of the data which fails for the improperly definition of the linked list.

if __name__ == '__main__':

       # Correct definition of a binary tree
binary_tree_json = { 'name': 'Binary Tree', 'time_complexity': AlgorithmComplexity.OlogN, 'space_complexity': AlgorithmComplexity.OlogN, 'languages': ('java', 'C++', 'Python') } try: binary_tree = Algorithm(**binary_tree_json) except ValidationError as err: logging.error(err.errors())

      # Incorrect specification of the time complexity to access a linked list 
linked_list_json = { 'name': 'LinkedList', 'time_complexity': 'N', 'space_complexity': AlgorithmComplexity.ON, 'languages': ('java', 'C++', 'Scala') } try: linked_list = Algorithm(**linked_list_json) except ValidationError as err: logging.error(err.errors()) # [{'type': 'enum', 'loc': ('time_complexity',), 'msg': 'Input should be 0,1,2,3 or 4', # 'input': 'N2', 'ctx': {'expected': '0,1,2,3 or 4'}}]



Annotated validator

Pydantic provides a way to apply validators on a field value or types . You should use this whenever you want to bind validation to a type instead of model or field.

There are several types of validators:
  • After validator which is run after Pydantic internal parser and validator.
  • Before validator, run before parsing and validation
  • Plain validator which is a variant of before validator terminating the validation once it fails.
  • Wrap validator which supports the functionality of before, after and plain validators.
The following example implements a 'before validator': the languages listed for a given algorithm should be valid as specified in the tuple, valid_languages.

Here are the 2 steps:
  1. Define a validation function, validate_languages. Note that the type of the output, Tuple is the same as the type of input (argument)
  2. Convert the variable, languages as an annotation which takes the input/output type, Tuple as first argument and the validation function as second argument.

from typing_extensions import Annotated
from pydantic.functional_validators import BeforeValidator



# Formal list of programming languages used to implement the algorithms
valid_languages: Tuple = ('python', 'go', 'scala', 'c++', 'java')

"""
    Remove the languages which are not correctly specified (typos, non-compliant, ..)
"""
def validate_languages(languages: Tuple) -> Tuple:
    return (lang for lang in languages if lang.lower() in valid_languages)


        # We add the annotator for the programming language field.
class Algorithm(BaseModel):
    name: AnyStr
    time_complexity: AlgorithmComplexity   
    space_complexity: AlgorithmComplexity
                         # is the list of supported languages legitimate?
    languages: Annotated[Tuple, BeforeValidator(validate_languages)]



The simplest validation scheme is to make sure the programming languages available for this algorithm are valid. We do not make any assumption whether programming languages are specified with lower or upper case characters.

  
binary_tree_json = {
     'name': 'Binary Tree',
     'time_complexity': AlgorithmComplexity.OlogN,
     'space_complexity': AlgorithmComplexity.OlogN,
     'languages': ('Java', 'C+++', 'Python')    # C+++ is not a valid programming language
 }
    
 try:
     binary_tree = Algorithm(**binary_tree_json)
     logging.info(str(binary_tree.languages))   # ('Java', 'Python')
 except ValidationError as err:
     logging.error(err.errors())



Discriminated unions for composition

The goal of discriminated unions is to resolve the need of multi-inheritance by creating a composite of mutual exclusive types.
Let's consider a simple hierarchy of HTML elements:


Class diagram in sub-classing using union discriminated on html_type

The HTMLText class for the HTML fragment <p>[text]</p> and HTMLImage class for the HTML component <img src=[url] /> inherit from the base abstract class HTMLElement. The member html_type declared with type Literal defines explicitly the type of element.
The goal is to create a new composite type, WebElement, that can invoke either HTMLText or (exclusive) HTMLImage class.

import abc
from typing import AnyStr, Union, Literal
from pydantic import BaseModel, Field


class HTMLElement(BaseModel):
   html_type: AnyStr

   @abc.abstractmethod
   def get(self) -> AnyStr:
      pass


class HTMLText(HTMLElement):
   html_type: Literal["p"]
   doc: AnyStr
    
   def get(self) -> AnyStr:
      return f'<{self.html_type}>{self.doc}<{self.html_type}/>'


class HTMLImage(HTMLElement):
   html_type: Literal["img"]
   img_url: AnyStr
    
   def get(self) -> AnyStr:
       return f'<{self.html_type} src={self.img_url} />'



The objective is to create a mutually exclusive composite, WebElement of HTMLImage and HTMLText using html_type as discriminator.
The method polymorphic method get illustrates the mechanism by generating the actual textual representation of the HTML components.

class WebElement(BaseModel):
  element: Union[HTMLImage, HTMLText] = Field(discriminator="html_type")

  def get(self) -> AnyStr:
        return self.element.get()


The simple test consists of creating a dictionary, test_element to initialize a WebElement instance with type 'img'.

if __name__ == '__main__':
  html_text = HTMLText(html_type='p', doc='This is correct')
  print(html_text.get())  # <p>This is correct<p/>
    
  test_element: Dict = {"img_url": "python.png", "text": "", "html_type": "img"}
  web_element = WebElement(element=test_elementl)
  print(web_element.get()) # <img src=python.png />


Note: For readers familiar with Scala, discriminated unions are somewhat similar to mixins.

Thank you for reading this article. For more information ...

References

[4FastAPI


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Saturday, January 21, 2023

Manage Spark Pipelines with Airflow

Target audience: Intermediate
Estimated reading time: 5'

Ever grappled with piecing together a data pipeline from diverse, sometimes mismatched components or processes? Apache Airflow might just be the remedy you have been looking for! 

This article delves into the orchestration design and implementation of a pipeline geared towards testing an Apache Spark application crafted in Scala.

Table of contents

Follow me on LinkedIn
Notes:
  • In order to demonstrate the flexibility of Airflow orchestration capability, our workflow combines process implemented in Python, Bash and Spark/Scala.
  • The implementation of the Spark application used in the article is included in the Appendix for reference.
  • Environments: Python 3.10, Apache Airflow 2.7.0, Apache Spark 3.3.1, Scala 2.13.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Introduction 

Organizations often develop and implement scripts in multiple languages (like Batch, SQL, Python) to handle data pipelines. This approach can rapidly become unmanageable. Using an orchestration tool like Airflow, data and MLOps engineers can unify these scripts into one integrated platform.

Airflow is an orchestration platform that support modular architecture, dynamic pipeline generation, editable execution graph (DAG), customizable and reusable tasks [ref 1]. 

Key concepts

Airflow's design was influenced by very specific considerations, specifically:
  • Purpose: Airflow serves as a platform designed for the programmatic creation, scheduling, and monitoring of workflows. With its ability to define workflows via Python code, it offers significant flexibility and adaptability.
  • Components: The core components of Airflow include a visualization web server, a scheduler that oversees job execution based on triggers or schedules, and workers responsible for task execution.
  • Use cases: Typical applications of Airflow encompass ETL (Extract, Transform, Load) processes, data analysis workflows, and regularly occurring tasks.
  • Execution graph: Workflows in Airflow are framed using Directed Acyclic Graphs (DAG). Within these DAGs, each node denotes a task, while the connecting edges symbolize task dependencies.
The implementation and resources for Apache Airflow framework is available on Github [ref 2].

Setup

Here is the steps required to install and deploy Airflow using a simple Bash script:

export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.7.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL=https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_ VERSION}/constraints-${PYTHON_VERSION}.txt

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL"}
airflow standalone

Notes
  • Airflow uses SQLite database by default. For production deployment uses MySQL or PostgreSQL database (not SQLite) and do not use SequentialExecutor.
  • Default users and password are defined in $AIRFLOW_HOME/standalone_admin_passwords.txt

Operations

Start airflow:  airflow webserver –port 8082
Stop airflow:  Ctrl-C inside the terminal running airflow
              OR  rm -f ${AIRFLOW_HOME}/airflow-scheduler.pid
                      pkill -f “airflow-scheduler”

Testing a task:  airflow tasks test dag_id task_id start_date
Example: airflow tasks test spark_eval_v1 spark_job 2023-10-09

Use case

The goal is to create and orchestrate a pipeline for testing a Spark application that cleanses an employee dataset of anomalies before feeding it into a deep learning model. 
Each 'Employee' entry is characterized by a 'name', a 'role' within the company (e.g., engineer), a 'score' (rating)  from their most recent performance evaluation, and 'tenure' represented as the number of years they've been with the organization.

The objective is to design and manage a pipeline to test a Spark application: cleanse a data set of employees from outliers prior to input to a deep learning model.
An instance of Employee is defined by a name, a role in the organization (i.e. engineer), a score from his/her last performance review and tenure as number of years in the organization.

valid_employee_role = ['intern',  'engineer',  'manager',  'director',  'VP']

"""
    Data class for employee record
"""
@dataclass
class Employee:
    name: AnyStr
    role: AnyStr
    score: int
    tenure: float

The three orchestration tasks are
  1. Generate employee records with random attributes for position, rating and seniority to the stored on HDFS (generate_to_hdfs)
  2. Load records from HDFS (load_from_hdfs)
  3. Remove outliers records using Apache Spark defined by condition: Valid role, 0 < score <= 10 and 0.0 < tenure < 50.0 (spark_job)

Airflow operators for Spark application

The first two task 
generate_to_hdfs, and generate_to_hdfs are implemented in Python and invoked through a bash script while the spark application is written in Scala [ref 3] (see Appendix).


Orchestration

There are 3 components in creating an Airflow orchestration:
  1. Define the directed acyclic graph (DAG) for execution with the appropriate scheduling, retries,... arguments
  2. Invoke the appropriate operators which define each of the 3 tasks
  3. Specify the order of tasks in the workflow
We reference the following Python code of an Airflow DAG for our scenario to highlight these three elements. This source code serves as the basis for our discussion in this section.

from airflow.decorators import task
from airflow import DAG
from datetime import datetime, timedelta
from typing import List, Optional
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import logging


# Step 1: Define the DAG with appropriate arguments 

default_args = { 'owner': 'airflow', 'retries': 4, 'retry_delay': timedelta(minutes=10) } dag = DAG( dag_id='spark_eval_v1', default_args=default_args, start_date=datetime(2023, 10, 3), schedule_interval='@hourly', catchup=False, tags=['first_version'] )
  
# Step 2: Create the 3 operators
with dag: """ Task 1 generates randomized employees """ loaded = BashOperator( task_id='generate_to_hdfs', bash_command=f'python3 {data_source_executor} generate' ) """ Task 2 loads the randomly generated data from HDFS """ employees = BashOperator( task_id='load_from_hdfs', bash_command=f'python3 {data_source_executor} load' ) """ Task 3 Launch the Spark job to process """ spark_job = SparkSubmitOperator( task_id='spark_job', conn_id='spark_default', application=spark_jar_path,
        java_class='org.batcheval.Cleanser',
conf={ # To be loaded from file
            'master: 'local[8]',
'spark.driver.memory': '18g', 'spark.executor.memory': '24g', 'spark.default.parallelism': '16', 'spark.local.dir': 'temp', 'spark.network.timeout': '2100', 'spark.driver.maxResultSize': '524288', 'spark.driver.cores': 4, 'spark.executor.cores': 3 }, application_args=[file_name] )

    # Step 3: Specify the workflow 
    loaded >> employees >> spark_job


Defining the DAG

The first step is to define the execution graph. The DAG is a data pipeline crafted in Python. Every DAG depicts a series of tasks structured as a graph. The inherent characteristics of DAGs make them ideal for constructing data pipelines:
  • Directed: When there are several tasks, each task should have a designated upstream or downstream connection.
  • Acyclic: Tasks should not depend on themselves, ensuring there are no never-ending loops.
  • Graph: All tasks can be graphically represented, with the interconnections between them indicated by nodes and edges.

Creating the operators

An operator is a conceptual wrapper around Python code crafted to execute a designated action, represented as a function with parameters. Every operator in your DAG code equates to a single Airflow task.
Operators can be categorized into three types:
  • Action operators: These initiate or set off specific actions, such as BashOperator, PythonOperator, or TriggerDagRunOperator.
  • Transfer operators: These are stateful and handle data migration from one spot to another.
  • Sensors: Derived from BaseSensorOperator, these operators pause execution until a certain condition or operation concludes.
The first two operators (type BatchOperator), loaded and employees, wraps the batch command line execution python data_source generate and python data_source load.

The actual data processing is performed through the Airflow Spark submit operator (SparkSubmitOperator). In this scenario, you need to setup the development environment in order to execute the operator as follow:
  1. Build a Spark jar file.
  2. Install apache-airflow-providers-apache-spark package.
  3. Implement the SparkSubmitOperator [See below]
  4. Configure the Spark connection in Airflow. UI/Admin/Connections, id=spark_default, type: Spark, host/port
The operator, spark_job, defines the minimum set of configuration parameters to execute a Spark submit implemented in Scala [ref 4]:
  • application: Path to the Jar file
  • java_class: Name of the class (Scala in our case) invoked by Spark submit
  • conf: Spark dynamically loaded properties defined in SparkConf or provided as part of the Spark submit command line arguments.
  • application_args: List of application specific arguments to the Spark submit command line.

Specifying the workflow

The final stage is to define the dependencies between the various tasks. This is accomplished very simply by using the self-explainable overloaded >> operator.

Tasks implementation

The Python functions triggered by the operators are provided to showcase how the Airflow operators work. However, they aren't necessary for grasping the Airflow framework itself.

Data generator

It relies on a random generator to generate the fields of each employee record then save the 'records' onto a file.

class DataOperator(object):

    def __init__(self, input_file: AnyStr):
        self.input_file = input_file


    @staticmethod
    def generate_and_save(this_file_name: AnyStr, num_entries: int) -> bool:
        """
           Generate employee records with random attributes (score, tenure,...)
           :param this_file_name HDFS file name, the records are stored
           :param num_entries Number of records to generate
           :return True if the records are correctly generated and save, False otherwise
        """
try: records = [f'emp_{i},{valid_employee_role[i % len(valid_employee_role)]},{int(random.uniform(1, 10))},{random.uniform(0, 25):.1f}' for i in range(num_entries)] header = 'name,role,score,tenure\n' content = header + '\n'.join(records)

            # State the records
with open(this_file_name, 'w') as f: f.write(content) return True except Exception as e: task_logger.error(f'generate and save failed {str(e)}') return False


Data loader

This Python method loads the records from the CSV file then generate the list employees. It returns an empty list in case of exception.

def load(self) -> List[Employee]:
   try:
       # Load the records from a CSV file
with open(self.input_file, 'rt') as f: csv_reader = csv.reader(f, delimiter=',', quotechar='\'')

         # Build the list of employee records to be cleansed.
employee_list = [] for idx, attributes in enumerate(csv_reader): if idx > 0: employee_list.append(DataOperator.__get_employee(attributes))  
         return employee_list

   except Exception as e:
      task_logger.error(f'Loader failed: {str(e)}')
      return []


@staticmethod
def __get_employee(fields: List[AnyStr]) -> Employee:
   return Employee(
         fields[0],           # Name of employee
         fields[1],           # Role of employee in the organization
         int(fields[2]),     # Score of the employee from performance review
         float(fields[3])   # Tenure or seniority of the employee
   )

Data cleanser

The outlier elimination is carried out by the Spark cluster, which is written in Scala. Its primary and straightforward role is to apply the filter outlined in the Use case section. While the Scala code for this task isn't crucial for grasping the Airflow orchestration aspects, it's provided in the Appendix for reference.


Airflow best practices

To ensure Airflow doesn't overconsume computational resources, adhere to the following guidelines:
  • View Airflow as a task orchestrator, not an execution platform. Delegate heavy processing to execution frameworks and maximize the use of ETL processes.
  • Refrain from importing large datasets directly into a task.
  • Segment intricate tasks (or elaborate Python functions encapsulated by the PythonOperator) into more manageable units to simplify debugging.
  • Utilize template variables using the Jinja template format {{...}}.
  • Maintain a one-to-one ratio of DAGs to Python files.
  • Steer clear of top-level code in your DAG files as it triggers execution during each DAG parse.
Note: DAG code undergoes parsing/analysis every 'min_file_process_interval' (set to 30 seconds by default).

Conclusion

The article's example understates Airflow's capabilities. In practical scenarios, more complex orchestration is necessary, involving Spark, Kafka, NoSQL databases, and deep learning models developed in different programming languages.

Apache Airflow stands out as the perfect instrument for orchestrating Spark pipelines, whether they're written in Python, Scala, or Java. The prowess of Airflow operators in handling diverse services can spare software and MLOps engineers numerous development hours.

Several other data orchestration platforms are available, including:
  • BMC's Control-M: This is a workflow orchestration solution that places a strong focus on adhering to Service Level Agreements [ref 5].
  • Flyte: Originally created at Lyft, this is a scalable platform for automating workflows [ref 6].
  • Y42: This platform offers a comprehensive solution for data orchestration, featuring an SQL design interface [ref 7].
  • Shipyard App: A data orchestration tool emphasizing code reusability, package management, version control, and observability [ref 8].

Thank you for reading this article. For more information ...

References

[7] y42.com

Appendix

A basic outlier removal function has been implemented in Spark using Scala. The validation function, termed 'validate', acts as a parameter for the 'OutlierRemoval' class. The primary method, named 'apply', retrieves the raw employee records from a CSV file named 'srcFilename', applies the specified filter, and then saves the results to another CSV file, called 'destFileName'.

final class OutlierRemoval(validate: Employee => Boolean)(implicit sparkSession: SparkSession){
    
  def apply(srcFilename: String, destFilename: String): Unit = try  {
      import sparkSession.implicits._

      // Load data from HDFS
      val employeeDS = sparkSession.read.option("delimiter", ",").csv(srcFilename).map(Employee(_))
      employeeDS.show(5)
      // Filter out the outlier
      val filteredEmployeeDS = employeeDS.filter(validate(_))
      filteredEmployeeDS.write.csv(destFilename)
    }
    catch {
      case e: Exception => logger.error(s"ERROR: ${e.getMessage}")
    }
}

object OutlierRemoval {
  // Default constructor
  def apply()(implicit sparkSession: SparkSession): OutlierRemoval = new OutlierRemoval(default_validate)

  private val valid_employee_role = Set[String](
    "intern", "engineer", "manager", "director", "VP"
  )

  private val default_validate = (employee: Employee) => {
    valid_employee_role.contains(employee.role) && employee.score <= 10 &&
    employee.score > 0 && employee.tenure > 0.0 && employee.tenure < 50.0
  }
}

Main function to be called by Spark submit script.

object Cleanser extends App{
    // Initialize the context
  implicit val sparkSession: SparkSession = {
     val conf = new SparkConf().setMaster("local[8]")
     SparkSession.builder()
         .appName("sparkjob")
         .config(conf)
         .getOrCreate()
  }

  val srcFilename = args(0)
  private val outputFilename = args(1)
  private val outlierRemoval = OutlierRemoval()
    // Apply the filter to remove outliers
  outlierRemoval(srcFilename, outputFilename)
    // Cleanup
sparkSession.close() }


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3