Showing posts with label Stream. Show all posts
Showing posts with label Stream. Show all posts

Thursday, May 24, 2018

Streaming Moving Averages with Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

Spark Streaming offers an abstraction called Discretized Stream, or DStream, which signifies a continuous flow of data. This can either stem from an initial source or emerge as a result of transforming the input data. Essentially, a DStream is characterized by a consistent sequence of Resilient Distributed Datasets (RDD) [1].


Follow me on LinkedIn
Important notes
  • This post describes the streaming features as implemented in Apache Spark 2.0.x 
  • Stream should not confused with structured streaming [2].

Use case

The creation of models through supervised learning from a given training sets usually requires the application of some pre-processing algorithm such as smoothing, filtering or extrapolating existing data for missing values. 

Computing the moving average of a time series is a simple and convenient technique used to filter out noise and reduce the impact of outliers on trend lines. For this post, we consider the simplest form of moving average over a period of p values, as defined in the following formula \[\tilde{x_{t}}= \frac{1}{p}\sum_{j=t-p+1}^{t}x_{j}\] 
The iterative (or recursive) form is defined as \[\tilde{x_{t}}=\tilde{x_{t-1}}+\frac{1}{p}(x_{t}-x_{t-p})\] 

Spark DStreams

This section refers to discretized streams that were introduced in Spark 1.x. Structured streaming will be discussed in a future post.



Thank you for reading this article. For more information ...

References



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Tuesday, June 10, 2014

Introduction to Scala Discretized Streams

Target audience: Beginner
Estimated reading time: 5'


How can we leverage Scala Streams to manage very large data sets with limited computing resources?


Table of contents
Follow me on LinkedIn

Overview

A Stream instance can be regarded as lazy list, or more accurately a list with lazy elements. The elements are allocated only when accessed. Stream allows Scala developers to write infinite sequences. Elements can be removed from memory (to be handled by the GC) defined by eliminating any reference to its elements once no longer needed.

Performance Evaluation

It is easy to understand the benefit of Stream in term of memory management. But what about the performance?
Let's compare Stream and List using 3 simple operations:
  • Allocating elements
  • Retrieving a random element
  • Traversing the entire collection
Let's start by defining these operations in the context of computing the mean value of a very large sequence of double values.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val NUM = 10000000 
 
   // Allocation test
val lst = List.tabulate(NUM)( _.toDouble)

   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ => 
     {y = lst(Random.nextInt(NUM-1)}
)
   // Reducer test
lst.reduce( _ + _ )/lst.size

The operation of reading a value at a random index is repeated 10,000 times in order to make the performance evaluation more reliable (line 8, 9). The mean is computed using a simple reduce method (line 12)

Let's implement the same sequence of operations using Stream class.

1
2
3
4
5
6
7
8
val strm = Stream.tabulate(NUM)( _.toDouble)
   // Reading test
var y = 0.0
Range(0, 10000).foreach( _ => 
  {y = strm(Random.nextInt(NUM-1)}
)
   // Reducer test
strm.reduceRight( _ + _ )/strm.size

The implementation of the generation of random values using Stream is very similar to the implementation using List (line 4, 5). The mean of the stream is also computed with a reducer (line 8).

The test is run 20 times to avoid distortion of the initialization of the JVM. 


The allocation of the elements in the stream is slightly faster than the creation of the list.
The main difference is the time required by the List and Stream to traverse the entire collection using the reduceRight method as a reducer. In this code snippet above, the Stream has to allocate all its elements at once. This scenario is very unlikely as Streams are usually needed to process section or slices of a very large sequence of values or objects, as demonstrated in the next section.

Use case: moving average

The most common application of Scala Stream is iterative or recursive application of a function/transform or sequence of functions to a very large data set, in this case, the mean value.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
val strm = Stream.fill(NUM)( Random.nextDouble )
  
val STEP = 5
val sum = strm.take(STEP).sum
val avStrm = strm.drop(STEP)

 // Apply the updating formula 
 // Sum(n, n+STEP) = Sum(n -1, STEP) - x(n-1) + x(n)
avStrm.zip(avStrm.tail)
      .map(x => sum - x._1 + x._2)
      .map( _ /STEP)


First, the code creates a reference strm of a stream of NUM random values (line 1). Then it computes the sum of the first STEP elements of the stream (line 4). Once the sum is computed, these elements are dropped from the stream (line 5). The mean value is updated for each new batch of new STEP elements (line 9-11).

Here is an alternative implementation of the computation of the moving average on a stream of floating point values using the tail recursion.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def average(strm: Stream[Double], window: Int): Stream[Double] = {
  
  @scala.annotation.tailrec
  def average(
    src: Stream[Double], 
    target: Stream[Double]): Unit = {
    
    if( !src.isEmpty ) {
      val tailSrc = src.tail
      val newSum = sum - src.head + tailSrc.head
       average(strm.tail, target :+ newSum)
    }
  }
   
  val _strm = Stream.empty[Double] :+ strm.take(window).sum
  average(strm.drop(window), _strm)
  _strm.map( _/ window) 
}

The recursive call average (line 4) has two arguments: the stream src traversed through the recursion (line 5), and the stream that collects the average (mean) values (line 6). The method recurses as long as the source stream src is not empty (line 8).
The performance of the computation of the mean can be greatly improved by parallel its execution,
Stream.par

References