Table of contents
- In order to demonstrate the flexibility of Airflow's orchestration capabilities, our workflow combines processes implemented in Python, Bash, and Spark/Scala.
- The implementation of the Spark application used in the article is included in the Appendix for reference.
- Environment: Python 3.10, Apache Airflow 2.7.0, Apache Spark 3.3.1, Scala 2.13.2
- To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.
Introduction
Key concepts
- Purpose: Airflow serves as a platform designed for the programmatic creation, scheduling, and monitoring of workflows. With its ability to define workflows via Python code, it offers significant flexibility and adaptability.
- Components: The core components of Airflow include a visualization web server, a scheduler that oversees job execution based on triggers or schedules, and workers responsible for task execution.
- Use cases: Typical applications of Airflow encompass ETL (Extract, Transform, Load) processes, data analysis workflows, and regularly occurring tasks.
- Execution graph: Workflows in Airflow are defined as Directed Acyclic Graphs (DAGs). Within these DAGs, each node denotes a task, while the connecting edges represent task dependencies (a minimal example follows).
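As a minimal illustration of these concepts, the sketch below declares a two-task DAG with the TaskFlow API. It is unrelated to the use case developed later; the dag_id, task names, and schedule are arbitrary.

from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id='concept_demo', start_date=datetime(2023, 10, 3), schedule=None, catchup=False)
def concept_demo():
    @task
    def extract() -> list:
        # First node of the graph: produce a small data set
        return [1, 2, 3]

    @task
    def report(values: list) -> None:
        # Second node: consume the output of the upstream task
        print(f'sum={sum(values)}')

    # Passing the output of extract() to report() creates the edge extract -> report
    report(extract())

concept_demo()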
Setup
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.7.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow standalone
- Airflow uses a SQLite database by default. For a production deployment, use a MySQL or PostgreSQL database (not SQLite) and do not use the SequentialExecutor.
- The default user and password are defined in $AIRFLOW_HOME/standalone_admin_password.txt
Operations
Stop Airflow: Ctrl-C inside the terminal running airflow, or kill the scheduler process directly:
pkill -f "airflow-scheduler"
Use case
valid_employee_role = ['intern', 'engineer', 'manager', 'director', 'VP']

@dataclass
class Employee:
    """
    Data class for an employee record
    """
    name: AnyStr
    role: AnyStr
    score: int
    tenure: float
- Generate employee records with random attributes for position, rating, and seniority, to be stored on HDFS (generate_to_hdfs)
- Load the records from HDFS (load_from_hdfs)
- Remove outlier records using Apache Spark, keeping only records with a valid role, 0 < score <= 10, and 0.0 < tenure < 50.0 (spark_job)
Orchestration
- Define the directed acyclic graph (DAG) for execution with the appropriate scheduling, retry, and other arguments
- Invoke the appropriate operators which define each of the 3 tasks
- Specify the order of tasks in the workflow
from airflow.decorators import task
from airflow import DAG
from datetime import datetime, timedelta
from typing import List, Optional
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import logging
# Step 1: Define the DAG with appropriate arguments
default_args = {
    'owner': 'airflow',
    'retries': 4,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    dag_id='spark_eval_v1',
    default_args=default_args,
    start_date=datetime(2023, 10, 3),
    schedule_interval='@hourly',
    catchup=False,
    tags=['first_version']
)
# Step 2: Create the 3 operators
with dag:
    """
    Task 1 generates randomized employee records
    """
    loaded = BashOperator(
        task_id='generate_to_hdfs',
        bash_command=f'python3 {data_source_executor} generate'
    )

    """
    Task 2 loads the randomly generated data from HDFS
    """
    employees = BashOperator(
        task_id='load_from_hdfs',
        bash_command=f'python3 {data_source_executor} load'
    )

    """
    Task 3 launches the Spark job to process the records
    """
    spark_job = SparkSubmitOperator(
        task_id='spark_job',
        conn_id='spark_default',
        application=spark_jar_path,
        java_class='org.batcheval.Cleanser',
        conf={  # To be loaded from a configuration file
            'spark.master': 'local[8]',
            'spark.driver.memory': '18g',
            'spark.executor.memory': '24g',
            'spark.default.parallelism': '16',
            'spark.local.dir': 'temp',
            'spark.network.timeout': '2100',
            'spark.driver.maxResultSize': '524288',
            'spark.driver.cores': 4,
            'spark.executor.cores': 3
        },
        application_args=[file_name]
    )
# Step 3: Specify the workflow
loaded >> employees >> spark_job
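As a convenience for local debugging (not part of the original workflow), the whole DAG can be executed end to end in a single Python process with DAG.test(), available since Airflow 2.5:

if __name__ == '__main__':
    # Run the three tasks sequentially in-process, without the scheduler or workers
    dag.test()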
Defining the DAG
- Directed: When there are several tasks, each task should have a designated upstream or downstream connection to at least one other task.
- Acyclic: Tasks must not depend on themselves, directly or indirectly, ensuring there are no never-ending loops.
- Graph: The workflow can be represented graphically, with tasks as nodes and their dependencies as edges.
Creating the operators
Operators can be categorized into three types:
- Action operators: These initiate or set off specific actions, such as BashOperator, PythonOperator, or TriggerDagRunOperator.
- Transfer operators: These are stateful and handle data migration from one spot to another.
- Sensors: Derived from BaseSensorOperator, these operators pause execution until a certain condition is met or an operation completes (see the sketch below).
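As an example of the third category, a FileSensor could hold the workflow until the generated records are present. This sketch is not part of the use case; the file path, poke interval, and timeout are assumptions.

from airflow.sensors.filesystem import FileSensor

wait_for_records = FileSensor(
    task_id='wait_for_records',
    filepath='/tmp/employees.csv',   # hypothetical location of the generated records
    poke_interval=30,                # check every 30 seconds
    timeout=600,                     # give up after 10 minutes
    dag=dag
)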
- Build a Spark jar file.
- Install apache-airflow-providers-apache-spark package.
- Implement the SparkSubmitOperator [See below]
- Configure the Spark connection in Airflow (UI: Admin > Connections; conn id: spark_default, type: Spark, host and port).
- application: Path to the Jar file
- java_class: Name of the class (Scala in our case) invoked by Spark submit
- conf: Spark dynamically loaded properties defined in SparkConf or provided as part of the Spark submit command line arguments.
- application_args: List of application specific arguments to the Spark submit command line.
Specifying the workflow
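The three tasks are chained at the end of the DAG file (Step 3 above). The same linear ordering can be expressed in several equivalent ways; any single one of the statements below is sufficient.

from airflow.models.baseoperator import chain

loaded >> employees >> spark_job        # bit-shift operators, as used in Step 3
spark_job << employees << loaded        # same graph, declared from the downstream side
chain(loaded, employees, spark_job)     # helper, convenient for long sequences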
Tasks implementation
Data generator
class DataOperator(object):
    def __init__(self, input_file: AnyStr):
        self.input_file = input_file

    @staticmethod
    def generate_and_save(this_file_name: AnyStr, num_entries: int) -> bool:
        """
        Generate employee records with random attributes (score, tenure,...)
        :param this_file_name: HDFS file name the records are stored to
        :param num_entries: Number of records to generate
        :return: True if the records are correctly generated and saved, False otherwise
        """
        try:
            records = [f'emp_{i},{valid_employee_role[i % len(valid_employee_role)]},{int(random.uniform(1, 10))},{random.uniform(0, 25):.1f}'
                       for i in range(num_entries)]
            header = 'name,role,score,tenure\n'
            content = header + '\n'.join(records)

            # Store the records
            with open(this_file_name, 'w') as f:
                f.write(content)
            return True
        except Exception as e:
            task_logger.error(f'generate and save failed {str(e)}')
            return False
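A hypothetical invocation of the generator, with an arbitrary file name and record count:

if DataOperator.generate_and_save('hdfs_input/employees.csv', num_entries=1000):
    task_logger.info('Employee records generated')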
Data loader
def load(self) -> List[Employee]:
    try:
        # Load the records from a CSV file
        with open(self.input_file, 'rt') as f:
            csv_reader = csv.reader(f, delimiter=',', quotechar='\'')
            # Build the list of employee records to be cleansed
            employee_list = []
            for idx, attributes in enumerate(csv_reader):
                # Skip the header row
                if idx > 0:
                    employee_list.append(DataOperator.__get_employee(attributes))
            return employee_list
    except Exception as e:
        task_logger.error(f'Loader failed: {str(e)}')
        return []

@staticmethod
def __get_employee(fields: List[AnyStr]) -> Employee:
    return Employee(
        fields[0],          # Name of the employee
        fields[1],          # Role of the employee in the organization
        int(fields[2]),     # Score of the employee from the performance review
        float(fields[3])    # Tenure (seniority) of the employee
    )
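The two Bash tasks invoke the script referenced by data_source_executor with a 'generate' or 'load' argument. That script is not listed in the article; the sketch below is one possible entry point, where the file name and record count are assumptions.

import sys

if __name__ == '__main__':
    command = sys.argv[1]                                      # 'generate' or 'load'
    data_operator = DataOperator('hdfs_input/employees.csv')   # hypothetical HDFS file
    if command == 'generate':
        DataOperator.generate_and_save(data_operator.input_file, num_entries=1000)
    elif command == 'load':
        employees = data_operator.load()
        task_logger.info(f'{len(employees)} employee records loaded')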
Data cleanser
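The cleansing itself is delegated to the Spark/Scala application listed in the Appendix (OutlierRemoval, invoked by Cleanser). For reference, the validation condition applied to each record, rewritten here in Python, is:

def is_valid(employee: Employee) -> bool:
    # Keep records with a known role, a score in (0, 10], and a tenure in (0.0, 50.0)
    return (employee.role in valid_employee_role
            and 0 < employee.score <= 10
            and 0.0 < employee.tenure < 50.0)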
Airflow best practices
- View Airflow as a task orchestrator, not an execution platform. Delegate heavy processing to execution frameworks and maximize the use of ETL processes.
- Refrain from importing large datasets directly into a task.
- Segment intricate tasks (or elaborate Python functions encapsulated by the PythonOperator) into more manageable units to simplify debugging.
- Utilize template variables using the Jinja template format {{...}} (see the example after this list).
- Maintain a one-to-one ratio of DAGs to Python files.
- Steer clear of top-level code in your DAG files as it triggers execution during each DAG parse.
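As an example of the templating recommendation above, the logical date of a run can be injected into a Bash command through the built-in {{ ds }} variable. The task below is illustrative only and not part of the workflow.

report_date = BashOperator(
    task_id='report_date',
    bash_command='echo "Processing employee records for {{ ds }}"',  # rendered at runtime
    dag=dag
)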
Conclusion
- BMC's Control-M: This is a workflow orchestration solution that places a strong focus on adhering to Service Level Agreements [ref 5].
- Flyte: Originally created at Lyft, this is a scalable platform for automating workflows [ref 6].
- Y42: This platform offers a comprehensive solution for data orchestration, featuring an SQL design interface [ref 7].
- Shipyard App: A data orchestration tool emphasizing code reusability, package management, version control, and observability [ref 8].
References
Appendix
final class OutlierRemoval(validate: Employee => Boolean)(implicit sparkSession: SparkSession) {

  def apply(srcFilename: String, destFilename: String): Unit = try {
    import sparkSession.implicits._

    // Load data from HDFS
    val employeeDS = sparkSession.read.option("delimiter", ",").csv(srcFilename).map(Employee(_))
    employeeDS.show(5)

    // Filter out the outliers
    val filteredEmployeeDS = employeeDS.filter(validate(_))
    filteredEmployeeDS.write.csv(destFilename)
  }
  catch {
    case e: Exception => logger.error(s"ERROR: ${e.getMessage}")
  }
}
object OutlierRemoval {
  // Default constructor
  def apply()(implicit sparkSession: SparkSession): OutlierRemoval = new OutlierRemoval(default_validate)

  private val valid_employee_role = Set[String](
    "intern", "engineer", "manager", "director", "VP"
  )

  private val default_validate = (employee: Employee) => {
    valid_employee_role.contains(employee.role) && employee.score <= 10 &&
      employee.score > 0 && employee.tenure > 0.0 && employee.tenure < 50.0
  }
}
object Cleanser extends App {
  // Initialize the context
  implicit val sparkSession: SparkSession = {
    val conf = new SparkConf().setMaster("local[8]")
    SparkSession.builder()
      .appName("sparkjob")
      .config(conf)
      .getOrCreate()
  }

  val srcFilename = args(0)
  private val outputFilename = args(1)
  private val outlierRemoval = OutlierRemoval()

  // Apply the filter to remove outliers
  outlierRemoval(srcFilename, outputFilename)

  // Cleanup
  sparkSession.close()
}