Tuesday, March 21, 2023

ChatGPT API Python Client

Target audience: Beginner
Estimated reading time: 3' 

This post describes a generic implementation of a client to the OpenAI ChatGPT REST web service. This implementation has been written and tested with Python 3.9. Comments in the source code are omitted for the sake of clarity.

ChatGPT client application in Python

Table of contents
ChatGPT API overview
       HTTP parameters
       Request body
Notes
  • Environments:  Python 3.9.16,  ChatGPT 3.5
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

ChatGPT API overview

Let's review the parameters of the OpenAI Chat Completion REST API.

HTTP Parameters

The connectivity parameters for the HTTP post are
  • OPEN_AI_KEY=xxxxx
  • URL=https://api.openai.com/v1/chat/completions
  • CONTENT_TYPE=application/json
  • AUTHORIZATION="Bearer ${OPEN_AI_KEY}" 
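As a rough illustration, the connectivity parameters above could be assembled into an HTTP POST request with the Python standard library. The helper name build_request and the placeholder key are assumptions made for this sketch; the request is only constructed, not sent.

```python
import json
import urllib.request

# Endpoint listed in the connectivity parameters
URL = 'https://api.openai.com/v1/chat/completions'


def build_request(open_ai_key: str, body: dict) -> urllib.request.Request:
    # Hypothetical helper: attaches the content type and authorization headers
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {open_ai_key}'
    }
    data = json.dumps(body).encode('utf-8')
    return urllib.request.Request(URL, data=data, headers=headers, method='POST')


# 'xxxxx' is a placeholder, not a valid key
request = build_request('xxxxx', {'model': 'gpt-3.5-turbo', 'messages': []})
print(request.get_method(), request.full_url)
```

The openai Python package used in the rest of this post hides these details behind openai.ChatCompletion.create.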

Request body

The parameters of the POST content:

  • model: Identifier of the model (e.g. gpt-3.5-turbo)
  • messages: Text of the conversation, as a list of messages
  • user: Identifier for the end user
  • role: Role of the author of a message {system|user|assistant}
  • content: Content of the message or request
  • name: Name of the author of the message
  • temperature: Hyper-parameter that controls the "creativity" of the language model by adjusting the distribution (softmax) for the prediction of the next word/token. The higher the value (> 0), the more diverse the prediction (default: 1)
  • top_p: Sample only among the tokens that make up the top_p probability mass. It is an alternative to temperature (default: 1)
  • n: Number of solutions/predictions (default: 1)
  • max_tokens: Limit the number of tokens used in the response (default: Infinity)
  • presence_penalty: Penalize, if the value is positive, new tokens which already appear in the text so far. A higher value makes the model more likely to move on to new topics (default: 0)
  • frequency_penalty: Penalize, if the value is positive, new tokens in proportion to their frequency in the text so far (default: 0)
Note: OpenAI models are non-deterministic: identical requests may yield different answers. Setting temperature = 0 makes the outputs mostly deterministic.
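To make the role of temperature more concrete, here is a minimal sketch of a temperature-scaled softmax in its textbook form. The function name and the logits are made up for the illustration; how the OpenAI service actually samples tokens is not described in this post.

```python
import math
from typing import List


def softmax_with_temperature(logits: List[float], temperature: float) -> List[float]:
    # Illustrative only: the logits are divided by the temperature before the softmax
    if temperature <= 0.0:
        # Limit case: all the probability mass goes to the highest logit
        best = logits.index(max(logits))
        return [1.0 if i == best else 0.0 for i in range(len(logits))]
    scaled = [x / temperature for x in logits]
    max_scaled = max(scaled)     # Subtracted for numerical stability
    exps = [math.exp(x - max_scaled) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.0, 0.1]     # Made-up scores for three candidate tokens
for t in (0.0, 0.5, 1.0, 2.0):
    print(t, [round(p, 3) for p in softmax_with_temperature(logits, t)])
```

A low temperature concentrates the probability on the most likely token, while a high temperature flattens the distribution, which is why the predictions become more diverse.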

Chat completion request

The first step is to implement the data structure for the content of the request as described in the previous section. The content of the request, ChatGPTRequest, is read-only and is therefore implemented as a data class. The class members reflect the semantics of the OpenAI chat completion API.
  • model: Identifier of the model (e.g. gpt-3.5-turbo, code-davinci-002 ...)
  • role: Role of the user as system, user or assistant
  • temperature: Hyper-parameter that adjusts the distribution for the prediction of the next token
  • max_tokens: Limit the number of tokens used in the response
  • top_p: Sample only among the tokens that make up the top_p probability mass (default: 1)
  • n: Number of predictions/choices (default: 1)
  • presence_penalty: Penalize new tokens which appear in the text so far
  • frequency_penalty: Penalize new tokens which appear in the text with higher frequency
from dataclasses import dataclass

@dataclass
class ChatGPTRequest:
   model: str
   role: str
   temperature: float
   max_tokens: int
   top_p: float                # Probability mass, a fraction in [0, 1]
   n: int
   presence_penalty: float     # Penalties are real values
   frequency_penalty: float
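A plain @dataclass still allows its attributes to be reassigned. If the read-only property matters, one option is the frozen=True argument of the dataclasses module. The class FrozenChatGPTRequest below is a hypothetical, shortened variant used only for this sketch.

```python
from dataclasses import dataclass, asdict, FrozenInstanceError


@dataclass(frozen=True)
class FrozenChatGPTRequest:
    # Hypothetical read-only variant, restricted to three fields
    model: str
    role: str
    temperature: float


request = FrozenChatGPTRequest('gpt-3.5-turbo', 'user', 0.0)
try:
    request.temperature = 1.0     # Any assignment is rejected
except FrozenInstanceError as e:
    print(f'Read-only request: {e}')

# Dictionary view of the request, convenient for building a JSON body
print(asdict(request))
```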


ChatGPT client

There are two constructors for the client of type ChatGPTClient:
  • Default constructor, which takes a fully customizable request, ChatGPTRequest, as argument.
  • Constructor, build, for simple requests that require only the model, role and temperature (evaluated with values 0 and 1), with all other parameters using the default values specified in the OpenAI API documentation for openai.ChatCompletion [ref 1]. We use the type variable, Instancetype, to specify the type of instance to create [ref 2].
from typing import Type, TypeVar, Callable
import openai
import constants

Instancetype = TypeVar('Instancetype', bound='ChatGPTClient')


class ChatGPTClient(object):
    # Static variables for the API key and the default maximum
    # number of tokens returned
    openai.api_key = constants.openai_api_key
    default_max_tokens = 1024

    def __init__(self, chatGPTRequest: ChatGPTRequest):
        self.chatGPTRequest = chatGPTRequest

    @classmethod
    def build(cls, model: str, role: str, temperature: float) -> Instancetype:
        chatGPTRequest = ChatGPTRequest(
            model,
            role,
            temperature,
            ChatGPTClient.default_max_tokens,
            1,      # top_p
            1,      # n
            0,      # presence_penalty
            0)      # frequency_penalty
        return cls(chatGPTRequest)


Post

Let's start with a simple version of the invocation that returns only the answer without any explanation, status or usage. The only argument of the HTTP post is the user prompt. The response is extracted from the message of the first choice of the answer.

def post(self, prompt: str) -> str:
   import logging

   try:
       response = openai.ChatCompletion.create(
           model=self.chatGPTRequest.model,
           messages=[{'role': self.chatGPTRequest.role, 'content': prompt}],
           temperature=self.chatGPTRequest.temperature,
           max_tokens=self.chatGPTRequest.max_tokens
       )
       # Content of the message of the first choice
       return response['choices'][0].message.content

   except openai.error.AuthenticationError as e:
       logging.error(f'Failed as {str(e)}')
       return ''


Some advanced client applications may need to process and evaluate the metadata included in the response. In this case, the JSON content of the chat completion answer is converted into objects.
The key variables of the ChatCompletion response are
  • id: Identifier of the chat completion
  • object: Type of the object returned
  • created: Creation date (timestamp)
  • usage.prompt_tokens: Number of tokens used in the prompt
  • usage.completion_tokens: Number of tokens used in the completion
  • usage.total_tokens: Total number of tokens
  • choices: List of answers
  • choices.message.role: Role of the author of the answer
  • choices.message.content: Response content
  • choices.finish_reason: Description of the state of completion of the request
As with the request content, the different components of the answer are implemented as data classes to reflect the structure of the JSON response from the ChatCompletion API.
The previous implementation of the post method is upgraded by adding a conversion of the JSON response into ChatGPTResponse.

from typing import List

@dataclass
class ChatGPTChoice:
   message: dict               # Role and content of the answer
   index: int
   finish_reason: str

@dataclass
class ChatGPTUsage:
   prompt_tokens: int
   completion_tokens: int
   total_tokens: int

@dataclass
class ChatGPTResponse:
   id: str
   object: str
   created: int
   model: str
   choices: List[ChatGPTChoice]
   usage: ChatGPTUsage

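def post_dev(self, prompt: str) -> ChatGPTResponse:
   import json
   import logging

   try:
      response = openai.ChatCompletion.create(
          model=self.chatGPTRequest.model,
          messages=[{'role': self.chatGPTRequest.role, 'content': prompt}],
          temperature=self.chatGPTRequest.temperature,
          max_tokens=self.chatGPTRequest.max_tokens
      )
      # Decode the textual (JSON) representation of the response
      payload = json.loads(str(response))
      choices = [
          ChatGPTChoice(c['message'], c['index'], c['finish_reason'])
          for c in payload['choices']
      ]
      usage = ChatGPTUsage(
          payload['usage']['prompt_tokens'],
          payload['usage']['completion_tokens'],
          payload['usage']['total_tokens']
      )
      return ChatGPTResponse(
          payload['id'], payload['object'], payload['created'],
          payload['model'], choices, usage
      )

   except openai.error.AuthenticationError as e:
       logging.error(f'Failed as {str(e)}')
       return None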

Note: We are using the built-in json Python library [ref 3] to decode the ChatGPT response. Alternative libraries include Orjson [ref 4] and SimpleJson [ref 5].
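The decoding step can be tried without calling the service. The sketch below applies the json library to a hand-written sample payload whose values are made up; it is not an actual API response. The class names Usage, Choice and Response and the function decode are hypothetical stand-ins for the data classes of this post.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


@dataclass
class Choice:
    message: dict
    index: int
    finish_reason: str


@dataclass
class Response:
    id: str
    object: str
    created: int
    model: str
    choices: List[Choice]
    usage: Usage


def decode(raw_json: str) -> Response:
    # Convert the JSON text into nested data classes
    payload = json.loads(raw_json)
    choices = [
        Choice(c['message'], c['index'], c['finish_reason'])
        for c in payload['choices']
    ]
    usage = Usage(
        payload['usage']['prompt_tokens'],
        payload['usage']['completion_tokens'],
        payload['usage']['total_tokens']
    )
    return Response(payload['id'], payload['object'], payload['created'],
                    payload['model'], choices, usage)


# Hand-written sample with made-up values
sample = '''{
  "id": "chatcmpl-123", "object": "chat.completion", "created": 1679400000,
  "model": "gpt-3.5-turbo",
  "choices": [{"index": 0, "finish_reason": "stop",
               "message": {"role": "assistant", "content": "Hello!"}}],
  "usage": {"prompt_tokens": 9, "completion_tokens": 3, "total_tokens": 12}
}'''

response = decode(sample)
print(response.choices[0].message['content'], response.usage.total_tokens)
```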

References

[4] Orjson




---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3




Wednesday, February 15, 2023

Improve Python with Pydantic 2.x

Target audience: Advanced
Estimated reading time: 3'

In my tenure as a Scala developer, I frequently underscored the importance of robust type checking mechanisms. When transitioning to Python, it was the incorporation of the Pydantic library and the native typing package that significantly underscored Python's capabilities as a mature programming language, bolstering its appeal to developers like me.

This article delves into the contemporary features of the Pydantic library, elucidating concepts such as serialization, annotated validators, and discriminated unions.


Notes:
  • Environments: Python 3.10, Pydantic 2.4.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Why pydantic?

Pydantic [ref 1] is a Python library designed for data modeling and parsing, equipping Python developers with customizable tools for error handling and validation. This library seamlessly integrates built-in features for JSON encoding and decoding, combined with validation. It offers a declarative approach to data representation.

Mirroring the typing module, Pydantic aims to emulate the type checking seen in static languages like Java and C++ through IDE-enforced hints. Moreover, it enhances the Python dataclasses module by integrating agile error handling and data validation capabilities.

Pydantic has several benefits:
  • Dedicated, specific types such as FilePath or HttpUrl for validation.
  • Custom validators.
  • Options (Optional type) similar to Java and Scala.
  • Integration with lint functionality in most common IDEs.
  • Declarative syntax for modeling data.
  • Wrapper (Field type) for data validation.

To install the Pydantic library (MacOS)
       pip install pydantic

Notes: 
  • This is a sample of features available in version 2.2.1 [ref 2]. The reader is invited to explore the entire functionality of Pydantic.
  • A migration from Pydantic version 1.x is available to Python developers [ref 3].
  • FastAPI web framework leverages Pydantic library [ref 4].

Serialization validators

One basic function is the validation of the JSON representation of an object. The following example combines a Pydantic data class, Algorithm, which inherits from BaseModel, with data types from the typing module.
An algorithm instance is completely defined by its name (e.g., Binary tree), its time complexity (e.g., O(logN)), the asymptotic search time complexity [ref 5], and the supported programming languages (e.g., C++, Python, Go, ...). For the sake of clarity, time and space complexity are defined as an enumeration.

from pydantic import BaseModel, ValidationError
from typing import AnyStr, Tuple, Dict, Optional
from enum import Enum


# Time complexity for average case for search operation
class AlgorithmComplexity(Enum):
    O1 = 0       # Hash table
    OlogN = 1    # Binary search, red-black tree, ...
    ON = 2       # Linked list, stack, queue, ...
    ONlogN = 3   # Quick, merge, shell sort, ...
    ON2 = 4      # Insertion sort, ...


class Algorithm(BaseModel):
    name: AnyStr
    time_complexity: AlgorithmComplexity
    space_complexity: AlgorithmComplexity
    languages: Tuple    # Tuple of supported programming languages


In the following code snippet, the Pydantic constructor Algorithm.__init__ validates the textual/JSON representation of the data. The validation fails for the improper definition of the linked list.

import logging

if __name__ == '__main__':

    # Correct definition of a binary tree
    binary_tree_json = {
        'name': 'Binary Tree',
        'time_complexity': AlgorithmComplexity.OlogN,
        'space_complexity': AlgorithmComplexity.OlogN,
        'languages': ('java', 'C++', 'Python')
    }
    try:
        binary_tree = Algorithm(**binary_tree_json)
    except ValidationError as err:
        logging.error(err.errors())

    # Incorrect specification of the time complexity to access a linked list
    linked_list_json = {
        'name': 'LinkedList',
        'time_complexity': 'N',
        'space_complexity': AlgorithmComplexity.ON,
        'languages': ('java', 'C++', 'Scala')
    }
    try:
        linked_list = Algorithm(**linked_list_json)
    except ValidationError as err:
        logging.error(err.errors())
        # [{'type': 'enum', 'loc': ('time_complexity',), 'msg': 'Input should be 0,1,2,3 or 4',
        #   'input': 'N', 'ctx': {'expected': '0,1,2,3 or 4'}}]
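The rejected value 'N' is not one of the values declared in the enumeration. The analogous rule of the standard enum module can be checked in isolation, independently of Pydantic's internal validation. The sketch repeats the enumeration so that it runs on its own.

```python
from enum import Enum


class AlgorithmComplexity(Enum):
    O1 = 0
    OlogN = 1
    ON = 2
    ONlogN = 3
    ON2 = 4


# Lookup by value succeeds for one of the declared values
print(AlgorithmComplexity(1))

# 'N' is not a declared value, so the lookup is rejected
try:
    AlgorithmComplexity('N')
except ValueError as e:
    print(f'Rejected: {e}')
```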



Annotated validator

Pydantic provides a way to apply validators to a field value or to a type. You should use annotated validators whenever you want to bind validation to a type instead of a model or a field.

There are several types of validators:
  • After validator, which runs after Pydantic's internal parsing and validation.
  • Before validator, which runs before Pydantic's internal parsing and validation.
  • Plain validator, a variant of the before validator that terminates validation immediately: Pydantic's internal validation is not run afterwards.
  • Wrap validator, which can run code both before and after Pydantic's internal validation and therefore covers the functionality of the before, after and plain validators.
The following example implements a 'before validator': the languages listed for a given algorithm should be valid as specified in the tuple, valid_languages.

Here are the 2 steps:
  1. Define a validation function, validate_languages. Note that the type of the output, Tuple, is the same as the type of the input (argument).
  2. Declare the field, languages, with Annotated, which takes the input/output type, Tuple, as first argument and the validator wrapping the validation function as second argument.

from typing_extensions import Annotated
from pydantic.functional_validators import BeforeValidator


# Formal list of programming languages used to implement the algorithms
valid_languages: Tuple = ('python', 'go', 'scala', 'c++', 'java')


def validate_languages(languages: Tuple) -> Tuple:
    """
        Remove the languages which are not correctly specified (typos, non-compliant, ..)
    """
    # tuple(...) is required: a bare generator expression would not be a tuple
    return tuple(lang for lang in languages if lang.lower() in valid_languages)


# We add the annotation for the programming language field.
class Algorithm(BaseModel):
    name: AnyStr
    time_complexity: AlgorithmComplexity
    space_complexity: AlgorithmComplexity
    # Is the list of supported languages legitimate?
    languages: Annotated[Tuple, BeforeValidator(validate_languages)]



The simplest validation scheme is to make sure the programming languages available for this algorithm are valid. We do not make any assumption whether programming languages are specified with lower or upper case characters.

  
binary_tree_json = {
    'name': 'Binary Tree',
    'time_complexity': AlgorithmComplexity.OlogN,
    'space_complexity': AlgorithmComplexity.OlogN,
    'languages': ('Java', 'C+++', 'Python')    # C+++ is not a valid programming language
}

try:
    binary_tree = Algorithm(**binary_tree_json)
    logging.info(str(binary_tree.languages))   # ('Java', 'Python')
except ValidationError as err:
    logging.error(err.errors())
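To give an intuition of how a validator can be bound to a type, here is a standard-library sketch of the underlying idea: typing.Annotated carries extra metadata that a framework can discover and apply before storing a value. This is not Pydantic's implementation. The marker class Before and the simplified Algorithm class are hypothetical and defined only for this illustration.

```python
from typing import Annotated, Any, Callable, Tuple, get_type_hints

valid_languages: Tuple[str, ...] = ('python', 'go', 'scala', 'c++', 'java')


class Before:
    # Hypothetical stand-in for a 'before validator' marker
    def __init__(self, func: Callable[[Any], Any]):
        self.func = func


def validate_languages(languages: Tuple[str, ...]) -> Tuple[str, ...]:
    return tuple(lang for lang in languages if lang.lower() in valid_languages)


class Algorithm:
    name: str
    languages: Annotated[tuple, Before(validate_languages)]

    def __init__(self, **kwargs: Any):
        hints = get_type_hints(type(self), include_extras=True)
        for field, hint in hints.items():
            value = kwargs[field]
            # Annotated types expose their extra arguments through __metadata__
            for meta in getattr(hint, '__metadata__', ()):
                if isinstance(meta, Before):
                    value = meta.func(value)
            setattr(self, field, value)


algo = Algorithm(name='Binary Tree', languages=('Java', 'C+++', 'Python'))
print(algo.languages)
```

The validation function is attached to the annotation of the field rather than to the class, which is the property that makes annotated validators reusable across models.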



Discriminated unions for composition

The goal of discriminated unions is to remove the need for multiple inheritance by creating a composite of mutually exclusive types.
Let's consider a simple hierarchy of HTML elements:


Class diagram in sub-classing using union discriminated on html_type

The HTMLText class for the HTML fragment <p>[text]</p> and HTMLImage class for the HTML component <img src=[url] /> inherit from the base abstract class HTMLElement. The member html_type declared with type Literal defines explicitly the type of element.
The goal is to create a new composite type, WebElement, that can invoke either HTMLText or (exclusive) HTMLImage class.

import abc
from typing import AnyStr, Dict, Union, Literal
from pydantic import BaseModel, Field


class HTMLElement(BaseModel):
   html_type: AnyStr

   @abc.abstractmethod
   def get(self) -> AnyStr:
      pass


class HTMLText(HTMLElement):
   html_type: Literal["p"]
   doc: AnyStr

   def get(self) -> AnyStr:
      # Opening tag, text, then closing tag
      return f'<{self.html_type}>{self.doc}</{self.html_type}>'


class HTMLImage(HTMLElement):
   html_type: Literal["img"]
   img_url: AnyStr

   def get(self) -> AnyStr:
       return f'<{self.html_type} src={self.img_url} />'



The objective is to create a mutually exclusive composite, WebElement, of HTMLImage and HTMLText using html_type as discriminator.
The polymorphic method get illustrates the mechanism by generating the actual textual representation of the HTML components.

class WebElement(BaseModel):
  element: Union[HTMLImage, HTMLText] = Field(discriminator="html_type")

  def get(self) -> AnyStr:
        return self.element.get()


The simple test consists of creating a dictionary, test_element, to initialize a WebElement instance with type 'img'.

if __name__ == '__main__':
  html_text = HTMLText(html_type='p', doc='This is correct')
  print(html_text.get())  # <p>This is correct</p>

  test_element: Dict = {"img_url": "python.png", "text": "", "html_type": "img"}
  web_element = WebElement(element=test_element)
  print(web_element.get()) # <img src=python.png />


Note: For readers familiar with Scala, discriminated unions are somewhat similar to mixins.
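The selection mechanism behind a discriminator can be sketched with the standard library alone: the value of the discriminator field picks exactly one concrete class. This is an illustration of the idea, not Pydantic's implementation. The names Text, Image, registry and build_element are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class Text:
    doc: str
    html_type: str = 'p'

    def get(self) -> str:
        return f'<{self.html_type}>{self.doc}</{self.html_type}>'


@dataclass
class Image:
    img_url: str
    html_type: str = 'img'

    def get(self) -> str:
        return f'<{self.html_type} src={self.img_url} />'


# Discriminator value -> concrete class (exactly one class per value)
registry = {'p': Text, 'img': Image}


def build_element(data: Dict[str, Any]) -> Union[Text, Image]:
    # Hypothetical helper: select the class from the discriminator field
    cls = registry.get(data.get('html_type'))
    if cls is None:
        raise ValueError(f"Unknown html_type: {data.get('html_type')}")
    # Keep only the keys declared as fields of the selected class
    fields = {k: v for k, v in data.items() if k in cls.__dataclass_fields__}
    return cls(**fields)


print(build_element({'html_type': 'img', 'img_url': 'python.png'}).get())
print(build_element({'html_type': 'p', 'doc': 'This is correct'}).get())
```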

Thank you for reading this article. For more information ...

References

[4] FastAPI



Saturday, January 21, 2023

Manage Spark Pipelines with Airflow

Target audience: Intermediate
Estimated reading time: 5'

Ever grappled with piecing together a data pipeline from diverse, sometimes mismatched components or processes? Apache Airflow might just be the remedy you have been looking for! 

This article delves into the orchestration design and implementation of a pipeline geared towards testing an Apache Spark application crafted in Scala.

Notes:
  • In order to demonstrate the flexibility of Airflow's orchestration capability, our workflow combines processes implemented in Python, Bash and Spark/Scala.
  • The implementation of the Spark application used in the article is included in the Appendix for reference.
  • Environments: Python 3.10, Apache Airflow 2.7.0, Apache Spark 3.3.1, Scala 2.13.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Introduction 

Organizations often develop and implement scripts in multiple languages (like Batch, SQL, Python) to handle data pipelines. This approach can rapidly become unmanageable. Using an orchestration tool like Airflow, data and MLOps engineers can unify these scripts into one integrated platform.

Airflow is an orchestration platform that supports a modular architecture, dynamic pipeline generation, an editable execution graph (DAG), and customizable and reusable tasks [ref 1].

Key concepts

Airflow's design was influenced by the following considerations:
  • Purpose: Airflow serves as a platform designed for the programmatic creation, scheduling, and monitoring of workflows. With its ability to define workflows via Python code, it offers significant flexibility and adaptability.
  • Components: The core components of Airflow include a visualization web server, a scheduler that oversees job execution based on triggers or schedules, and workers responsible for task execution.
  • Use cases: Typical applications of Airflow encompass ETL (Extract, Transform, Load) processes, data analysis workflows, and regularly occurring tasks.
  • Execution graph: Workflows in Airflow are framed using Directed Acyclic Graphs (DAG). Within these DAGs, each node denotes a task, while the connecting edges symbolize task dependencies.
The implementation and resources for the Apache Airflow framework are available on Github [ref 2].

Setup

Here are the steps required to install and deploy Airflow using a simple Bash script:

export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.7.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow standalone

Notes
  • Airflow uses a SQLite database by default. For production deployments, use a MySQL or PostgreSQL database (not SQLite) and do not use SequentialExecutor.
  • Default users and password are defined in $AIRFLOW_HOME/standalone_admin_passwords.txt

Operations

Start airflow:  airflow webserver --port 8082
Stop airflow:  Ctrl-C inside the terminal running airflow
              OR  rm -f ${AIRFLOW_HOME}/airflow-scheduler.pid
                      pkill -f "airflow-scheduler"

Testing a task:  airflow tasks test dag_id task_id start_date
Example: airflow tasks test spark_eval_v1 spark_job 2023-10-09

Use case

The goal is to create and orchestrate a pipeline for testing a Spark application that cleanses an employee dataset of anomalies before feeding it into a deep learning model. 
Each 'Employee' entry is characterized by a 'name', a 'role' within the company (e.g., engineer), a 'score' (rating)  from their most recent performance evaluation, and 'tenure' represented as the number of years they've been with the organization.


valid_employee_role = ['intern',  'engineer',  'manager',  'director',  'VP']

"""
    Data class for employee record
"""
@dataclass
class Employee:
    name: AnyStr
    role: AnyStr
    score: int
    tenure: float

The three orchestration tasks are
  1. Generate employee records with random attributes for position, rating and seniority, to be stored on HDFS (generate_to_hdfs)
  2. Load records from HDFS (load_from_hdfs)
  3. Remove outlier records using Apache Spark, with valid records defined by the condition: Valid role, 0 < score <= 10 and 0.0 < tenure < 50.0 (spark_job)
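The validity condition of the third task can be sketched in plain Python before looking at the Spark version in the Appendix. The function name is_valid and the sample records are made up for this illustration; the Employee class is repeated so that the sketch runs on its own.

```python
from dataclasses import dataclass
from typing import List

valid_employee_role = ['intern', 'engineer', 'manager', 'director', 'VP']


@dataclass
class Employee:
    name: str
    role: str
    score: int
    tenure: float


def is_valid(employee: Employee) -> bool:
    # Valid role, 0 < score <= 10 and 0.0 < tenure < 50.0
    return (employee.role in valid_employee_role
            and 0 < employee.score <= 10
            and 0.0 < employee.tenure < 50.0)


employees: List[Employee] = [
    Employee('emp_0', 'intern', 7, 1.5),
    Employee('emp_1', 'astronaut', 5, 3.0),    # Invalid role
    Employee('emp_2', 'manager', 12, 8.0),     # Score out of range
    Employee('emp_3', 'director', 9, 50.0)     # Tenure out of range
]
print([e.name for e in employees if is_valid(e)])
```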

Airflow operators for Spark application

The first two tasks, generate_to_hdfs and load_from_hdfs, are implemented in Python and invoked through a bash script while the Spark application is written in Scala [ref 3] (see Appendix).


Orchestration

There are 3 components in creating an Airflow orchestration:
  1. Define the directed acyclic graph (DAG) for execution with the appropriate scheduling, retries,... arguments
  2. Invoke the appropriate operators which define each of the 3 tasks
  3. Specify the order of tasks in the workflow
We reference the following Python code of an Airflow DAG for our scenario to highlight these three elements. This source code serves as the basis for our discussion in this section.

from airflow.decorators import task
from airflow import DAG
from datetime import datetime, timedelta
from typing import List, Optional
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import logging


# Step 1: Define the DAG with appropriate arguments
# (data_source_executor, spark_jar_path and file_name are defined elsewhere)

default_args = {
    'owner': 'airflow',
    'retries': 4,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    dag_id='spark_eval_v1',
    default_args=default_args,
    start_date=datetime(2023, 10, 3),
    schedule_interval='@hourly',
    catchup=False,
    tags=['first_version']
)

# Step 2: Create the 3 operators
with dag:
    # Task 1 generates randomized employees
    loaded = BashOperator(
        task_id='generate_to_hdfs',
        bash_command=f'python3 {data_source_executor} generate'
    )

    # Task 2 loads the randomly generated data from HDFS
    employees = BashOperator(
        task_id='load_from_hdfs',
        bash_command=f'python3 {data_source_executor} load'
    )

    # Task 3 launches the Spark job to process the data
    spark_job = SparkSubmitOperator(
        task_id='spark_job',
        conn_id='spark_default',
        application=spark_jar_path,
        java_class='org.batcheval.Cleanser',
        conf={      # To be loaded from file
            'master': 'local[8]',
            'spark.driver.memory': '18g',
            'spark.executor.memory': '24g',
            'spark.default.parallelism': '16',
            'spark.local.dir': 'temp',
            'spark.network.timeout': '2100',
            'spark.driver.maxResultSize': '524288',
            'spark.driver.cores': 4,
            'spark.executor.cores': 3
        },
        application_args=[file_name]
    )

    # Step 3: Specify the workflow
    loaded >> employees >> spark_job


Defining the DAG

The first step is to define the execution graph. The DAG is a data pipeline crafted in Python. Every DAG depicts a series of tasks structured as a graph. The inherent characteristics of DAGs make them ideal for constructing data pipelines:
  • Directed: When there are several tasks, each task should have a designated upstream or downstream connection.
  • Acyclic: Tasks should not depend on themselves, ensuring there are no never-ending loops.
  • Graph: All tasks can be graphically represented, with the interconnections between them indicated by nodes and edges.
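These properties can be illustrated with the graphlib module of the Python standard library (Python 3.9 or later). This is only a sketch of the concept of a directed acyclic graph applied to the three tasks of this article; it does not reflect how the Airflow scheduler is implemented.

```python
from graphlib import CycleError, TopologicalSorter

# Each task is mapped to the set of tasks it depends on (its upstream tasks)
dependencies = {
    'generate_to_hdfs': set(),
    'load_from_hdfs': {'generate_to_hdfs'},
    'spark_job': {'load_from_hdfs'}
}

# Directed + acyclic: a valid execution order always exists
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)

# A cycle makes the graph invalid: no task could be executed first
try:
    list(TopologicalSorter({'a': {'b'}, 'b': {'a'}}).static_order())
except CycleError:
    print('Cycle detected: not a DAG')
```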

Creating the operators

An operator is a conceptual wrapper around Python code crafted to execute a designated action, represented as a function with parameters. Every operator in your DAG code equates to a single Airflow task.
Operators can be categorized into three types:
  • Action operators: These initiate or set off specific actions, such as BashOperator, PythonOperator, or TriggerDagRunOperator.
  • Transfer operators: These are stateful and handle data migration from one spot to another.
  • Sensors: Derived from BaseSensorOperator, these operators pause execution until a certain condition or operation concludes.
The first two operators (type BashOperator), loaded and employees, wrap the Bash command lines python data_source generate and python data_source load.

The actual data processing is performed through the Airflow Spark submit operator (SparkSubmitOperator). In this scenario, you need to set up the development environment in order to execute the operator as follows:
  1. Build a Spark jar file.
  2. Install apache-airflow-providers-apache-spark package.
  3. Implement the SparkSubmitOperator [See below]
  4. Configure the Spark connection in Airflow. UI/Admin/Connections, id=spark_default, type: Spark, host/port
The operator, spark_job, defines the minimum set of configuration parameters to execute a Spark submit implemented in Scala [ref 4]:
  • application: Path to the Jar file
  • java_class: Name of the class (Scala in our case) invoked by Spark submit
  • conf: Spark dynamically loaded properties defined in SparkConf or provided as part of the Spark submit command line arguments.
  • application_args: List of application specific arguments to the Spark submit command line.

Specifying the workflow

The final stage is to define the dependencies between the various tasks. This is accomplished very simply by using the self-explanatory overloaded >> operator.
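For readers curious about how an expression such as loaded >> employees >> spark_job can work, here is a minimal sketch of operator overloading with the __rshift__ method. The Task class is hypothetical and much simpler than an actual Airflow operator.

```python
from typing import List


class Task:
    # Hypothetical minimal task, not Airflow's operator class
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: List['Task'] = []

    def __rshift__(self, other: 'Task') -> 'Task':
        # self >> other: other is executed after self
        self.downstream.append(other)
        return other      # Returning the right-hand side allows chaining


loaded = Task('generate_to_hdfs')
employees = Task('load_from_hdfs')
spark_job = Task('spark_job')

loaded >> employees >> spark_job
print([t.task_id for t in loaded.downstream])
print([t.task_id for t in employees.downstream])
```

Because the expression is evaluated from left to right and each step returns the right-hand task, a chain of any length registers one dependency per pair of adjacent tasks.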

Tasks implementation

The Python functions triggered by the operators are provided to showcase how the Airflow operators work. However, they aren't necessary for grasping the Airflow framework itself.

Data generator

The data generator relies on a random generator to create the fields of each employee record, then saves the records to a file.

class DataOperator(object):

    def __init__(self, input_file: AnyStr):
        self.input_file = input_file


    @staticmethod
    def generate_and_save(this_file_name: AnyStr, num_entries: int) -> bool:
        """
           Generate employee records with random attributes (score, tenure,...)
           :param this_file_name HDFS file name, the records are stored
           :param num_entries Number of records to generate
           :return True if the records are correctly generated and save, False otherwise
        """
        try:
            records = [f'emp_{i},{valid_employee_role[i % len(valid_employee_role)]},{int(random.uniform(1, 10))},{random.uniform(0, 25):.1f}'
                       for i in range(num_entries)]
            header = 'name,role,score,tenure\n'
            content = header + '\n'.join(records)

            # Store the records
            with open(this_file_name, 'w') as f:
                f.write(content)
            return True
        except Exception as e:
            task_logger.error(f'generate and save failed {str(e)}')
            return False


Data loader

This Python method loads the records from the CSV file, then generates the list of employees. It returns an empty list in case of exception.

def load(self) -> List[Employee]:
   try:
       # Load the records from a CSV file
       with open(self.input_file, 'rt') as f:
           csv_reader = csv.reader(f, delimiter=',', quotechar='\'')

           # Build the list of employee records to be cleansed.
           employee_list = []
           for idx, attributes in enumerate(csv_reader):
               if idx > 0:      # Skip the header
                   employee_list.append(DataOperator.__get_employee(attributes))
           return employee_list

   except Exception as e:
       task_logger.error(f'Loader failed: {str(e)}')
       return []


@staticmethod
def __get_employee(fields: List[AnyStr]) -> Employee:
   return Employee(
         fields[0],           # Name of employee
         fields[1],           # Role of employee in the organization
         int(fields[2]),     # Score of the employee from performance review
         float(fields[3])   # Tenure or seniority of the employee
   )
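The conversion from CSV lines to Employee records can be tried without any file, using an in-memory string. The function parse_employees and the two sample records are assumptions made for this sketch; the Employee class is repeated so that the sketch is self-contained.

```python
import csv
import io
from dataclasses import dataclass
from typing import List


@dataclass
class Employee:
    name: str
    role: str
    score: int
    tenure: float


def parse_employees(content: str) -> List[Employee]:
    # Hypothetical helper: same logic as the loader, applied to a string
    csv_reader = csv.reader(io.StringIO(content), delimiter=',')
    next(csv_reader)      # Skip the header line
    return [
        Employee(fields[0], fields[1], int(fields[2]), float(fields[3]))
        for fields in csv_reader
    ]


content = 'name,role,score,tenure\nemp_0,intern,7,1.5\nemp_1,engineer,3,12.0'
print(parse_employees(content))
```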

Data cleanser

The outlier elimination is carried out by the Spark application, which is written in Scala. Its primary and straightforward role is to apply the filter outlined in the Use case section. While the Scala code for this task isn't crucial for grasping the Airflow orchestration aspects, it's provided in the Appendix for reference.


Airflow best practices

To ensure Airflow doesn't overconsume computational resources, adhere to the following guidelines:
  • View Airflow as a task orchestrator, not an execution platform. Delegate heavy processing to execution frameworks and maximize the use of ETL processes.
  • Refrain from importing large datasets directly into a task.
  • Segment intricate tasks (or elaborate Python functions encapsulated by the PythonOperator) into more manageable units to simplify debugging.
  • Utilize template variables using the Jinja template format {{...}}.
  • Maintain a one-to-one ratio of DAGs to Python files.
  • Steer clear of top-level code in your DAG files as it triggers execution during each DAG parse.
Note: DAG code undergoes parsing/analysis every 'min_file_process_interval' (set to 30 seconds by default).

Conclusion

The article's example understates Airflow's capabilities. In practical scenarios, more complex orchestration is necessary, involving Spark, Kafka, NoSQL databases, and deep learning models developed in different programming languages.

Apache Airflow stands out as the perfect instrument for orchestrating Spark pipelines, whether they're written in Python, Scala, or Java. The prowess of Airflow operators in handling diverse services can spare software and MLOps engineers numerous development hours.

Several other data orchestration platforms are available, including:
  • BMC's Control-M: This is a workflow orchestration solution that places a strong focus on adhering to Service Level Agreements [ref 5].
  • Flyte: Originally created at Lyft, this is a scalable platform for automating workflows [ref 6].
  • Y42: This platform offers a comprehensive solution for data orchestration, featuring an SQL design interface [ref 7].
  • Shipyard App: A data orchestration tool emphasizing code reusability, package management, version control, and observability [ref 8].

Thank you for reading this article. For more information ...

References

[7] y42.com

Appendix

A basic outlier removal function has been implemented in Spark using Scala. The validation function, termed 'validate', acts as a parameter for the 'OutlierRemoval' class. The primary method, named 'apply', retrieves the raw employee records from a CSV file named 'srcFilename', applies the specified filter, and then saves the results to another CSV file, called 'destFileName'.

final class OutlierRemoval(validate: Employee => Boolean)(implicit sparkSession: SparkSession){
    
  def apply(srcFilename: String, destFilename: String): Unit = try  {
      import sparkSession.implicits._

      // Load data from HDFS
      val employeeDS = sparkSession.read.option("delimiter", ",").csv(srcFilename).map(Employee(_))
      employeeDS.show(5)
      // Filter out the outlier
      val filteredEmployeeDS = employeeDS.filter(validate(_))
      filteredEmployeeDS.write.csv(destFilename)
    }
    catch {
      case e: Exception => logger.error(s"ERROR: ${e.getMessage}")
    }
}

object OutlierRemoval {
  // Default constructor
  def apply()(implicit sparkSession: SparkSession): OutlierRemoval = new OutlierRemoval(default_validate)

  private val valid_employee_role = Set[String](
    "intern", "engineer", "manager", "director", "VP"
  )

  private val default_validate = (employee: Employee) => {
    valid_employee_role.contains(employee.role) && employee.score <= 10 &&
    employee.score > 0 && employee.tenure > 0.0 && employee.tenure < 50.0
  }
}

Main function to be called by Spark submit script.

object Cleanser extends App{
    // Initialize the context
  implicit val sparkSession: SparkSession = {
     val conf = new SparkConf().setMaster("local[8]")
     SparkSession.builder()
         .appName("sparkjob")
         .config(conf)
         .getOrCreate()
  }

  val srcFilename = args(0)
  private val outputFilename = args(1)
  private val outlierRemoval = OutlierRemoval()
    // Apply the filter to remove outliers
  outlierRemoval(srcFilename, outputFilename)
    // Cleanup
  sparkSession.close()
}

