Apache Kafka
Spark structured streaming
Use case
Overview
Data streaming pipeline
Implementation
Weather data sources
Weather tracking pipeline
References
- Software versions utilized in the source code include Scala 2.12.15, Spark 3.4.0, and Kafka 3.4.0.
- This article focuses on the data streaming pipeline; a description of the storm prediction model is beyond its scope.
- The source code is available in the Github-Streaming library [ref 1].
- Note that error messages and the validation of method arguments are not included in the provided code examples.
Streaming frameworks
- A Kafka message queue, where each monitoring device type is allocated a specific topic, enabling the routing of different alerts to the relevant agency.
- The Spark streaming framework to handle weather data processing and to disseminate forecasts of serious weather disruptions.
- A predictive model specifically for severe storms and tornadoes.
Apache Kafka
- Publishing (writing) and subscribing (reading) to event streams, as well as continuously importing/exporting data from various systems.
- Durable and reliable storage of event streams for any desired duration.
- Real-time or retrospective processing of event streams.
# Stop any running ZooKeeper and Kafka instances
zookeeper-server-stop
kafka-server-stop
sleep 2
# Restart ZooKeeper and verify its process is up
zookeeper-server-start $KAFKA_ROOT/kafka/config/zookeeper.properties &
sleep 1
ps -ef | grep zookeeper
# Restart the Kafka broker and verify its process is up
kafka-server-start $KAFKA_ROOT/kafka/config/server.properties &
sleep 1
ps -ef | grep kafka
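Since each monitoring device type is allocated its own topic, the topics must exist before the pipeline starts. Here is a minimal sketch using the Kafka AdminClient API; the topic names, partition count, and replication factor are assumptions for a local, single-broker setup:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateWeatherTopics extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)
  // One topic per monitoring device type (topic names are assumptions)
  val topics = Seq("weather-station", "doppler-radar", "storm-alerts")
    .map(new NewTopic(_, 1, 1.toShort)) // 1 partition, replication factor 1

  admin.createTopics(topics.asJava).all().get() // Block until creation completes
  admin.close()
}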
<scala.version>2.12.15</scala.version>
<kafka.version>3.4.0</kafka.version>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-scala_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
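For smoke-testing the queue, tracking records can be published with a plain Kafka producer. A minimal sketch, assuming a string key whose first character identifies the device type ('W' for weather stations, 'D' for Doppler radars, as the pipeline below expects) and the semicolon-encoded payload defined later by WeatherData.toString; the topic name and record values are assumptions:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object WeatherProducerApp extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)
  // Key prefix 'W' marks a weather station record; the value follows the
  // semicolon encoding defined by WeatherData.toString (see below)
  val record = new ProducerRecord[String, String](
    "weather-station",                                          // topic (assumption)
    "W-station-001",                                            // key
    "station-001;-122.33;47.60;1687441200000;75.0;1013.2;54.0"  // value
  )
  producer.send(record)
  producer.close()
}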
Apache Spark
- Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
- Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
- Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
- Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.
<spark.version>3.4.0</spark.version>
<scala.version>2.12.15</scala.version>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
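The pipeline class described later requires an implicit SparkSession. A minimal local configuration, for illustration only:

import org.apache.spark.sql.SparkSession

implicit val sparkSession: SparkSession = SparkSession.builder()
  .appName("WeatherTracking")
  .master("local[*]") // Local mode for experimentation; use a cluster master in production
  .getOrCreate()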
Spark structured streaming
Use case
Overview
- Weather station: collects temperature, pressure, and humidity.
- Doppler radar: provides advanced data regarding wind intensity and direction.
Data streaming pipeline
- Kafka queue.
- Spark's distributed structured streams.
- A variety of storm and tornado prediction models, developed using the PyTorch library and accessible via REST API.
Storm tracking streaming pipeline
Data gathered from weather stations and Doppler radars is fed into the Spark engine, where both streams are combined and harmonized based on location and timestamp. This unified dataset is then employed for training the model. During the inference phase, predictions are streamed back to Kafka.
An intriguing aspect of the implementation is that both weather and Doppler data are ingested from Kafka as batch queries, while the storm forecasts are streamed back to Kafka.
Implementation
Weather data sources
import scala.util.Random

trait TrackingData[T <: TrackingData[T]] {
self =>
val id: String // Identifier for the weather tracking device
val longitude: Float // Longitude for the weather tracking device
val latitude: Float // Latitude for the weather tracking device
val timeStamp: String // Time stamp data is collected
}
case class WeatherData (
override val id: String, // Identifier for the weather station
override val longitude: Float, // Longitude for the weather station
override val latitude: Float, // Latitude for the weather station
override val timeStamp: String = System.currentTimeMillis().toString,
// Time stamp data is collected
temperature: Float, // Temperature (Fahrenheit) collected at timeStamp
pressure: Float, // Pressure (millibars) collected at timeStamp
humidity: Float // Humidity (%) collected at timeStamp
) extends TrackingData[WeatherData] {
// Random generator for testing
def rand(rand: Random, alpha: Float): WeatherData = this.copy(
timeStamp = (timeStamp.toLong + 10000L + rand.nextInt(2000)).toString,
temperature = temperature * (1 + alpha * rand.nextFloat()),
pressure = pressure * (1 + alpha * rand.nextFloat()),
humidity = humidity * (1 + alpha * rand.nextFloat())
)
// Encoded weather station data produced to Kafka queue
override def toString: String =
s"$id;$longitude;$latitude;$timeStamp;$temperature;$pressure;$humidity"
}
case class DopplerData(
override val id: String, // Identifier for the Doppler radar
override val longitude: Float, // Longitude for the Doppler radar
override val latitude: Float, // Latitude for the Doppler radar
override val timeStamp: String, // Time stamp data is collected
windShear: Boolean, // Is it a wind shear?
windSpeed: Float, // Average wind speed
gustSpeed: Float, // Maximum wind speed
windDirection: Int) extends TrackingData[DopplerData] {
// Random generator
def rand(rand: Random, alpha: Float): DopplerData = this.copy(
timeStamp = (timeStamp.toLong + 10000L + rand.nextInt(2000)).toString,
windShear = rand.nextBoolean(),
windSpeed = windSpeed * (1 + alpha * rand.nextFloat()),
gustSpeed = gustSpeed * (1 + alpha * rand.nextFloat()),
windDirection = {
val newDirection = windDirection * (1 + alpha * rand.nextFloat())
if (newDirection > 360.0) newDirection.toInt % 360 else newDirection.toInt
}
)
// Encoded Doppler radar data produced to Kafka
override def toString: String = s"$id;$longitude;$latitude;$timeStamp;$windShear;$windSpeed;" +
s"$gustSpeed;$windDirection"
}
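The source stage of the pipeline relies on WeatherDataEncoder and DopplerDataEncoder to decode the semicolon-separated payloads generated by the toString methods above. These encoders are not listed in the article; here is a minimal sketch of the decoders, mirroring the field order of the encoders:

object WeatherDataEncoder {
  // Decode a semicolon-separated payload into a WeatherData instance
  def decode(encoded: String): WeatherData = {
    val fields = encoded.split(";")
    WeatherData(
      id = fields(0),
      longitude = fields(1).toFloat,
      latitude = fields(2).toFloat,
      timeStamp = fields(3),
      temperature = fields(4).toFloat,
      pressure = fields(5).toFloat,
      humidity = fields(6).toFloat
    )
  }
}

object DopplerDataEncoder {
  // Decode a semicolon-separated payload into a DopplerData instance
  def decode(encoded: String): DopplerData = {
    val fields = encoded.split(";")
    DopplerData(
      id = fields(0),
      longitude = fields(1).toFloat,
      latitude = fields(2).toFloat,
      timeStamp = fields(3),
      windShear = fields(4).toBoolean,
      windSpeed = fields(5).toFloat,
      gustSpeed = fields(6).toFloat,
      windDirection = fields(7).toInt
    )
  }
}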
Weather tracking streaming pipeline
- Source: Retrieves data from weather stations and Doppler radars via Kafka queues.
- Aggregation: Synchronizes and consolidates the data.
- Storm Prediction: Activates the model to provide potential storm advisories, communicated through a REST API.
- Sink: Distributes storm advisories to the relevant Kafka topics.
import org.apache.spark.sql.{Dataset, SparkSession}

class WeatherTracking(
inputTopics: Seq[String],
outputTopics: Seq[String],
model: Dataset[ModelInputData] => Dataset[StormPrediction])(implicit sparkSession: SparkSession) {
def execute(): Unit =
for {
(weatherDS, dopplerDS) <- source // Step 1
consolidatedDataset <- synchronizeSources(weatherDS, dopplerDS) // Step 2
predictions <- predict(consolidatedDataset) // Step 3
encodedPredictions <- sink(predictions) // Step 4
} yield encodedPredictions
def source: Option[(Dataset[WeatherData], Dataset[DopplerData])] = try {
import sparkSession.implicits._

val df = sparkSession.read.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", inputTopics.mkString(","))
.option("kafka.max.poll.interval.ms", 2800)
.option("kafka.fetch.max.bytes", 32768)
.load() // (1)

val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)] // (2)

// Convert the weather station data stream into typed objects
val weatherDataDS = ds.filter(_._1.head == 'W') // (3)
.map { case (_, value) => WeatherDataEncoder.decode(value) } // (4)

// Convert the Doppler radar data stream into typed objects
val dopplerDataDS = ds.filter(_._1.head == 'D') // (3)
.map { case (_, value) => DopplerDataEncoder.decode(value) } // (4)

Some((weatherDataDS, dopplerDataDS))
} catch { case e: Exception => None }
def synchronizeSources(
weatherDS: Dataset[WeatherData],
dopplerDS: Dataset[DopplerData]): Option[Dataset[ModelInputData]] = try {
import sparkSession.implicits._

// Assign each record to a time bucket so the two sources can be aligned
val timedBucketedWeatherDS = weatherDS.map(
wData => wData.copy(timeStamp = bucketKey(wData.timeStamp))
)
val timedBucketedDopplerDS = dopplerDS.map(
dData => dData.copy(timeStamp = bucketKey(dData.timeStamp))
)
// Perform a fast, presorted join on the time bucket
val output = sortingJoin[WeatherData, DopplerData](
tDS = timedBucketedWeatherDS,
tDSKey = "timeStamp",
uDS = timedBucketedDopplerDS,
uDSKey = "timeStamp"
).map {
case (weatherData, dopplerData) =>
ModelInputData(
weatherData.timeStamp,
weatherData.temperature,
weatherData.pressure,
weatherData.humidity,
dopplerData.windShear,
dopplerData.windSpeed,
dopplerData.gustSpeed,
dopplerData.windDirection
)
}
Some(output)
} catch { case e: Exception => None }
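// The predict method referenced in execute() is not shown in the article.
// A minimal sketch, assuming the model function passed to the constructor
// encapsulates the REST client call to the storm prediction service:
def predict(modelInput: Dataset[ModelInputData]): Option[Dataset[StormPrediction]] = try {
Some(model(modelInput))
} catch { case e: Exception => None }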
def sink(stormPredictions: Dataset[StormPrediction]): Option[Dataset[String]] = try {
import sparkSession.implicits._

// Encode the dataset of storm predictions to be produced to the Kafka topic
val encStormPredictions = stormPredictions.map(_.toString)

encStormPredictions
.selectExpr("CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
// The Kafka sink expects a 'topic' option ('subscribe' applies to sources only);
// to fan out to several topics, add a 'topic' column to the dataset instead
.option("topic", outputTopics.mkString(","))
// Structured streaming requires a checkpoint directory (path is an example)
.option("checkpointLocation", "/tmp/weather/checkpoint")
.start()
Some(encStormPredictions)
} catch { case e: Exception => None }
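The bucketKey and sortingJoin helpers invoked in synchronizeSources are not listed in the article. Here is a minimal sketch, assuming bucketKey rounds a millisecond timestamp down to a fixed time window (the 20-second window size is an arbitrary choice) and sortingJoin pre-sorts both datasets within partitions before an inner equi-join:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

// Round a millisecond timestamp down to a fixed-size time bucket
def bucketKey(timeStamp: String, windowSizeMs: Long = 20000L): String =
  ((timeStamp.toLong / windowSizeMs) * windowSizeMs).toString

// Repartition and pre-sort both datasets on their join keys, then apply an
// inner equi-join; pre-sorting within partitions favors a merge-style join
def sortingJoin[T, U](
  tDS: Dataset[T],
  tDSKey: String,
  uDS: Dataset[U],
  uDSKey: String): Dataset[(T, U)] = {
  val sortedTDS = tDS.repartition(col(tDSKey)).sortWithinPartitions(col(tDSKey))
  val sortedUDS = uDS.repartition(col(uDSKey)).sortWithinPartitions(col(uDSKey))
  sortedTDS.joinWith(sortedUDS, sortedTDS(tDSKey) === sortedUDS(uDSKey), "inner")
}

For completeness, a hypothetical driver wiring the pipeline together; the topic names and the stubbed model are assumptions:

// Requires the implicit SparkSession defined earlier
val tracker = new WeatherTracking(
  inputTopics = Seq("weather-station", "doppler-radar"),
  outputTopics = Seq("storm-alerts"),
  model = (input: Dataset[ModelInputData]) => ??? // delegate to the REST prediction service
)
tracker.execute()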
References
Appendix
case class ModelInputData(
timeStamp: String, // Time stamp for the new consolidated data, input to model
temperature: Float, // Temperature in Fahrenheit
pressure: Float, // Barometric pressure in millibars
humidity: Float, // Humidity in percentage
windShear: Boolean, // Boolean flag to specify if this is a wind shear
windSpeed: Float, // Average speed for the wind (miles/hour)
gustSpeed: Float, // Maximum speed for the wind (miles/hour)
windDirection: Float // Direction of the wind [0, 360] degrees
)
case class StormPrediction(
id: String, // Identifier of the alert/message
intensity: Int, // Intensity of the storm or tornado [1, 5]
probability: Float, // Probability that a storm of the given intensity develops
timeStamp: String, // Time stamp of the alert or prediction
modelInputData: ModelInputData,// Weather and Doppler radar data used to generate/predict the alert
cellArea: CellArea // Area covered by the alert/prediction
)
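The CellArea class referenced in StormPrediction is not defined in the article; here is a minimal sketch, assuming it delimits the geographic bounding box covered by an advisory:

case class CellArea(
  minLongitude: Float, // Western boundary of the advisory area
  maxLongitude: Float, // Eastern boundary of the advisory area
  minLatitude: Float,  // Southern boundary of the advisory area
  maxLatitude: Float   // Northern boundary of the advisory area
)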
He has been director of data engineering at Aideo Technologies since 2017 and is the author of Scala for Machine Learning (Packt Publishing, ISBN 978-1-78712-238-3).