Wednesday, November 22, 2023

Tracking Storms with Kafka/Spark Streaming

Target audience: Intermediate
Estimated reading time: 4'

“There is peace even in the storm.” Vincent Van Gogh

Certain challenges in our everyday life demand immediate attention, with severe storms and tornadoes being prime examples. This article demonstrates the integration of Kafka event queues with Spark's distributed structured streaming, crafting a powerful and responsive system for predicting potentially life-threatening weather events.


Table of contents
Streaming frameworks
      Apache Kafka
      Apache Spark
      Spark structured streaming
Use case
      Overview
      Data streaming pipeline
Implementation
      Weather data sources
      Weather tracking pipeline
References
Important notes:
  • Software versions utilized in the source code include Scala 2.12.15, Spark 3.4.0, and Kafka 3.4.0. 
  • This article focuses on the data streaming pipeline. Explaining the storm prediction model falls outside the article's scope. 
  • The source code is available in the GitHub Streaming library [ref 1].
  • Note that error messages and the validation of method arguments are not included in the provided code examples.

Streaming frameworks

The goal is to develop a service that warns local authorities about severe storms or tornadoes. Given the urgency of analyzing data and suggesting actions, our alert system is designed as a streaming data flow.

The main elements include:
  • A Kafka message queue, where each monitoring device type is allocated a specific topic, enabling the routing of different alerts to the relevant agency.
  • The Spark streaming framework to handle weather data processing and to disseminate forecasts of serious weather disruptions.
  • A predictive model specifically for severe storms and tornadoes.

Apache Kafka 

Apache Kafka is an event streaming platform enabling:
  • Publishing (writing) and subscribing (reading) to event streams, as well as continuously importing/exporting data from various systems.
  • Durable and reliable storage of event streams for any desired duration.
  • Real-time or retrospective processing of event streams.
Kafka is available as an open-source library [ref 2] or through commercial cloud services such as Confluent [ref 3].

The following shell commands restart the Kafka service: they stop any running Kafka broker and ZooKeeper server, then start ZooKeeper followed by the Kafka broker.

kafka-server-stop
zookeeper-server-stop
sleep 2

zookeeper-server-start $KAFKA_ROOT/kafka/config/zookeeper.properties &
sleep 1
ps -ef | grep zookeeper
kafka-server-start $KAFKA_ROOT/kafka/config/server.properties &
sleep 1
ps -ef | grep kafka

For testing purposes, we deploy Apache Kafka on the local host, listening on the default port 9092. Here are some useful commands:
To list existing topics:
    kafka-topics --bootstrap-server localhost:9092 --list
To create a new topic (e.g., doppler):
    kafka-topics \
        --bootstrap-server localhost:9092 \
        --topic doppler \
        --create \
        --replication-factor 1 \
        --partitions 2
To list the messages or events currently queued for a given topic (e.g., weather):
    kafka-console-consumer \
        --topic weather \
        --from-beginning \
        --bootstrap-server localhost:9092

Here is an example of libraries required to build a Kafka consumer and producer application.

<scala.version>2.12.15</scala.version>
<kafka.version>3.4.0</kafka.version>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>${kafka.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-scala_2.12</artifactId>
  <version>${kafka.version}</version>
</dependency>


Apache Spark

Apache Spark is a free, open-source cluster-computing framework designed to process large volumes of data, in batch or in near real time, through distributed computing. Its primary applications include:
  • Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
  • Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
  • Streaming: Managing real-time data, such as log files, is challenging. Spark excels at processing these data streams and can be used to identify and block potentially fraudulent activities.
  • Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.
Here is an example of libraries required to build a Spark application:

<spark.version>3.4.0</spark.version>
<scala.version>2.12.15</scala.version>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>

Spark structured streaming

Spark Structured Streaming, built atop Spark SQL, is a streaming data engine. It processes data in increments, continually updating results as additional streaming data is received [ref 4]. A Structured Streaming application consists of three primary parts: the source (input), the processing engine (business logic), and the sink (output).

For comprehensive details on setting up, applying, and deploying the Spark Structured Streaming library, see the article Boost Real-time Processing with Spark Structured Streaming.

In our specific scenario, data input is managed via Kafka consumers, and data output is handled through Kafka producers [ref 5].

Use case 

Overview

This use case involves gathering data from weather stations and Doppler radars, then merging these data sources based on location and time stamps. After consolidation, the unified data is sent to a model that forecasts potentially hazardous storms or tornadoes. The resulting predictions are then relayed back to the relevant authorities (such as emergency personnel, newsrooms, law enforcement, etc.) through Kafka.

The two sources of data are:
  • Weather stations, which collect temperature, pressure and humidity
  • Doppler radars, which provide advanced data on wind intensity and direction

In practice, meteorologists receive overlapping data features from weather stations and Doppler radars, which they then reconcile and normalize. However, in our streamlined use case, we choose to omit these overlapping features.

Illustration of collection of data from weather stations and Doppler radars


The process for gathering data from weather data tracking devices is not covered in this context, as it is not pertinent to the streaming pipeline.

Data streaming pipeline

The monitoring streaming pipeline is structured into three phases:
  1. Kafka queue.
  2. Spark's distributed structured streams.
  3. A variety of storm and tornado prediction models, developed using the PyTorch library and accessible via REST API.
As noted in the introductory section, the PyTorch-based model is outside this post's focus and is mentioned here only for context.

Storm tracking streaming pipeline

Data gathered from weather stations and Doppler radars is fed into the Spark engine, where both streams are combined and harmonized based on location and timestamp. This unified dataset is then employed for training the model. During the inference phase, predictions are streamed back to Kafka.

A notable aspect of the implementation is that the weather and Doppler data are ingested from Kafka as batch queries, while the storm forecasts are streamed back to Kafka.


Implementation

Weather data sources

Weather tracking device
Initially, we need to establish the fundamental characteristics of weather tracking data points: location (longitude and latitude) and time stamp. These are crucial for correlating and synchronizing data from different devices. The details on how these data points are synchronized will be explained in the section Streaming processing: Data aggregation.

trait TrackingData[T <: TrackingData[T]]  {
self =>
   val id: String                 // Identifier for the weather tracking device
   val longitude: Float       // Longitude for the weather tracking device
   val latitude: Float          // Latitude for the weather tracking device
   val timeStamp: String   // Time stamp data is collected
}


Weather station
We collect temperature, pressure and humidity from weather stations, along with their location (longitude, latitude) and the time the data is collected (timeStamp). Therefore, a weather station record, WeatherData, inherits the tracking attributes from TrackingData.

import scala.util.Random

case class WeatherData (
   override val id: String,          // Identifier for the weather station
   override val longitude: Float,    // Longitude for the weather station
   override val latitude: Float,     // Latitude for the weather station
   override val timeStamp: String = System.currentTimeMillis().toString, // Time stamp data is collected
   temperature: Float,               // Temperature (Fahrenheit) collected at timeStamp
   pressure: Float,                  // Pressure (millibars) collected at timeStamp
   humidity: Float                   // Humidity (%) collected at timeStamp
) extends TrackingData[WeatherData] {

   // Random generator for testing: shifts the time stamp by 10 to 12 seconds
   // and scales each measurement by (1 + alpha*r), with r uniform in [0, 1)
   def rand(rand: Random, alpha: Float): WeatherData = this.copy(
      timeStamp = (timeStamp.toLong + 10000L + rand.nextInt(2000)).toString,
      temperature = temperature*(1 + alpha*rand.nextFloat()),
      pressure = pressure*(1 + alpha*rand.nextFloat()),
      humidity = humidity*(1 + alpha*rand.nextFloat())
   )

   // Encoded weather station data produced to Kafka queue
   override def toString: String =
     s"$id;$longitude;$latitude;$timeStamp;$temperature;$pressure;$humidity"
}

For testing purposes, some attribute values are generated randomly using the formula \[x(1 + \alpha r)\] with \(\alpha \approx 0.1\) and \(r\) drawn from a uniform distribution over [0, 1).
The semicolon-delimited string representation (toString) is used to serialize the tracking data for Kafka.
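The perturbation \(x(1 + \alpha r)\) used by rand can be isolated in a small helper to check its range: for a positive value x and \(r \in [0, 1)\), every sample stays within [x, x(1 + α)]. The object and method names below are illustrative only and are not part of the original code; a fixed seed makes the samples reproducible.

```scala
import scala.util.Random

object Perturbation {
  // Returns x * (1 + alpha * r), with r drawn uniformly from [0, 1)
  def perturb(x: Float, alpha: Float, rand: Random): Float =
    x * (1.0f + alpha * rand.nextFloat())

  // Draws n perturbed values of x with a fixed seed, for reproducible tests
  def samples(x: Float, alpha: Float, n: Int, seed: Long): Seq[Float] = {
    val rand = new Random(seed)
    Seq.fill(n)(perturb(x, alpha, rand))
  }
}
```

With α = 0.1, a temperature of 70°F is therefore perturbed into the range [70, 77].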

Doppler radar
We retrieve wind-related measurements (such as wind shear, average wind speed, gust speed, and direction) from the Doppler radar for specific locations and time intervals. Similar to the weather station, the 'DopplerData' class is a sub-class of the 'TrackingData' trait.

import scala.util.Random

case class DopplerData(
  override val id: String,          // Identifier for the Doppler radar
  override val longitude: Float,    // Longitude for the Doppler radar
  override val latitude: Float,     // Latitude for the Doppler radar
  override val timeStamp: String,   // Time stamp data is collected
  windShear: Boolean,               // Is it a wind shear?
  windSpeed: Float,                 // Average wind speed
  gustSpeed: Float,                 // Maximum wind speed
  windDirection: Int                // Wind direction in degrees
) extends TrackingData[DopplerData] {

  // Random generator for testing
  def rand(rand: Random, alpha: Float): DopplerData = this.copy(
    timeStamp = (timeStamp.toLong + 10000L + rand.nextInt(2000)).toString,
    windShear = rand.nextBoolean(),
    windSpeed = windSpeed * (1 + alpha * rand.nextFloat()),
    gustSpeed = gustSpeed * (1 + alpha * rand.nextFloat()),
    // Perturbed direction, wrapped into [0, 360) degrees
    windDirection = (windDirection * (1 + alpha * rand.nextFloat())).toInt % 360
  )

  // Encoded Doppler radar data produced to Kafka
  override def toString: String =
    s"$id;$longitude;$latitude;$timeStamp;$windShear;$windSpeed;$gustSpeed;$windDirection"
}


Weather tracking streaming pipeline

Streaming tasks
To enhance understanding, we simplify and segment the processing engine (business logic) into two parts: the aggregation of input data and the actual model prediction.

The streaming pipeline comprises four key tasks:
  • Source: Retrieves data from weather stations and Doppler radars via Kafka queues.
  • Aggregation: Synchronizes and consolidates the data.
  • Storm Prediction: Activates the model to provide potential storm advisories, communicated through a REST API.
  • Sink: Distributes storm advisories to the relevant Kafka topics.

class WeatherTracking(
  inputTopics: Seq[String],
  outputTopics: Seq[String],
  model: Dataset[ModelInputData] => Seq[WeatherAlert])(implicit sparkSession: SparkSession) {

  def execute(): Unit =
    for {
      (weatherDS, dopplerDS) <- source                                 // Step 1
      consolidatedDataset <- synchronizeSources(weatherDS, dopplerDS)  // Step 2
      predictions <- predict(consolidatedDataset)                      // Step 3
      encodedPredictions <- sink(predictions)                          // Step 4
    } yield encodedPredictions
  

In every computational task of the streaming pipeline, exceptions are converted into options (None on failure). The tasks are then chained with a Scala for-comprehension, which stops at the first task that returns None.
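The short-circuit behavior of this for-comprehension can be reproduced without Spark. In the sketch below, the four step functions are hypothetical stand-ins for source, synchronizeSources, predict and sink, operating on plain Scala values; each one converts failures into None, and the pipeline yields None as soon as any step fails.

```scala
import scala.util.Try

object OptionPipeline {
  // Step 1: parse comma-separated readings; a parsing error becomes None
  def source(raw: String): Option[Seq[Int]] =
    Try(raw.split(",").toSeq.map(_.trim.toInt)).toOption

  // Step 2: "synchronize" by sorting; an empty input becomes None
  def synchronize(values: Seq[Int]): Option[Seq[Int]] =
    if (values.nonEmpty) Some(values.sorted) else None

  // Step 3: a trivial stand-in for the model, the mean of the values
  def predict(values: Seq[Int]): Option[Double] =
    Try(values.sum.toDouble / values.size).toOption

  // Step 4: encode the prediction as a string
  def sink(prediction: Double): Option[String] = Some(s"alert:$prediction")

  // The for-comprehension stops at the first step returning None
  def execute(raw: String): Option[String] =
    for {
      values     <- source(raw)
      synced     <- synchronize(values)
      prediction <- predict(synced)
      encoded    <- sink(prediction)
    } yield encoded
}
```

For instance, execute("3, 1, 2") returns Some("alert:2.0"), while execute("3, x, 2") returns None because the parsing step fails.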

Streaming source
The data is loaded from Kafka as a data frame, df (1).

Illustration of encoding/decoding weather tracking data

The key of each Kafka record is W_${weather station id} for weather data and D_${Doppler radar id} for Doppler data, and the value is the encoded tracking data (toString). The key and value are cast to strings (2), the records are split by data source according to the key prefix (3), and the values are decoded into instances of WeatherData and DopplerData (4).

def source: Option[(Dataset[WeatherData], Dataset[DopplerData])] = try {
  import sparkSession.implicits._

  // Batch query on the input topics (readStream would create a streaming query)
  val df = sparkSession.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", inputTopics.mkString(","))
    // Kafka consumer properties are passed with the "kafka." prefix
    .option("kafka.max.poll.interval.ms", "2800")
    .option("kafka.fetch.max.bytes", "32768")
    .load()                                                        // (1)

  val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)]                                          // (2)

  // Convert weather data stream into objects
  val weatherDataDS = ds.filter(_._1.startsWith("W_"))             // (3)
    .map { case (_, value) => WeatherDataEncoder.decode(value) }   // (4)

  // Convert Doppler radar data stream into objects
  val dopplerDataDS = ds.filter(_._1.startsWith("D_"))             // (3)
    .map { case (_, value) => DopplerDataEncoder.decode(value) }   // (4)

  Some((weatherDataDS, dopplerDataDS))
}
catch { case e: Exception => None }


The source implementation for batch queries may set a range of Kafka consumer configuration parameters, such as max.partition.fetch.bytes, connections.max.idle.ms, fetch.max.bytes or max.poll.interval.ms [ref 6]. Spark forwards such parameters to the Kafka consumer through options prefixed with kafka. (e.g., kafka.fetch.max.bytes).
The decode method transforms a semicolon-delimited string into an instance of either WeatherData or DopplerData.
To consume the data from Kafka as a stream rather than as a batch, the method readStream replaces read.
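The decoders WeatherDataEncoder and DopplerDataEncoder are not listed in the article. The sketch below shows one possible way to route records by key prefix and decode the semicolon-delimited value of a weather station record, using plain Scala collections in place of a Spark Dataset. The WeatherData class here is a simplified copy of the article's class (default time stamp and rand omitted), and this WeatherDataEncoder is a hypothetical implementation with no validation.

```scala
object KafkaRecordDecoding {
  // Simplified copy of the article's WeatherData (rand and default time stamp omitted)
  case class WeatherData(
    id: String,
    longitude: Float,
    latitude: Float,
    timeStamp: String,
    temperature: Float,
    pressure: Float,
    humidity: Float) {
    override def toString: String =
      s"$id;$longitude;$latitude;$timeStamp;$temperature;$pressure;$humidity"
  }

  // Hypothetical decoder: inverse of WeatherData.toString, without validation
  object WeatherDataEncoder {
    def decode(value: String): WeatherData = {
      val fields = value.split(";")
      WeatherData(
        fields(0), fields(1).toFloat, fields(2).toFloat, fields(3),
        fields(4).toFloat, fields(5).toFloat, fields(6).toFloat)
    }
  }

  // Keeps the weather station records (key prefix "W_") and decodes their values
  def weatherRecords(records: Seq[(String, String)]): Seq[WeatherData] =
    records
      .filter { case (key, _) => key.startsWith("W_") }
      .map { case (_, value) => WeatherDataEncoder.decode(value) }
}
```

Because Float.toString produces a representation that parses back to the same value, decoding the output of toString restores the original record.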

Streaming processing: Data aggregation
The goal is to align data from weather stations and Doppler radars collected at approximately the same time. Each time stamp is mapped to a 20-second interval (bucket), and records that fall into the same interval are joined.
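The helper bucketKey, called in synchronizeSources, is not listed in the article. A minimal sketch, assuming time stamps are epoch milliseconds stored as strings (as produced by System.currentTimeMillis in WeatherData), rounds each time stamp down to the start of its 20-second interval:

```scala
object TimeBuckets {
  // Width of a time bucket: 20 seconds, in milliseconds
  val bucketWidthMs: Long = 20000L

  // Rounds an epoch-millisecond time stamp down to the start of its 20-second bucket
  def bucketKey(timeStamp: String): String = {
    val ts = timeStamp.toLong
    (ts - Math.floorMod(ts, bucketWidthMs)).toString
  }
}
```

Records whose time stamps share a bucket get the same key and can be joined. A limitation of fixed buckets is that two readings a few milliseconds apart, on either side of a bucket boundary, end up with different keys.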

def synchronizeSources(
  weatherDS: Dataset[WeatherData],
  dopplerDS: Dataset[DopplerData]): Option[Dataset[ModelInputData]] = try {
  import sparkSession.implicits._

  // Replace each time stamp with the key of its 20-second bucket
  val timedBucketedWeatherDS = weatherDS.map(
    wData => wData.copy(timeStamp = bucketKey(wData.timeStamp))
  )
  val timedBucketedDopplerDS = dopplerDS.map(
    dData => dData.copy(timeStamp = bucketKey(dData.timeStamp))
  )
  // Perform a fast, presorted join on the bucketed time stamp
  val output = sortingJoin[WeatherData, DopplerData](
    timedBucketedWeatherDS,
    tDSKey = "timeStamp",
    timedBucketedDopplerDS,
    uDSKey = "timeStamp"
  ).map {
    case (weatherData, dopplerData) =>
      ModelInputData(
        weatherData.timeStamp,
        weatherData.temperature,
        weatherData.pressure,
        weatherData.humidity,
        dopplerData.windShear,
        dopplerData.windSpeed,
        dopplerData.gustSpeed,
        dopplerData.windDirection
      )
  }
  Some(output)
} catch { case e: Exception => None }


The sortingJoin method is a parameterized method that presorts the data in each partition before joining the two datasets. The implementation is available at Github - Spark fast join implementation.
The data class for the model input, ModelInputData, is described in the appendix.
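The actual sortingJoin implementation lives on GitHub and is not reproduced here. To illustrate the principle behind a presorted join, the sketch below merges two sequences already sorted by key (string order, which matches numeric order for epoch time stamps of equal length) in a single linear pass. It works on plain Scala collections; mergeJoin is a hypothetical name, and this is not the Spark implementation.

```scala
import scala.collection.mutable.ListBuffer

object SortMergeJoin {
  // Inner join of two sequences sorted by key; emits every matching pair
  def mergeJoin[T, U](left: IndexedSeq[T], leftKey: T => String,
                      right: IndexedSeq[U], rightKey: U => String): Seq[(T, U)] = {
    val out = ListBuffer.empty[(T, U)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val cmp = leftKey(left(i)).compareTo(rightKey(right(j)))
      if (cmp < 0) i += 1          // left key is smaller: advance left
      else if (cmp > 0) j += 1     // right key is smaller: advance right
      else {
        // Find the run of right elements sharing the current key
        val key = leftKey(left(i))
        val runStart = j
        var runEnd = j
        while (runEnd < right.length && rightKey(right(runEnd)) == key) runEnd += 1
        // Pair every left element with this key with every element of the run
        while (i < left.length && leftKey(left(i)) == key) {
          (runStart until runEnd).foreach(k => out += ((left(i), right(k))))
          i += 1
        }
        j = runEnd
      }
    }
    out.toList
  }
}
```

Since both inputs are sorted, each element is visited once (plus the pairs emitted for duplicate keys), which is what makes presorting each partition attractive compared with a nested-loop join.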

Streaming sink
Finally, the sink encodes the storm predictions (weather alerts) generated by the model and produces them to the output Kafka topics. Unlike the source, which reads Kafka with a batch query, the sink produces the data as a stream.

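def sink(stormPredictions: Dataset[StormPrediction]): Option[Dataset[String]] = try {
  import sparkSession.implicits._

  // Encode dataset of weather alerts to be produced to Kafka topics
  val encStormPredictions: Dataset[String] = stormPredictions.map(_.toString)

  // Local copy so the closure below does not capture the enclosing class
  val topics = outputTopics

  // The Kafka sink expects a 'value' column; the 'topic' column routes each row
  encStormPredictions
    .flatMap(value => topics.map(topic => (topic, value)))
    .toDF("topic", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "/tmp/checkpoints/storm-predictions") // Hypothetical path
    .start()

  Some(encStormPredictions)
} catch { case e: Exception => None }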

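The sink implementation can also set various Kafka producer configuration parameters, such as buffer.memory, retries, batch.size, max.request.size or linger.ms [ref 7], using the same kafka. prefix. Note that writeStream can only be called on a streaming Dataset: if the sources are read as batch queries, the predictions must be produced to Kafka with the method write instead, whereas reading the sources with readStream makes the streaming sink applicable.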
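Spark passes Kafka client properties through options prefixed with kafka., as in kafka.bootstrap.servers. The small helper sketched below turns a map of Kafka producer properties into Spark options under that convention. The object name and the property values are illustrative assumptions, not tuned recommendations.

```scala
object KafkaOptions {
  // Prefixes each Kafka client property with "kafka." as expected by Spark's Kafka source and sink
  def toSparkOptions(kafkaProps: Map[String, String]): Map[String, String] =
    kafkaProps.map { case (key, value) => s"kafka.$key" -> value }

  // Illustrative producer properties (example values only)
  val producerProps: Map[String, String] = Map(
    "linger.ms" -> "5",
    "batch.size" -> "16384",
    "retries" -> "3"
  )
}
```

The resulting map could then be supplied to the stream writer in one call, for instance writer.options(KafkaOptions.toSparkOptions(KafkaOptions.producerProps)), which keeps the producer settings in one place.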

Thank you for reading this article. For more information ...

References


Appendix

Storm prediction input
case class ModelInputData(
   timeStamp: String,       // Time stamp for the new consolidated data, input to model
   temperature: Float,      // Temperature in Fahrenheit
   pressure: Float,           // Barometric pressure in millibars
   humidity: Float,           // Humidity in percentage
   windShear: Boolean,  // Boolean flag to specify if this is a wind shear
   windSpeed: Float,      // Average speed for the wind (miles/hour)
   gustSpeed: Float,       // Maximum speed for the wind (miles/hour)
   windDirection: Float   // Direction of the wind [0, 360] degrees
)

Storm prediction output
case class StormPrediction(
   id: String,                        // Identifier of the alert/message 
   intensity: Int,                  // Intensity of storm or tornado [1, 5]
   probability: Float,           // Probability that a storm of the given intensity develops
   timeStamp: String,         // Time stamp of the alert or prediction
   modelInputData: ModelInputData,// Weather and Doppler radar data used to generate/predict the alert
   cellArea: CellArea          // Area covered by the alert/prediction
)


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Monday, November 6, 2023

Boost Real-time Processing with Spark Structured Streaming

Target audience: Intermediate
Estimated reading time: 5'

Conventional distributed batch processing systems fall short in supporting applications like social media platforms, Internet of Things, or business-to-consumer online transactions. Fortunately, Apache Spark Structured Streaming equips software developers with the necessary tools for large-scale, real-time stream processing.

This article delves into how the classic Extract-Transform-Load (ETL) pipeline is implemented within the realm of real-time streaming data.


What you will learn: How to implement real-time ETL using Spark Structured Streaming.

Table of contents
       Apache Spark
      Setup
      Transformation 
      Action
      Streams wrapper
      Extract
      Transform 
      Load
Notes:
  • It's presumed that the reader has a basic understanding of the Apache Spark framework
  • Environment Scala 2.12.11, Apache Spark 3.4.0, Spark streaming 3.4.0
  • Error handling and comments in the source code have been omitted for the sake of clarity.

Introduction

Apache Spark

Apache Spark is an open-source distributed computing system designed for handling large-scale data processing  [ref 1]. It leverages in-memory caching and refined query execution strategies to deliver rapid analytic responses, regardless of the data's size.

Unlike Hadoop MapReduce, which writes intermediate results to disk between processing steps, Spark performs the processing in a single step: data is loaded into memory, operations are executed, and the results are written back, which leads to significantly faster execution. Additionally, Spark improves the efficiency of machine learning algorithms by caching data in memory, allowing rapid repeated function calls on the same dataset.


Structured streaming

Apache Spark Structured Streaming is a stream processing framework that's both scalable and resilient to faults, built atop the Spark SQL engine. Its approach to streaming computation mirrors the batch processing model applied to static datasets. The Spark SQL engine manages the task of executing this process incrementally and perpetually, updating the outcomes as new streaming data flows in [ref 2].

In contrast with Spark's original streaming library that relied on RDDs, Structured Streaming facilitates processing based on event time, incorporates watermarking features, and utilizes the DataFrame API that is a part of Spark SQL.

Spark Streaming (DStreams) processes incoming data by splitting it into small batches, which are executed as Resilient Distributed Datasets (RDDs). Structured Streaming, on the other hand, treats the stream as an unbounded table exposed as a DataFrame, with an API optimized by the Spark SQL engine [ref 3].

Streaming components

In this section, we'll provide a concise overview of the essential elements of Spark Streaming that are employed in any Extract-Transform-Load (ETL) process.

Setup 

The Maven pom.xml snippet below declares three Spark libraries, in the form of jar files: Core, SQL, and Streaming. Structured Streaming itself is part of the Spark SQL library, while the Streaming library provides the original DStream API.

<spark.version>3.4.0</spark.version>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.12</artifactId>
   <version>${spark.version}</version>
</dependency>

Important note: The artifact for Spark Structured Streaming with input from Kafka is spark-sql-kafka-0-10_2.12.

Our use case utilizes Spark's transformations and actions to construct an ETL (Extract, Transform, Load) pipeline.
A transformation refers to any operation in Spark that yields a DataFrame or Dataset, and is executed in a lazy manner, meaning it's not computed immediately.
An action, on the other hand, prompts a computation to deliver a result, thereby initiating the execution of all prior transformations in the sequence.

Transformation

The class STransform defines the transformation of a DataFrame (map function) using SQL syntax.

The class attributes are:
  • selectFields: List of fields to display
  • whereConditions: WHERE conditions, if not empty
  • transformFunc: DataFrame transformation function (DataFrame, String) => DataFrame, taking the input data frame and the SQL statement
  • descriptor: Optional descriptor
The select fields and WHERE conditions are concatenated into the SQL statement. The generated SQL query is not validated prior to execution.

import org.apache.spark.sql.DataFrame

type TransformFunc = (DataFrame, String) => DataFrame

class STransform(
   selectFields: Seq[String],
   whereConditions: Seq[String],
   transformFunc: TransformFunc,
   descriptor: String = ""
){
   def apply(df: DataFrame): DataFrame = transformFunc(df, queryStmt)

   // SQL statement executed on the temporary view 'temptable'
   def queryStmt: String = {
      val whereConditionStr = if (whereConditions.nonEmpty) s"WHERE ${whereConditions.mkString(" AND ")}" else ""
      s"SELECT ${selectFields.mkString(",")} FROM temptable $whereConditionStr"
   }
}
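Since the generated SQL is not validated, it is worth checking the statement text before running it. The sketch below reproduces the query-building approach of queryStmt outside of Spark, with buildQuery as a hypothetical standalone function. Note that the WHERE conditions must be joined with " AND " including the surrounding spaces; otherwise two conditions would be concatenated into invalid SQL such as age > 18AND gender = 'male'.

```scala
object QueryBuilder {
  // Builds the SELECT statement on the temporary view 'temptable'
  def buildQuery(selectFields: Seq[String], whereConditions: Seq[String]): String = {
    val whereConditionStr =
      if (whereConditions.nonEmpty) s" WHERE ${whereConditions.mkString(" AND ")}" else ""
    s"SELECT ${selectFields.mkString(",")} FROM temptable$whereConditionStr"
  }
}
```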


Action

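The class SAggregator wraps the groupBy operation with an aggregation function (strictly speaking, groupBy followed by agg is also a lazy transformation; it is computed once the streaming query is started).
  • groupByCol: Column used for grouping (groupBy)
  • aggrCol: Column used by the aggregation function
  • aggrFunc: Aggregation function that converts a Column into another Column
  • aggrAliasName: Alias name for the aggregated values
import org.apache.spark.sql.{Column, DataFrame}

type AggrFunc = Column => Column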

class SAggregator(
   groupByCol: Column,
   aggrCol: Column,
   aggrFunc: AggrFunc,
   aggrAliasName: String
){
   def apply(inputDF: DataFrame): DataFrame = 
     inputDF.groupBy(groupByCol).agg(aggrFunc(aggrCol).alias(aggrAliasName))
}
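With groupByCol = gender, aggrCol = age and aggrFunc = avg, SAggregator computes the average age per gender. The sketch below shows the same computation on plain Scala collections, as an illustration of the semantics rather than of how Spark executes it; the Person class is hypothetical.

```scala
object GroupByAverage {
  case class Person(gender: String, age: Int)

  // Equivalent of groupBy("gender").agg(avg("age")) on an in-memory collection
  def averageAgeByGender(people: Seq[Person]): Map[String, Double] =
    people.groupBy(_.gender).map { case (gender, group) =>
      gender -> group.map(_.age).sum.toDouble / group.size
    }
}
```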

Streams wrapper

SparkStructStreams defines the generic wrapper trait for structured streaming with the minimum set of required attributes to describe any ETL-based pipeline.
Each specific ETL pipeline has to override the following variables:
  • outputMode: Mode for the stream writer (e.g., Append, Update, ...)
  • outputFormat: Format used by the stream writer (json, console, csv, ...)
  • outputColumn: Name of the aggregated column
  • isConsoleSink: Flag to enable the console sink for debugging purposes
  • transform: Optional transformation (input data frame, SQL statement) => output data frame
  • aggregator: Optional aggregator with groupBy (single column) and a sql.functions._ aggregation function
trait SparkStructStreams{
  val outputMode: OutputMode
  val outputFormat: String
  val outputColumn: String
  val isConsoleSink: Boolean
  val transform: Option[STransform]
  val aggregator: Option[SAggregator]
}

ETL

Streaming pipeline

Our data pipeline implements the conceptual Extract-Transform-Load pattern. 
The extraction consists of reading the data stream from HDFS in JSON format. The two fundamental types of data processing operations in Apache Spark, transformations (map) and actions (reduce), implement the Transform stage of the pipeline.
Finally, the data stream is written to a CSV file sink, implementing the Load stage.

Spark streaming ETL data pipeline

We wrap the streaming pipeline into a class, SparkStructStreamsFromFile, derived from SparkStructStreams, to which we add the path of the source, folderPath, and an implicit reference to the SparkSession.
As the transform and aggregation tasks rely on SQL statements, we need to extract the schema from the data source. The data source consists of JSON files, so we infer the schema from the first record.

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType

class SparkStructStreamsFromFile (
   folderPath: String,                           // Absolute path for the source files
   override val outputMode: OutputMode,          // Mode for writer stream (e.g. Append, Update, ...)
   override val outputFormat: String,            // Format used by the stream writer (json, console, csv, ...)
   override val outputColumn: String,            // Name of the aggregated column
   override val isConsoleSink: Boolean,          // Enable the console sink for debugging
   override val transform: Option[STransform],   // Transformation (DataFrame, SQL) => DataFrame
   override val aggregator: Option[SAggregator]  // groupBy (single column) + sql.functions._
)(implicit sparkSession: SparkSession) extends SparkStructStreams {

  // Extract the schema from the first JSON record
  lazy val schema: StructType =
    sparkSession.read.json(s"hdfs://$folderPath").head().schema

  def execute(): Unit = {

    // --------------------   EXTRACT ------------------------
    // Step 1: Stream reader from the files in 'folderPath'
    val readDF: DataFrame = sparkSession
      .readStream
      .schema(schema)
      .json(s"hdfs://$folderPath")
    assert(readDF.isStreaming)

    // -----------------  TRANSFORM ---------------------
    // Step 2: Transform
    val transformedDF: DataFrame = transform.map(_(readDF)).getOrElse(readDF)

    // Step 3: Aggregator
    val aggregatedDF = aggregator.map(_(transformedDF)).getOrElse(transformedDF)

    // Step 4: Debugging to console, if enabled
    if (isConsoleSink)
      aggregatedDF.writeStream.outputMode(outputMode).format("console").start()

    //--------------------  LOAD ---------------------------
    // Step 5: Writer applied to each micro-batch (typed as a Scala function
    // so that foreachBatch resolves to its Scala overload)
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, _) => {
      batchDF.persist()
      batchDF.select(outputColumn)
        .write
        .mode(SaveMode.Overwrite)
        .format(outputFormat)
        .save(s"hdfs://output/$outputFormat")
      batchDF.unpersist()
    }

    val query = aggregatedDF
      .writeStream
      .outputMode(outputMode)
      .foreachBatch(writeBatch)
      // Step 6: Initiate streaming
      .trigger(Trigger.ProcessingTime("4 seconds"))
      .start()

    query.awaitTermination()
  }
}

The method execute implements the logic of the streaming pipeline in six steps:
  1. Read the stream from a set of JSON files located in 'folderPath' into a data frame, readDF. The schema is inferred from the first JSON record (lazy value schema).
  2. Apply the transformation to the extracted data frame, readDF.
  3. Apply the aggregation to the grouped data of the transformed data frame, transformedDF.
  4. Use a console sink to stream debugging information.
  5. Stream the aggregated data, aggregatedDF, into CSV files using a stream writer in the configured output mode (Update in this example).
  6. Initiate the streaming process.

Extract

The extraction of data consists of loading the JSON data into a partitioned data frame, df, through the API method readStream:
df = sparkSession.readStream.schema(mySchema).json(path)

Transform

The transformation function, myTransformFunc, registers the extracted data frame, readDF, as a temporary view, 'temptable', then executes the SQL query, sqlStatement: SELECT age,gender FROM temptable WHERE age > 18.

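import org.apache.spark.sql.{DataFrame, SparkSession}

def myTransformFunc(
  readDF: DataFrame,
  sqlStatement: String
)(implicit sparkSession: SparkSession): DataFrame = {
  // Register the input under the view name used by STransform.queryStmt
  readDF.createOrReplaceTempView("temptable")
  sparkSession.sql(sqlStatement)
}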

val myTransform = new STransform(
   Seq[String]("age","gender"),
   Seq[String]("age > 18"),
   myTransformFunc,
   "Filter by age"
)


The second step computes the average age of the grouped data, equivalent to SELECT gender, avg(age) FROM temptable GROUP BY gender.

def aggregatedFunc(inputColumn: Column): Column = {
  import org.apache.spark.sql.functions._
  avg(inputColumn)
}

val myAggregator = new SAggregator(
   new Column("gender"),
   new Column("age"),
   aggregatedFunc,
   "avg_age"
)

Load

The final task is to write the output to a CSV file sink.

df.writeStream
  .outputMode(OutputMode.Update())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.select("avg_age").write.mode(SaveMode.Overwrite).format("csv").save(path)
    batchDF.unpersist()
    ()  // Return Unit so the lambda matches the Scala foreachBatch overload
  }
  .trigger(Trigger.ProcessingTime("4 seconds"))
  .start()

The foreachBatch function enables developers to define a specific operation to be applied to the output data from each micro-batch within a streaming query. However, this function cannot be used in continuous processing mode, where foreach would be the suitable alternative.

The output mode defines how the unbounded result table is written to the sink:
  • Append mode (default): Only the new rows appended to the result table since the last trigger are output; supported for queries such as select, where, map, flatMap, filter and join.
  • Complete mode: The entire result table is output after every trigger; this mode is supported for aggregation queries.
  • Update mode: Only the rows of the result table that were updated since the last trigger are output to the sink.
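The difference between the Complete and Update modes is easiest to see on a running aggregation. The sketch below only simulates these semantics on plain Scala collections; Spark is not involved, and Append mode is left out because Spark supports it for streaming aggregations only with a watermark. It keeps a running count per key and returns what each mode would emit after a micro-batch. All names are hypothetical.

```scala
object OutputModes {
  // Result table: running count of events per key across micro-batches
  type ResultTable = Map[String, Long]

  // Applies one micro-batch; returns the new result table and the updated keys
  def applyBatch(table: ResultTable, batch: Seq[String]): (ResultTable, Set[String]) = {
    val newTable = batch.foldLeft(table) { (acc, key) =>
      acc.updated(key, acc.getOrElse(key, 0L) + 1L)
    }
    (newTable, batch.toSet)
  }

  // Complete mode: the whole result table is emitted
  def completeOutput(table: ResultTable, updated: Set[String]): ResultTable = table

  // Update mode: only the rows changed by the last micro-batch are emitted
  def updateOutput(table: ResultTable, updated: Set[String]): ResultTable =
    table.filter { case (key, _) => updated.contains(key) }
}
```

After a first batch with one male and one female record and a second batch with a single male record, Complete mode emits both rows (male: 2, female: 1), whereas Update mode emits only the male row.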

Putting it all together

new SparkStructStreamsFromFile(
   path,
   OutputMode.Update(),
   outputFormat = "csv",
   outputColumn = "avg_age",
   isConsoleSink = true,
   transform = Some(myTransform),
   aggregator = Some(myAggregator)
 ).execute()

The output of the streaming pipeline in CSV format is 
gender, avg(age)
male,36
female,33



References



