Wednesday, November 30, 2022

Manage Spark Pipelines with Airflow

Target audience: Intermediate
Estimated reading time: 5'

Ever grappled with piecing together a data pipeline from diverse, sometimes mismatched components or processes? Apache Airflow might just be the remedy you have been looking for! 

This article delves into the orchestration design and implementation of a pipeline geared towards testing an Apache Spark application crafted in Scala.

Notes:
  • In order to demonstrate the flexibility of Airflow's orchestration capabilities, our workflow combines processes implemented in Python, Bash and Spark/Scala.
  • The implementation of the Spark application used in the article is included in the Appendix for reference.
  • Environments: Python 3.10, Apache Airflow 2.7.0, Apache Spark 3.3.1, Scala 2.13.2
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Introduction 

Organizations often develop and implement scripts in multiple languages (such as Bash, SQL and Python) to handle data pipelines. This approach can rapidly become unmanageable. With an orchestration tool like Airflow, data and MLOps engineers can unify these scripts into one integrated platform.

Airflow is an orchestration platform that supports a modular architecture, dynamic pipeline generation, an editable execution graph (DAG), and customizable, reusable tasks [ref 1].

Key concepts

Airflow's design was influenced by very specific considerations, specifically:
  • Purpose: Airflow serves as a platform designed for the programmatic creation, scheduling, and monitoring of workflows. With its ability to define workflows via Python code, it offers significant flexibility and adaptability.
  • Components: The core components of Airflow include a visualization web server, a scheduler that oversees job execution based on triggers or schedules, and workers responsible for task execution.
  • Use cases: Typical applications of Airflow encompass ETL (Extract, Transform, Load) processes, data analysis workflows, and regularly occurring tasks.
  • Execution graph: Workflows in Airflow are framed using Directed Acyclic Graphs (DAG). Within these DAGs, each node denotes a task, while the connecting edges symbolize task dependencies.
The implementation and resources for the Apache Airflow framework are available on GitHub [ref 2].

Setup

Here are the steps required to install and deploy Airflow using a simple Bash script:

export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.7.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow standalone

Notes
  • Airflow uses a SQLite database by default. For production deployments, use a MySQL or PostgreSQL database (not SQLite) and do not use the SequentialExecutor.
  • The default user and password are defined in $AIRFLOW_HOME/standalone_admin_passwords.txt

Operations

Start airflow:  airflow webserver --port 8082
Stop airflow:   Ctrl-C inside the terminal running airflow
                OR  rm -f ${AIRFLOW_HOME}/airflow-scheduler.pid
                    pkill -f "airflow-scheduler"

Testing a task:  airflow tasks test dag_id task_id start_date
Example: airflow tasks test spark_eval_v1 spark_job 2023-10-09

Use case

The goal is to create and orchestrate a pipeline for testing a Spark application that cleanses an employee dataset of anomalies before feeding it into a deep learning model. 
Each 'Employee' entry is characterized by a 'name', a 'role' within the company (e.g., engineer), a 'score' (rating) from their most recent performance evaluation, and 'tenure' represented as the number of years they've been with the organization.


from dataclasses import dataclass
from typing import AnyStr

valid_employee_role = ['intern', 'engineer', 'manager', 'director', 'VP']

@dataclass
class Employee:
    """ Data class for an employee record """
    name: AnyStr
    role: AnyStr
    score: int
    tenure: float

The three orchestration tasks are:
  1. Generate employee records with random attributes for role, score and tenure, to be stored on HDFS (generate_to_hdfs)
  2. Load the records from HDFS (load_from_hdfs)
  3. Remove outlier records with Apache Spark, where a valid record satisfies: a valid role, 0 < score <= 10 and 0.0 < tenure < 50.0 (spark_job)
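Task 3's validity condition can be expressed directly in plain Python for illustration (the actual cleansing runs in Spark/Scala, shown in the Appendix):

```python
from dataclasses import dataclass

valid_employee_role = ['intern', 'engineer', 'manager', 'director', 'VP']

@dataclass
class Employee:
    name: str
    role: str
    score: int
    tenure: float

def is_valid(employee: Employee) -> bool:
    # Keep a record only if the role is known, the review score
    # lies in (0, 10] and the tenure lies in (0.0, 50.0) years
    return (employee.role in valid_employee_role
            and 0 < employee.score <= 10
            and 0.0 < employee.tenure < 50.0)
```

For instance, Employee('emp_1', 'engineer', 7, 3.5) passes the filter, while a record with a score of 11 or a tenure of 55 years is flagged as an outlier.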

Airflow operators for Spark application

The first two tasks, generate_to_hdfs and load_from_hdfs, are implemented in Python and invoked through a Bash script, while the Spark application is written in Scala [ref 3] (see Appendix).


Orchestration

There are 3 components in creating an Airflow orchestration:
  1. Define the directed acyclic graph (DAG) for execution, with the appropriate scheduling, retry and other arguments
  2. Invoke the appropriate operators which define each of the 3 tasks
  3. Specify the order of tasks in the workflow
The following Python code of an Airflow DAG for our scenario highlights these three elements; it serves as the basis for the discussion in this section.

from airflow.decorators import task
from airflow import DAG
from datetime import datetime, timedelta
from typing import List, Optional
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import logging


# Step 1: Define the DAG with appropriate arguments
default_args = {
    'owner': 'airflow',
    'retries': 4,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    dag_id='spark_eval_v1',
    default_args=default_args,
    start_date=datetime(2023, 10, 3),
    schedule_interval='@hourly',
    catchup=False,
    tags=['first_version']
)

# Step 2: Create the 3 operators
with dag:
    # Task 1 generates randomized employees
    loaded = BashOperator(
        task_id='generate_to_hdfs',
        bash_command=f'python3 {data_source_executor} generate'
    )

    # Task 2 loads the randomly generated data from HDFS
    employees = BashOperator(
        task_id='load_from_hdfs',
        bash_command=f'python3 {data_source_executor} load'
    )

    # Task 3 launches the Spark job to process the records
    spark_job = SparkSubmitOperator(
        task_id='spark_job',
        conn_id='spark_default',
        application=spark_jar_path,
        java_class='org.batcheval.Cleanser',
        conf={  # To be loaded from file
            'spark.master': 'local[8]',
            'spark.driver.memory': '18g',
            'spark.executor.memory': '24g',
            'spark.default.parallelism': '16',
            'spark.local.dir': 'temp',
            'spark.network.timeout': '2100',
            'spark.driver.maxResultSize': '524288',
            'spark.driver.cores': 4,
            'spark.executor.cores': 3
        },
        application_args=[file_name]
    )

    # Step 3: Specify the workflow
    loaded >> employees >> spark_job


Defining the DAG

The first step is to define the execution graph. The DAG is a data pipeline crafted in Python. Every DAG depicts a series of tasks structured as a graph. The inherent characteristics of DAGs make them ideal for constructing data pipelines:
  • Directed: When there are several tasks, each task should have a designated upstream or downstream connection.
  • Acyclic: Tasks should not depend on themselves, ensuring there are no never-ending loops.
  • Graph: All tasks can be represented graphically, with nodes denoting the tasks and edges their dependencies.
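The acyclicity constraint can be checked on any adjacency-list representation of a workflow. The sketch below is ordinary Python (not Airflow code) that detects a cycle with a depth-first traversal:

```python
from typing import Dict, List

def has_cycle(dag: Dict[str, List[str]]) -> bool:
    """ Return True if the task graph contains a cycle """
    visiting, done = set(), set()

    def visit(node: str) -> bool:
        if node in done:
            return False
        if node in visiting:   # node reached twice on the same path: cycle
            return True
        visiting.add(node)
        if any(visit(child) for child in dag.get(node, [])):
            return True
        visiting.discard(node)
        done.add(node)
        return False

    return any(visit(task) for task in dag)
```

A workflow such as {'generate': ['load'], 'load': ['spark_job'], 'spark_job': []} is accepted, whereas adding an edge from spark_job back to generate would be rejected by Airflow at parse time.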

Creating the operators

An operator is a conceptual wrapper around Python code crafted to execute a designated action, represented as a function with parameters. Every operator in your DAG code equates to a single Airflow task.
Operators can be categorized into three types:
  • Action operators: These initiate or set off specific actions, such as BashOperator, PythonOperator, or TriggerDagRunOperator.
  • Transfer operators: These are stateful and handle data migration from one spot to another.
  • Sensors: Derived from BaseSensorOperator, these operators pause execution until a certain condition or operation concludes.
The first two operators (of type BashOperator), loaded and employees, wrap the command-line executions python data_source generate and python data_source load.

The actual data processing is performed through the Airflow Spark submit operator (SparkSubmitOperator). In this scenario, you need to set up the development environment to execute the operator as follows:
  1. Build a Spark jar file.
  2. Install apache-airflow-providers-apache-spark package.
  3. Implement the SparkSubmitOperator [See below]
  4. Configure the Spark connection in Airflow. UI/Admin/Connections, id=spark_default, type: Spark, host/port
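As an alternative to step 4's UI configuration, Airflow also picks up connections from environment variables named AIRFLOW_CONN_<CONN_ID>. The master URL below is a placeholder to be replaced with your actual cluster address:

```shell
# Hypothetical master URL; substitute your own Spark master host and port
export AIRFLOW_CONN_SPARK_DEFAULT='spark://spark-master:7077'
```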
The operator, spark_job, defines the minimum set of configuration parameters to execute a Spark submit implemented in Scala [ref 4]:
  • application: Path to the Jar file
  • java_class: Name of the class (Scala in our case) invoked by Spark submit
  • conf: Spark dynamically loaded properties defined in SparkConf or provided as part of the Spark submit command line arguments.
  • application_args: List of application specific arguments to the Spark submit command line.

Specifying the workflow

The final stage is to define the dependencies between the various tasks. This is accomplished very simply by using the self-explanatory overloaded >> operator.
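Under the hood, >> is ordinary Python operator overloading (__rshift__). The toy sketch below, which is not Airflow's actual implementation, shows why a chain such as loaded >> employees >> spark_job reads left to right: each >> records the dependency and returns its right-hand operand.

```python
class Task:
    """ Minimal stand-in for an Airflow operator, for illustration only """
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream = []

    def __rshift__(self, other: 'Task') -> 'Task':
        # 'a >> b' declares a as upstream of b and returns b,
        # so 'a >> b >> c' chains naturally
        other.upstream.append(self)
        return other

loaded, employees, spark_job = Task('generate'), Task('load'), Task('spark_job')
loaded >> employees >> spark_job
```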

Tasks implementation

The Python functions triggered by the operators are provided to showcase how the Airflow operators work. However, they aren't necessary for grasping the Airflow framework itself.

Data generator

It relies on a random generator to generate the fields of each employee record then save the 'records' onto a file.

import random
from typing import AnyStr

class DataOperator(object):

    def __init__(self, input_file: AnyStr):
        self.input_file = input_file


    @staticmethod
    def generate_and_save(this_file_name: AnyStr, num_entries: int) -> bool:
        """
           Generate employee records with random attributes (score, tenure, ...)
           :param this_file_name: HDFS file name where the records are stored
           :param num_entries: Number of records to generate
           :return: True if the records are correctly generated and saved, False otherwise
        """
        try:
            records = [
                f'emp_{i},{valid_employee_role[i % len(valid_employee_role)]},'
                f'{int(random.uniform(1, 10))},{random.uniform(0, 25):.1f}'
                for i in range(num_entries)
            ]
            header = 'name,role,score,tenure\n'
            content = header + '\n'.join(records)

            # Store the records
            with open(this_file_name, 'w') as f:
                f.write(content)
            return True

        except Exception as e:
            task_logger.error(f'generate and save failed {str(e)}')
            return False


Data loader

This Python method loads the records from the CSV file, then generates the list of employees. It returns an empty list in case of an exception.

def load(self) -> List[Employee]:
    try:
        # Load the records from a CSV file
        with open(self.input_file, 'rt') as f:
            csv_reader = csv.reader(f, delimiter=',', quotechar='\'')

            # Build the list of employee records to be cleansed,
            # skipping the header row
            employee_list = [
                DataOperator.__get_employee(attributes)
                for idx, attributes in enumerate(csv_reader) if idx > 0
            ]
        return employee_list

    except Exception as e:
        task_logger.error(f'Loader failed: {str(e)}')
        return []


@staticmethod
def __get_employee(fields: List[AnyStr]) -> Employee:
   return Employee(
         fields[0],           # Name of employee
         fields[1],           # Role of employee in the organization
         int(fields[2]),     # Score of the employee from performance review
         float(fields[3])   # Tenure or seniority of the employee
   )

Data cleanser

The outlier elimination is carried out on the Spark cluster by an application written in Scala. Its primary and straightforward role is to apply the filter outlined in the Use case section. While the Scala code for this task isn't crucial for grasping the Airflow orchestration aspects, it's provided in the Appendix for reference.


Airflow best practices

To ensure Airflow doesn't overconsume computational resources, adhere to the following guidelines:
  • View Airflow as a task orchestrator, not an execution platform. Delegate heavy processing to execution frameworks and maximize the use of ETL processes.
  • Refrain from importing large datasets directly into a task.
  • Segment intricate tasks (or elaborate Python functions encapsulated by the PythonOperator) into more manageable units to simplify debugging.
  • Utilize template variables using the Jinja template format {{...}}.
  • Maintain a one-to-one ratio of DAGs to Python files.
  • Steer clear of top-level code in your DAG files as it triggers execution during each DAG parse.
Note: DAG code undergoes parsing/analysis every 'min_file_process_interval' (set to 30 seconds by default).

Conclusion

The article's example understates Airflow's capabilities. In practical scenarios, more complex orchestration is necessary, involving Spark, Kafka, NoSQL databases, and deep learning models developed in different programming languages.

Apache Airflow stands out as the perfect instrument for orchestrating Spark pipelines, whether they're written in Python, Scala, or Java. The prowess of Airflow operators in handling diverse services can spare software and MLOps engineers numerous development hours.

Several other data orchestration platforms are available, including:
  • BMC's Control-M: This is a workflow orchestration solution that places a strong focus on adhering to Service Level Agreements [ref 5].
  • Flyte: Originally created at Lyft, this is a scalable platform for automating workflows [ref 6].
  • Y42: This platform offers a comprehensive solution for data orchestration, featuring an SQL design interface [ref 7].
  • Shipyard App: A data orchestration tool emphasizing code reusability, package management, version control, and observability [ref 8].

Thank you for reading this article. For more information ...

References

[7] y42.com

Appendix

A basic outlier removal function has been implemented in Spark using Scala. The validation function, termed 'validate', acts as a parameter for the 'OutlierRemoval' class. The primary method, named 'apply', retrieves the raw employee records from a CSV file named 'srcFilename', applies the specified filter, and then saves the results to another CSV file, called 'destFileName'.

final class OutlierRemoval(validate: Employee => Boolean)(implicit sparkSession: SparkSession){
    
  def apply(srcFilename: String, destFilename: String): Unit = try  {
      import sparkSession.implicits._

      // Load data from HDFS
      val employeeDS = sparkSession.read.option("delimiter", ",").csv(srcFilename).map(Employee(_))
      employeeDS.show(5)
      // Filter out the outlier
      val filteredEmployeeDS = employeeDS.filter(validate(_))
      filteredEmployeeDS.write.csv(destFilename)
    }
    catch {
      case e: Exception => logger.error(s"ERROR: ${e.getMessage}")
    }
}

object OutlierRemoval {
  // Default constructor
  def apply()(implicit sparkSession: SparkSession): OutlierRemoval = new OutlierRemoval(default_validate)

  private val valid_employee_role = Set[String](
    "intern", "engineer", "manager", "director", "VP"
  )

  private val default_validate = (employee: Employee) => {
    valid_employee_role.contains(employee.role) && employee.score <= 10 &&
    employee.score > 0 && employee.tenure > 0.0 && employee.tenure < 50.0
  }
}

Main function to be called by Spark submit script.

object Cleanser extends App{
    // Initialize the context
  implicit val sparkSession: SparkSession = {
     val conf = new SparkConf().setMaster("local[8]")
     SparkSession.builder()
         .appName("sparkjob")
         .config(conf)
         .getOrCreate()
  }

  val srcFilename = args(0)
  private val outputFilename = args(1)
  private val outlierRemoval = OutlierRemoval()
    // Apply the filter to remove outliers
  outlierRemoval(srcFilename, outputFilename)
    // Cleanup
  sparkSession.close()
}


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3



Wednesday, November 9, 2022

Enhance Stem-Based BERT WordPiece Tokenizer

Target audience: Advanced
Estimated reading time: 4'



Notes:
  • Environments: Java 11, Scala 2.12.11
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.



One important challenge for any NLP task is to validate parsed words against a vocabulary (a set of words or tokens). What happens if a word is not defined in the vocabulary? Is the word valid or a typo? Can it be derived from words in the current vocabulary?
The Bidirectional Encoder Representations from Transformers (BERT) model uses a tokenizer that addresses this issue.


WordPiece tokenizer

The purpose of the WordPiece tokenizer is to handle out-of-vocabulary words that have not been identified and recorded in the vocabulary. This tokenizer and its variants are used in transformer models such as BERT or GPT.

Here are the steps for processing a word with the WordPiece tokenizer, given a vocabulary:

1.  Check whether the word is present in the vocabulary.
    1a. If the word is present in the vocabulary, use it as a token.
    1b. If the word is not present in the vocabulary, split the word into sub-words.
2.  Check whether each sub-word is present in the vocabulary.
    2a. If the sub-word is present in the vocabulary, use it as a token.
    2b. If the sub-word is not present in the vocabulary, split the sub-word.
3.  Repeat from step 2.
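The steps above amount to a greedy, longest-match-first search. A minimal Python sketch, assuming a toy vocabulary where continuation pieces are prefixed with '##' as in BERT, could look like this:

```python
from typing import List, Set

def wordpiece_tokenize(word: str, vocabulary: Set[str], max_input_chars: int = 100) -> List[str]:
    # Overly long words are mapped to the unknown token
    if len(word) > max_input_chars:
        return ['[UNK]']

    tokens, start = [], 0
    while start < len(word):
        end = len(word)
        # Shrink the window until the longest matching sub-word is found
        while start < end:
            sub = word[start:end]
            if start > 0:
                sub = '##' + sub   # mark continuation pieces
            if sub in vocabulary:
                break
            end -= 1
        if start == end:           # no sub-word matched: give up
            return ['[UNK]']
        tokens.append(sub)
        start = end
    return tokens
```

With the vocabulary {'play', '##ing'}, wordpiece_tokenize('playing', ...) yields ['play', '##ing'].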



In the following example, two out-of-vocabulary words, obomasum and ablasting, are matched against a vocabulary. The first word, obomasum, is broken into two sub-words which both belong to the vocabulary. ablasting is broken into 3 sub-words, of which only blast is found in the vocabulary.



Stem-based variant 

Let's modify the WordPiece tokenizer to leverage stems. Stemming is the process of reducing a word to its stem. For instance, the words "programmer", "programmers", "programming" and "programs" share the same stem/root, "program".



We can leverage this concept to upgrade the WordPiece tokenizer by building the original vocabulary from stems only. The vocabulary is created by parsing all words from a large corpus, extracting their stems and populating the vocabulary.
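As a crude illustration of building such a vocabulary, the sketch below uses a naive suffix-stripping stemmer; the suffix list and the doubled-consonant rule are simplifications invented for this example, and a real system would rely on a Porter or Snowball stemmer:

```python
from typing import Iterable, Set

def naive_stem(word: str) -> str:
    # Strip one common suffix (a real stemmer handles many more rules)
    for suffix in ('ing', 'ers', 'er', 's'):
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            word = word[:-len(suffix)]
            break
    # Collapse a doubled trailing consonant left over by stripping
    # (e.g. 'programm' -> 'program')
    if len(word) > 2 and word[-1] == word[-2] and word[-1] not in 'aeiou':
        word = word[:-1]
    return word

def build_stem_vocabulary(corpus_words: Iterable[str]) -> Set[str]:
    # The vocabulary retains only the stems of the corpus words
    return {naive_stem(word.lower()) for word in corpus_words}
```

All four words of the earlier example ('programmer', 'programmers', 'programming', 'programs') collapse to the single vocabulary entry 'program'.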

Let's re-implement the WordPiece tokenizer using a pre-defined vocabulary of stems.


Java implementation

The following implementation in Java can be further generalized by implementing a recursive method to extract a stem from a sub-word.

import java.util.ArrayList;
import java.util.List;


public class StemWordPieceTokenizer {
  private int maxInputChars = 0;

  public StemWordPieceTokenizer(int maxInputChars) {
      this.maxInputChars = maxInputChars;
  }

  List<String> stemTokenizer(String sentence) {
     List<String> outputTokens = new ArrayList<>();
     String[] tokens = sentence.split("\\s+");

     for(String token: tokens) {
         // If the token is too long, ignore it
         if(token.length() > maxInputChars)
             outputTokens.add("[UNK]");

         // If the token belongs to the vocabulary
         else if(vocabulary.contains(token))
             outputTokens.add(token);

         // ... otherwise attempt to break it down
         else {
             char[] chars = token.toCharArray();
             int start = 0;
             int end = 0;

             // Walk through the token
             while(start < chars.length - 1) {
                 end = chars.length;

                 while(start < end) {
                     String subToken = token.substring(start, end);

                     // If the sub-token is found in the vocabulary
                     if(vocabulary.contains(subToken)) {
                         String prefix = token.substring(0, start);

                         // If the substring prior to the token
                         // is also contained in the vocabulary
                         if(vocabulary.contains(prefix))
                             outputTokens.add(prefix);

                         // Otherwise add it as a word piece
                         else if(!prefix.isEmpty())
                             outputTokens.add("##" + prefix);

                         outputTokens.add(subToken);

                         // Extract the substring after the token
                         String suffix = token.substring(end);
                         if(!suffix.isEmpty()) {
                             // If this substring is already in the vocabulary...
                             if(vocabulary.contains(suffix))
                                 outputTokens.add(suffix);
                             // otherwise add it as a word piece
                             else
                                 outputTokens.add("##" + suffix);
                         }
                         end = chars.length;
                         start = chars.length;
                     }
                     else
                         end -= 1;
                 }
                 start += 1;
             }
         }
     }
     return outputTokens;
  }
}

  

Scala implementation

For good measure, I include a Scala implementation. 

def stemTokenize(sentence: String): List[String] = {

  val outputTokens = ListBuffer[String]()
  val tokens = sentence.trim.split("\\s+")

  tokens.foreach(
    token => {
      // If the token is too long, ignore it
       if (token.length > maxInputChars) 
          outputTokens.append("[UNK]")
        // If the token belongs to the vocabulary
       else if (vocabulary.contains(token)) 
          outputTokens.append(token)

        // ... otherwise attempts to break it down 
       else {
          val chars = token.toCharArray
          var start = 0
          var end = 0

            // Walks through the token
          while (start < chars.length - 1) {
            end = chars.length

            while (start < end) {
               // extract the stem
              val subToken = token.substring(start, end)
                // If the sub token is found in the vocabulary
              if (vocabulary.contains(subToken)) {
                val prefix = token.substring(0, start)
                // If the substring prior the token 
                // is also contained in the vocabulary
                if (vocabulary.contains(prefix))
                   outputTokens.append(prefix)

                // Otherwise added as a word piece
                else if (prefix.nonEmpty)
                   outputTokens.append(s"##$prefix")

                outputTokens.append(subToken)

                  // Extract the substring after the token
                val suffix = token.substring(end)
                if (suffix.nonEmpty) {
                    // If this substring is already in the vocabulary..
                   if (vocabulary.contains(suffix))
                      outputTokens.append(suffix)
                   // otherwise add it as a word piece
                   else
                      outputTokens.append(s"##$suffix")
                }
                end = chars.length
                start = chars.length
             }
             else
               end -= 1
           }
           start += 1
        }
      }
    }
  )
  outputTokens.toList
}


Thank you for reading this article. For more information ...


