Tuesday, October 24, 2023

Boost Real-time Processing with Spark Structured Streaming

Target audience: Intermediate
Estimated reading time: 5'

Conventional distributed batch processing systems fall short in supporting applications such as social media platforms, the Internet of Things, or business-to-consumer online transactions. Fortunately, Apache Spark Structured Streaming equips software developers with the necessary tools for large-scale, real-time stream processing.

This article delves into how the classic Extract-Transform-Load (ETL) pipeline is implemented within the realm of real-time streaming data.


What you will learn: How to implement real-time ETL using Spark Structured Streaming.

Table of contents
      Apache Spark
      Setup
      Transformation
      Action
      Streams wrapper
      Extract
      Transform
      Load
Notes:
  • It's presumed that the reader has a basic understanding of the Apache Spark framework
  • Environments: Scala 2.12.11, Apache Spark 3.4.0, Spark Streaming 3.4.0
  • Error handling and comments in the source code have been omitted for the sake of clarity.

Introduction

Apache Spark

Apache Spark is an open-source distributed computing system designed for handling large-scale data processing  [ref 1]. It leverages in-memory caching and refined query execution strategies to deliver rapid analytic responses, regardless of the data's size.

Unlike disk-based MapReduce jobs, Spark streamlines processing into a single step: data is loaded into memory, operations are executed, and outcomes are written back, leading to significantly quicker execution. Additionally, Spark speeds up iterative machine learning algorithms by caching data in memory, allowing rapid repeated passes over the same dataset.
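As a minimal sketch, assuming a SparkSession instance sparkSession and a hypothetical Parquet dataset, caching keeps the data in memory across repeated passes:

// Sketch: caching a DataFrame so that iterative passes reuse the in-memory copy
// The dataset path is hypothetical
val trainDF = sparkSession.read.parquet("hdfs://data/training").cache()

// Repeated actions over the same data hit the cached copy instead of re-reading from storage
(1 to 10).foreach(_ => trainDF.count())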


Structured streaming

Apache Spark Structured Streaming is a stream processing framework that's both scalable and resilient to faults, built atop the Spark SQL engine. Its approach to streaming computation mirrors the batch processing model applied to static datasets. The Spark SQL engine manages the task of executing this process incrementally and perpetually, updating the outcomes as new streaming data flows in [ref 2].

In contrast with Spark's original streaming library that relied on RDDs, Structured Streaming facilitates processing based on event time, incorporates watermarking features, and utilizes the DataFrame API that is a part of Spark SQL.

Spark Streaming processes incoming data by splitting it into small batches, which are executed as Resilient Distributed Datasets (RDDs). On the other hand, Structured Streaming operates on a DataFrame linked to an infinite table, using an API that's fine-tuned for enhanced performance [ref 3].
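The short sketch below, assuming a hypothetical JSON source with an eventTime column (not part of the use case developed in this article), illustrates event-time processing with a watermark:

import org.apache.spark.sql.functions._

// Sketch: event-time aggregation with a watermark (hypothetical source, schema and columns)
val events = sparkSession.readStream
   .schema("eventTime TIMESTAMP, deviceId STRING, value DOUBLE")
   .json("hdfs://input/events")

val counts = events
   .withWatermark("eventTime", "10 minutes")                        // tolerate up to 10 minutes of late data
   .groupBy(window(col("eventTime"), "5 minutes"), col("deviceId")) // 5-minute event-time windows
   .count()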

Streaming components

In this section, we'll provide a concise overview of the essential elements of Spark Streaming that are employed in any Extract-Transform-Load (ETL) process.

Setup 

To develop a structured streaming application, at least three Spark libraries, in the form of jar files, are essential: Core, SQL, and Streaming. The Maven pom.xml snippet provided below demonstrates how to set up these three libraries:

<spark.version>3.4.0</spark.version>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>

Important note: For structured streaming with a Kafka input source, the required artifact is spark-sql-kafka-0-10_2.12; the spark-streaming-kafka-0-10_2.12 artifact targets the older, DStream-based Spark Streaming API.
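As a brief illustration (the broker address and topic name below are placeholders), a Kafka source would be declared along these lines:

// Sketch: reading a Kafka topic with Structured Streaming (hypothetical broker and topic)
val kafkaDF = sparkSession.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("subscribe", "my-topic")
   .load()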

Our use case utilizes Spark's transformations and actions to construct an ETL (Extract, Transform, Load) pipeline.
A transformation refers to any operation in Spark that yields a DataFrame or Dataset, and is executed in a lazy manner, meaning it's not computed immediately.
An action, on the other hand, prompts a computation to deliver a result, thereby initiating the execution of all prior transformations in the sequence.
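As a short illustration, assuming a hypothetical DataFrame usersDF, the transformations below only build the execution plan while the action triggers it:

import org.apache.spark.sql.functions.col

// Transformations: lazily build the query plan, nothing is executed yet
val adultsDF = usersDF.filter(col("age") > 18).select("age", "gender")

// Action: triggers the execution of the whole chain of transformations
val numAdults = adultsDF.count()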

Transformation

The class STransform defines the transformation of a DataFrame (a map-like operation) using SQL syntax.

The class attributes are:
  • selectFields: List of fields to display
  • whereConditions: WHERE conditions, if not empty
  • transformFunc: DataFrame transformation function of type (DataFrame, String) => DataFrame
  • descriptor: Optional descriptor
The select fields and where conditions are concatenated to build the SQL statement. There is no validation of the generated SQL query prior to execution.

type TransformFunc = (DataFrame, String) => DataFrame

class STransform(
   selectFields: Seq[String],
   whereConditions: Seq[String],
   transformFunc: TransformFunc,
   descriptor: String = ""
){
   def apply(df: DataFrame): DataFrame = transformFunc(df, queryStmt)

   def queryStmt: String = {
      val whereConditionStr = if (whereConditions.nonEmpty) s"WHERE ${whereConditions.mkString(" AND ")}" else ""
      s"SELECT ${selectFields.mkString(",")} FROM temptable $whereConditionStr"
   }
}


Action

The class SAggregator wraps the group by operation with an aggregation function.
  • groupByCol: Column used for grouping (groupBy)
  • aggrCol: Column used by the aggregation function
  • aggrFunc: Aggregation function that converts a Column into another Column
  • aggrAliasName: Alias name for the aggregated values

type AggrFunc = Column => Column

class SAggregator(
   groupByCol: Column,
   aggrCol: Column,
   aggrFunc: AggrFunc,
   aggrAliasName: String
){
   def apply(inputDF: DataFrame): DataFrame = 
     inputDF.groupBy(groupByCol).agg(aggrFunc(aggrCol).alias(aggrAliasName))
}

Streams wrapper

SparkStructStreams defines the generic wrapper trait for structured streaming with the minimum set of required attributes to describe any ETL-based pipeline.
Each specific ETL pipeline has to override the following variables:
  • outputMode: Mode for the stream writer (e.g. Append, Update, ...)
  • outputFormat: Format used by the stream writer (json, console, csv, ...)
  • outputColumn: Name of the aggregated column
  • isConsoleSink: Flag to enable the console sink for debugging purposes
  • transform: Optional transformation (input data frame, SQL statement) => output data frame
  • aggregator: Optional aggregator with a groupBy (single column) and a sql.functions._ aggregation function

trait SparkStructStreams{
  val outputMode: OutputMode
  val outputFormat: String
  val outputColumn: String
  val isConsoleSink: Boolean
  val transform: Option[STransform]
  val aggregator: Option[SAggregator]
}

ETL

Streaming pipeline

Our data pipeline implements the conceptual Extract-Transform-Load pattern. 
The extraction consists of reading the data stream from HDFS in JSON format. The two fundamental types of data processing tasks in Apache Spark, transformations (map) and actions (reduce), implement the transform section of the pipeline.
Finally, the data stream is written to the sink as CSV files, implementing the load task.

Spark streaming ETL data pipeline

We wrap the streaming pipeline into a class, SparkStructStreamsFromFile, which inherits from SparkStructStreams and adds the path of the source, folderPath, as well as an implicit reference to the SparkSession.
As the transform and aggregation tasks rely on SQL statements, we need to extract the schema from the data source. The data source consists of JSON files, so we infer the schema from the first record.

class SparkStructStreamsFromFile (
   folderPath: String,  // Absolute path for the source file
   override val outputMode: OutputMode, // Mode for writer stream (i.e. Append, Update, ...)
   override val outputFormat: String, //  Format used by the stream writer (json, console, csv, ...)
   override val outputColumn: String, // Name of the aggregated column
   override val isConsoleSink: Boolean,
   override val transform: Option[STransform], // Transformation (DataFrame, SQL) =>  DataFrame
   override val aggregator: Option[SAggregator] // groupBy (single column) +  sql.functions._
)(implicit  sparkSession: SparkSession)  extends SparkStructStreams {
    
     // Extract schema from files
  lazy val schema: StructType = 
    sparkSession.read.json(s"hdfs://${folderPath}").head().schema


  def execute(): Unit = {
        
   // --------------------   EXTRACT ------------------------
          // Step 1: Stream reader from a file 'folderPath'
    val readDF:  DataFrame = sparkSession
        .readStream
        .schema(schema)
        .json(s"hdfs://$folderPath")
    assert(readDF.isStreaming)


    // -----------------  TRANSFORM ---------------------
          // Step 2: Transform
    val transformedDF: DataFrame = transform.map(_(readDF)).getOrElse(readDF)

         // Step 3: Aggregator
    val aggregatedDF = aggregator.map(_(transformedDF)).getOrElse(transformedDF)

         // Step 4: Debugging to console
     if (isConsoleSink)
        aggregatedDF.writeStream.outputMode(OutputMode.Complete()).format("console").start()


    //--------------------  LOAD ---------------------------
         // Step 5: Stream writer into a table
    val query = aggregatedDF
        .writeStream
        .outputMode(outputMode)
        .foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
             batchDF.persist()
             batchDF.select(outputColumn)
                .write
                .mode(SaveMode.Overwrite)
                .format(outputFormat)
                .save(path = s"hdfs://output/$outputFormat")
             batchDF.unpersist()
         }
         // Step 6: Initiate streaming
      .trigger(Trigger.ProcessingTime("4 seconds"))
      .start()

    query.awaitTermination()
  }
}

The method execute implements the logic of the streaming pipeline in 6 steps:
  1. Read the stream from the set of JSON files located in folderPath into a data frame, readDF. The schema is inferred from the first JSON record in the constructor.
  2. Apply the transformation to the extracted data frame, readDF.
  3. Apply the Spark action (aggregation over grouped data) to the transformed data frame, transformedDF.
  4. Use the console sink to stream debugging information.
  5. Stream the aggregated data, aggregatedDF, into CSV files using a stream writer in Update mode.
  6. Initiate the streaming process.

Extract

The extraction of data consists of loading the JSON data into a partitioned data frame, df, through the readStream API method.

val df = sparkSession.readStream.schema(mySchema).json(path)

Transform

The transformation, myTransformFunc, converts the extracted data frame, readDF, by executing the SQL query, sqlStatement: SELECT age, gender FROM temptable WHERE age > 18. The input data frame is registered as a temporary view, 'temptable', against which the query is executed.

def myTransformFunc(
  readDF: DataFrame,
  sqlStatement: String
)(implicit sparkSession: SparkSession): DataFrame = {
  readDF.createOrReplaceTempView("temptable")
  sparkSession.sql(sqlStatement)
}

val myTransform = new STransform(
   Seq[String]("age","gender"),
   Seq[String]("age > 18"),
   myTransformFunc,
   "Filter by age"
)


The second step is to compute the average age of the grouped data, equivalent to SELECT gender, avg(age) FROM temptable GROUP BY gender.

def aggregatedFunc(inputColumn: Column): Column = {
  import org.apache.spark.sql.functions._
  avg(inputColumn)
}

val myAggregator = new SAggregator(
   new Column("gender"),
   new Column("age"),
   aggregatedFunc,
   "avg_age"
)

Load

The final task is to write the output stream into the CSV file sink.

df.writeStream
  .outputMode(OutputMode.Update())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF.persist()
      batchDF.select("avg_age").write.mode(SaveMode.Overwrite).format("csv").save(path)
      batchDF.unpersist()
  }
  .trigger(Trigger.ProcessingTime("4 seconds"))
  .start()

The foreachBatch function enables developers to define a specific operation to be applied to the output data from each micro-batch within a streaming query. However, this function cannot be used in continuous processing mode, where foreach would be the suitable alternative.
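As a minimal sketch, assuming a hypothetical, non-aggregated streaming data frame streamDF read from a continuous-compatible source such as Kafka, the foreach alternative relies on a ForeachWriter and a continuous trigger:

import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

// Sketch: per-row sink usable in continuous processing mode (streamDF is hypothetical)
val continuousQuery = streamDF.writeStream
   .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true  // acquire resources per partition/epoch
      override def process(row: Row): Unit = println(row.mkString(","))    // hypothetical per-row side effect
      override def close(errorOrNull: Throwable): Unit = ()                // release resources
   })
   .trigger(Trigger.Continuous("1 second"))  // continuous mode with a 1-second checkpoint interval
   .start()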

The output mode defines how the unbounded result table is updated, as sketched below:
  • Append mode (default): Only the new rows appended to the result table since the last trigger are written to the sink; supported for queries limited to select, where, map, flatMap, filter, join, ...
  • Complete mode: The entire result table is written to the sink after every trigger; required for queries with aggregations.
  • Update mode: Only the rows of the result table that were updated since the last trigger are written to the sink.
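The sketch below, using hypothetical streaming data frames rawDF and aggregatedDF (with OutputMode imported from org.apache.spark.sql.streaming), shows how the mode is selected on the stream writer; note that Append mode on an aggregated stream requires a watermark:

// Sketch: selecting the output mode on the stream writer (hypothetical data frames)
rawDF.writeStream.outputMode(OutputMode.Append()).format("console").start()          // new rows only
aggregatedDF.writeStream.outputMode(OutputMode.Complete()).format("console").start() // entire result table
aggregatedDF.writeStream.outputMode(OutputMode.Update()).format("console").start()   // rows updated since the last trigger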

Putting it all together

new SparkStructStreamsFromFile(
   path,
   OutputMode.Update(),
   outputFormat = "csv",
   outputColumn = "avg_age",
   isConsoleSink = true,
   transform = Some(myTransform),
   aggregator = Some(myAggregator)
 ).execute()

The output of the streaming pipeline in CSV format is:
gender, avg(age)
male,36
female,33


Thank you for reading this article. For more information ...

References




---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Monday, October 9, 2023

Tableau-like Visualization with PyGWalker

Target audience: Beginner
Estimated reading time: 3'
Ever thought about presenting test results in a format similar to Tableau, one that management is acquainted with? A visually appealing, business-centric display can effectively convey messages. 
In this article, we delve into the PyGWalker Python library, which mirrors the interactive visualization style of Tableau, especially when it comes to geospatial graphics.


Table of contents
      Installation

Notes:
  • Environments: Python 3.10, Pandas 2.12, PyGWalker 0.3.9, Pydantic 2.4.2, GeoPy 2.4.0
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Introduction

PyGWalker, which stands for Python binding for Graphic Walker, is a visual representation library in Python, designed to work seamlessly with Jupyter-style notebooks for data probing and assessment [ref 1]. 
This library renders a user interface reminiscent of Tableau [ref 2], generated directly from pandas data frames. Its user-friendly interface facilitates pattern visualization and analysis through effortless drag-and-drop actions.

Please refer to an older post, Setup Tableau with Amazon EMR-Spark for more information about Tableau configuration and deployment [ref 3].

Installation

pip:  pip install pygwalker --upgrade
conda: conda install -c conda-forge pygwalker
JupyterLab:  pip install jupyterlab
Notebook: pip install notebook

Integration with Jupyter notebook

PyGWalker utilizes the Jupyter engine to produce an interactive user interface (UI) that resonates with the business community. Unlike Matplotlib, where visualization is code-centric, in PyGWalker, plot customization is managed directly through the UI. Thus, Python's role is mostly restricted to setting up and initiating the UI. 

After importing the necessary libraries and setting up the Pandas data frame, activating the UI is just a single line of code away.

import pandas as pd
import pygwalker as pyg

my_csv_file = 'input/locations.csv'
df = pd.read_csv(my_csv_file)

  # Launch the PyGWalker interactive UI
walker = pyg.walk(df)


Use case

Depicting geospatial data graphically can be quite daunting. In this context, we aim to illustrate the spread of tech and science firms throughout California.

Our scenario leverages GeoPy, a Python tool that interfaces with multiple renowned geocoding online platforms [ref 4].
This tool streamlines the process for Python programmers, allowing them to determine the coordinates of various locations—be it addresses, cities, nations, or significant landmarks—by utilizing independent geocoders and assorted data resources. Notably, while GeoPy supports geocoding from platforms like Google Maps, Bing Maps, and Nominatim, it maintains no direct affiliations with any of them.

installation: pip install geopy

In our case, the data comes in a straightforward 2-column table, detailing the city names and the count of tech/science enterprises, labeled as 'num_companies'. We've established a class, 'TechCity', which incorporates additional attributes – longitude and latitude – facilitating the data's visualization on a geographical map.

from typing import AnyStr, TypeVar, List
from dataclasses import dataclass

@dataclass
class TechCity:
  city: AnyStr
  num_companies: float
  longitude: float
  latitude: float

  @staticmethod
  def header() -> List[AnyStr]:
     return ['city', 'num_companies', 'longitude', 'latitude']


Following that, we establish a generator class named 'TechCitiesGenerator' that transforms the input data (comprising city names and the 'num_companies' for each city) into 'TechCity' instances for display purposes.
We employ the Nominatim geolocation service, set up during the class construction. Nominatim taps into OpenStreetMap data to pinpoint locations globally by either name or address (a process called geocoding) [ref 5].

The method __call__ can be broken down into three steps:
  1. Create a 'TechCity' instance.
  2. Transition these instances into a dictionary format.
  3. Archive this dictionary as a CSV or JSON file.

class TechCitiesGenerator(object):
  """ 
  Generate the input to PyGWalker table with geo-location data
       :param cities List of cities with significant number of tech/science companies
       :param num_companies List of number of companies associated with each city
       :param filename Name of the output file (CSV or JSON)
  """
  def __init__(self, cities: List[AnyStr], num_companies: List[int], filename: AnyStr):
    from geopy.geocoders import Nominatim

    self.filename = filename
    self.cities = cities
    self.num_companies_lst = num_companies
    self.loc = Nominatim(user_agent='Geopy Library')

  def __call__(self) -> bool:
    import csv
    import logging

    # Step 1: Generate the records of type TechCity
    tech_cities = [
       TechCity(city, num_companies, self.loc.geocode(city).longitude, self.loc.geocode(city).latitude)
       for city, num_companies in zip(self.cities, self.num_companies_lst)
    ]
    # Step 2: Convert to list into a dictionary
    records = [vars(tech_city) for tech_city in tech_cities]
    
    # Step 3: Store the dictionary in CSV or JSON format, give the file name
    try:
       match self.filename[-4:]:
         case '.csv': 
            with open(self.filename, 'w') as f:
               writer = csv.DictWriter(f, fieldnames=TechCity.header())
               writer.writeheader()
               for record in records:
                   writer.writerow(record)
            return True

         case 'json':
            import json
                    
            json_repr = json.dumps(records, indent=4)
            with open(self.filename, 'w') as f:
                 f.write(json_repr)
            return True

         case _:
            logging.error(f'Extension for {self.filename} is incorrect')
            return False
    except Exception as e:
        logging.error(f'Failed to store object {str(e)}')
        return False


Output

The most basic visualization is a table akin to standard Tableau worksheets, where the columns depict the four attributes of the TechCity class, and each row corresponds to an individual instance.


Tabular representation of TechCity instances


The display below showcases PyGWalker's ability to map the count of companies across the cities listed in the table, superimposed on a geographical layout. Achieving this visualization involves three straightforward steps:
  1. Choose 'Geographic' for the Coordinate System.
  2. Drag the longitude (and subsequently, latitude) column to the respective 'Longitude' (and 'Latitude') fields.
  3. Drag the 'num_companies' column, representing the number of companies, into the size field.

That's it.


Tableau-like geospatial representation of number of 
tech & science companies for California cities

Conclusion

Effective communication of findings between data scientists and stakeholders is pivotal for any project's triumph. PyGWalker equips engineers with the ability to represent model outcomes in a style reminiscent of Tableau, a platform that many executives recognize, right within their notebooks.

Additionally, PyGWalker's visualization approach is both instinctive and interactive, sidestepping the clutter that additional coding can sometimes introduce in notebooks.

Thank you for reading this article. For more information ...

References





---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3