Sunday, November 1, 2020

Evaluate Performance of Scala Tail Recursion

Target audience: Intermediate
Estimated reading time: 3'

Recursion refers to the technique where a function invokes itself, either directly or indirectly, and such a function is termed a recursive function. 
Some problems can be more effortlessly addressed using recursive algorithms. In this article, we will assess the performance of Scala's tail recursion in comparison to iterative approaches.



Overview

In Scala, tail recursion is a commonly used technique to apply a transformation to the elements of a collection. The purpose of this post is to compare the performance of tail recursion with that of iteration-based collection methods such as scanLeft and map.
For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers and imports is omitted.
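As a minimal illustration of the technique being benchmarked, the sketch below sums an array with a tail-recursive helper and with an explicit while loop. Because the recursive call is the last operation of the helper, the Scala compiler turns the self-call into a jump, and @tailrec makes compilation fail if that is not possible. The object and method names are hypothetical.

```scala
import scala.annotation.tailrec

object TailRecursionDemo {
  // Tail-recursive sum: the self-call is in tail position, so scalac
  // compiles it into a loop instead of pushing one stack frame per element
  def sumRec(values: Array[Int]): Long = {
    @tailrec
    def loop(index: Int, acc: Long): Long =
      if (index >= values.length) acc
      else loop(index + 1, acc + values(index))
    loop(0, 0L)
  }

  // Equivalent iterative version with mutable index and accumulator
  def sumIter(values: Array[Int]): Long = {
    var acc = 0L
    var index = 0
    while (index < values.length) {
      acc += values(index)
      index += 1
    }
    acc
  }
}
```

Since the recursion does not grow the call stack, sumRec can process arrays with millions of elements without a StackOverflowError.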

Test benchmark

Let's consider a "recursive" data transformation on an array using a sliding window. For the sake of simplicity, we create a simple polynomial transform on an array of values
   $\{X_0, \dots, X_n, \dots, X_p\}$
with a window w, defined as
   $$f(X_n) = (n-1)X_{n-1} + (n-2)X_{n-2} + \cdots + (n-w)X_{n-w}$$

Such algorithms are widely used in signal processing and technical analysis of financial markets (e.g., moving averages, filters).

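// Simplified transform: sum of the last W_SIZE values, without weights
// (W_SIZE, the size of the sliding window, is defined elsewhere)
def polynomial(values: Array[Int]): Int = 
  (if(values.size < W_SIZE) 
     values 
  else 
     values.takeRight(W_SIZE)
  ).sum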
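The polynomial function above only sums the last W_SIZE values. For comparison, here is a minimal sketch of the weighted transform defined by the formula above, assuming 0 <= n <= values.length and skipping terms whose index n - k is negative. The object WeightedTransform and its method are hypothetical.

```scala
object WeightedTransform {
  // f(X_n) = (n-1)X_{n-1} + (n-2)X_{n-2} + ... + (n-w)X_{n-w}
  // Terms with a negative index (n - k < 0) are skipped
  def weighted(values: Array[Int], n: Int, w: Int): Long =
    (1 to w).iterator
      .map(k => n - k)                    // index of the k-th preceding value
      .filter(_ >= 0)                     // ignore positions before the array start
      .map(i => i.toLong * values(i))     // weight (n - k) times X_{n-k}
      .sum
}
```

For instance, with values = Array(1, 2, 3, 4, 5), n = 4 and w = 2, the result is 3*4 + 2*3 = 18.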


The first implementation of the polynomial transform is a tail recursion over each element Xn of the array. The transform f computes f(values(cursor)) from the preceding values values[0, ..., cursor-1] (in practice from the last W_SIZE of them), as described in the code snippet below.

class Evaluation(values: Array[Int]) {
  // W_SIZE, the size of the sliding window, is defined elsewhere

  def recurse(f: Array[Int] => Int): Array[Int] = {

    @scala.annotation.tailrec
    def recurse(
      f: Array[Int] => Int, 
      cursor: Int, 
      results: Array[Int]): Boolean = {  
        
      if( cursor >= values.size) // exit condition
        true
      else {
        // Apply f to the window of (at most) W_SIZE values preceding the cursor
        val value = f(values.slice(math.max(0, cursor - W_SIZE), cursor))
        results.update(cursor, value)
        recurse(f, cursor+1, results)
      }
    }

    val results = new Array[Int](values.size)
    recurse(f, 0, results)
    results
  }
}

The second implementation relies on the scanLeft method, which threads an accumulated value through the array. Here the accumulated value is ignored and each entry is the transformed value f(Xn).

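def scan(f: Array[Int] => Int): Array[Int] = 
  values.zipWithIndex
    .scanLeft(0) { case (_, (_, n)) =>
      // Apply f to the window of (at most) W_SIZE values preceding Xn
      f(values.slice(math.max(0, n - W_SIZE), n))
    }
    .tail // drop the initial seed 0 so there is one entry per element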

Finally, we implement the polynomial transform on the sliding array window with a map method.

def map(f: Array[Int] => Int): Array[Int] = 
  values.zipWithIndex.map { case (_, n) =>
    // Apply f to the window of (at most) W_SIZE values preceding Xn
    f(values.slice(math.max(0, n - W_SIZE), n))
  }
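The three implementations are expected to produce the same output. The following self-contained sketch gathers them in a single object so they can be checked against each other; the window size W_SIZE = 3 is a hypothetical value, since the article leaves it unspecified, and the object name is also hypothetical.

```scala
object SlidingWindowDemo {
  // Hypothetical window size
  val W_SIZE = 3

  // Simplified transform: sum of (at most) the last W_SIZE values
  def polynomial(values: Array[Int]): Int = values.takeRight(W_SIZE).sum

  // Window of (at most) W_SIZE values preceding index n
  private def window(values: Array[Int], n: Int): Array[Int] =
    values.slice(math.max(0, n - W_SIZE), n)

  // Tail-recursive traversal filling a pre-allocated results array
  def recurse(values: Array[Int], f: Array[Int] => Int): Array[Int] = {
    @scala.annotation.tailrec
    def loop(cursor: Int, results: Array[Int]): Array[Int] =
      if (cursor >= values.length) results
      else {
        results(cursor) = f(window(values, cursor))
        loop(cursor + 1, results)
      }
    loop(0, new Array[Int](values.length))
  }

  // scanLeft ignoring the accumulator; tail drops the initial seed
  def scan(values: Array[Int], f: Array[Int] => Int): Array[Int] =
    values.indices.scanLeft(0)((_, n) => f(window(values, n))).tail.toArray

  // Plain map over the indices
  def map(values: Array[Int], f: Array[Int] => Int): Array[Int] =
    values.indices.map(n => f(window(values, n))).toArray
}
```

With values = Array(1, 2, 3, 4, 5), all three methods return 0, 1, 3, 6, 9.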


Performance evaluation

For the test, each of these three methods is executed 1,000 times on a dual-core i7 with 8 GB of RAM running Mac OS X Mountain Lion 10.8. The first test consists of executing the three methods while varying the size of the array from 10 to 90. The test is repeated 5 times and the duration is measured in milliseconds.
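The measurement protocol can be sketched with a simple wall-clock harness. This is an assumption about how such timings could be collected, not the author's benchmark code: the object Benchmark and method timeMs are hypothetical, and the harness does not control for JIT warm-up or garbage collection, which a dedicated tool such as JMH (Java Microbenchmark Harness) handles more rigorously.

```scala
object Benchmark {
  // Runs `body` `iterations` times and returns the elapsed time in milliseconds
  def timeMs(iterations: Int)(body: => Unit): Double = {
    val start = System.nanoTime()
    var i = 0
    while (i < iterations) {
      body // by-name parameter: re-evaluated on each iteration
      i += 1
    }
    (System.nanoTime() - start) / 1e6
  }
}
```

A call such as Benchmark.timeMs(1000) { evaluation.map(polynomial) } would then time 1,000 executions of one method, assuming an Evaluation instance named evaluation.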



Tail recursion is significantly faster than the two other methods. The scan methods (scan, scanLeft, scanRight) have a significant overhead that cannot be "amortized" over a small array. It is worth noting that map and scan perform similarly. The relative performance of these three methods is confirmed when testing with large arrays (from 1,000,000 to 9,000,000 items).



Thank you for reading this article. For more information ...



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Friday, October 9, 2020

Law of Demeter in Java and Scala

Target audience: Beginner
Estimated reading time: 3'

In this article, we shed light on a set of principles frequently bypassed by software engineers: The Law of Demeter, also known as the principle of least knowledge. This design guideline, pivotal in crafting software, especially within the object-oriented paradigm, underscores the essence of minimalism. It posits that an object should make minimal assumptions about the structure or attributes of other entities. In essence, a module should be privy only to the information and resources imperative for its intended function.


Introduction

The Law of Demeter for methods requires that a method of an object may only invoke the methods of the following kinds of objects:
 1  The object itself: this
 2  Variables or objects whose scope is the class (attributes or member variables)
 3  The method parameters (or arguments)
 4  Variables or objects local to the method
 5  A global variable or object accessible by the object

The advantage of the Law of Demeter is that applications are easier to maintain and update because objects are less dependent on the member attributes of other objects. Such an advantage is important when using third-party libraries or frameworks. Design patterns such as Facade, Adapter or Proxy provide developers with similar benefits.

The main drawback of the Law of Demeter is the constant need to create wrappers that isolate the internal structure of other objects, which adds execution time overhead. Such wrappers, commonly used in large frameworks, rely on interfaces that delegate the actual implementation of functionality to concrete classes. Aspect-oriented programming attempts, among other things, to get around this overhead.

The Law of Demeter was very popular in the early 1990s, when C++ gained acceptance in the software engineering community.

Use case

The following Java and Scala code snippets illustrate programming idioms that comply with, and violate, the Law of Demeter. The following Java class, which implements a string concatenation, complies with the law regarding local variables, class attributes and methods.

public class StringConcatenation {
  private String _name = "";

  // Hypothetical validation helper (not shown in the original snippet)
  private boolean isValid(final String s) {
    return s != null && !s.isEmpty();
  }

  public String rightUsage(final String s) {
    // Rule 2: Use its own attribute: '_name'
    StringBuilder buf = new StringBuilder(_name);

    // Rule 1: Invoke its own method using 'this'
    if (this.isValid(s)) {
      // Rule 3: Use the method parameter: 's'
      buf.append(s);

      // Rule 4: Call a method of the local object: 'buf'
      buf.append("\n");
    }
    return buf.toString();
  }
}

The rightUsage method complies with the Law of Demeter because it refers only to its parameters and to objects, variables or methods with either class or local scope. Let's consider the following Scala trait, Dictionary, and its implementations (ScientificDictionary, ...), provided as part of a third-party library. The Translation class uses a specific dictionary (scientific, medical, ...) in a particular language (English, German, ...) to translate any document.

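// Marker (phantom) types identifying the language of a dictionary
sealed trait Language
final class English extends Language
final class Spanish extends Language

sealed trait Dictionary[L <: Language] {
    def translate(s: String): String
}

// Placeholder implementations: the actual library would perform the translation
case class ScientificDictionary[L <: Language]() extends Dictionary[L] { def translate(s: String): String = s }
case class MedicalDictionary[L <: Language]() extends Dictionary[L] { def translate(s: String): String = s }
case class SlangDictionary[L <: Language]() extends Dictionary[L] { def translate(s: String): String = s }

class Translation[L <: Language](var dictionary: Dictionary[L]) {
   def translate(s: String): String = dictionary.translate(s)
}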

The method wrongUsage below violates the Law of Demeter because it reaches through the Translation object into its dictionary. There is no guarantee that the third-party library provider will not alter or remove the reference to Dictionary from the Translation class, nor that the translate method of Dictionary will not be deprecated or removed in future releases of the library.

class StringConcatenation(_name: String) {
    // Violation: accesses 'dictionary', an internal attribute of the Translation parameter
    def wrongUsage(translation: Translation[Spanish], s: String): String = 
        translation.dictionary.translate(s"${_name}$s")
}
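For contrast, here is a minimal sketch of a compliant version: the method only talks to its parameter, the Translation instance, and lets it delegate to its dictionary. The types are redefined so the sketch is self-contained; UpperCaseDictionary is a hypothetical stand-in for a real translation, and the dictionary is made private so that callers cannot reach into it.

```scala
// Marker type for the language of a dictionary
sealed trait Language
final class Spanish extends Language

trait Dictionary[L <: Language] {
  def translate(s: String): String
}

// Hypothetical dictionary that upper-cases its input instead of translating it
case class UpperCaseDictionary[L <: Language]() extends Dictionary[L] {
  def translate(s: String): String = s.toUpperCase
}

// The dictionary is private: clients can only use Translation.translate
class Translation[L <: Language](dictionary: Dictionary[L]) {
  def translate(s: String): String = dictionary.translate(s)
}

class StringConcatenation(_name: String) {
  // Complies: invokes a method of its parameter only, not of its internals
  def rightUsage(translation: Translation[Spanish], s: String): String =
    translation.translate(s"${_name}$s")
}
```

If the library later replaces or removes the dictionary inside Translation, rightUsage is unaffected as long as Translation.translate remains.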
 

Some code analysis tools can be configured to enforce one or more Demeter rules. At a minimum, these rules should be part of the toolbox of any technical lead responsible for code reviews.




Sunday, August 30, 2020

Type Erasure, Manifest & Specialization in Scala 2.x

Target audience: Intermediate
Estimated reading time: 4'

Both Scala and Java employ type erasure to eliminate generics during compilation. This mechanism enforces type constraints only at compile time and removes the specific type information at runtime. Consequently, the JVM cannot distinguish at runtime between different parameterizations of the same generic type (e.g., List[String] and List[Int]).
In this article, we delve into two strategies in Scala that counter this limitation: Manifests and specialization for primitive types.
Notes: 
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers and imports is omitted.
  • The code associated with this article is written using Scala 2.12.4

Type erasure

Type erasure ensures that no new classes are created for parameterized types; consequently, generics incur no runtime overhead and the generated bytecode contains only ordinary classes, interfaces, and methods.
The type parameters [U <: T] are removed and replaced by their upper bound T, or by Any when no bound is specified. Primitive types used as type arguments have to be boxed and unboxed, which degrades performance.
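The effect of erasure can be observed directly. In the sketch below (object and method names are hypothetical), a list of integers and a list of strings share the same runtime class, and a type pattern on List[Int] only checks that the value is a List: the compiler emits an "unchecked" warning and the match succeeds even for a list of strings.

```scala
object ErasureDemo {
  // Both lists share the same runtime class: the type argument is erased
  def sameRuntimeClass: Boolean =
    List(1, 2).getClass == List("a", "b").getClass

  // Compiles with an "unchecked" warning: at runtime only the List part
  // of the pattern can be tested, the type argument Int is ignored
  def isListOfInt(x: Any): Boolean = x match {
    case _: List[Int] => true
    case _            => false
  }
}
```

Calling ErasureDemo.isListOfInt(List("a")) therefore returns true.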

Implicit conversion

Let's consider a class ListCompare that compares lists of parametric type U bounded by the type Item.

case class Item()
 
class ListCompare[U <: Item](
  xs: List[U]
)(implicit f: U => Ordered[U]) {

  def compare(xso: List[U]): Boolean = xso match {
    case str: List[String] => 
      if(xs.size == xso.size) 
        xs.zip(xso).exists( x=> x._1.compareTo(x._2) != 0) 
      else false
    
    case n: List[Int] => 
      if(xs.size == xso.size) 
        xs.zip(xso).exists(x => x._1 != x._2) 
      else false
   
    case _ => false
  }
}

The class requires an implicit conversion U => Ordered[U] to support the comparison of strings. The code above generates the following warning: "non-variable type argument String in type pattern List[String] is unchecked since it is eliminated by erasure". The compiler emits this warning because the type arguments of the patterns List[String] and List[Int] are erased and are therefore not available to the JVM when the pattern matching executes.

Manifest

One solution is to use a Manifest. A Manifest[T] is an opaque descriptor for type T that gives access to the erasure of the type as a Class instance. The most common use of a manifest is the creation of native arrays when the element class is not known at compile time, as illustrated in the code snippet below.

def myArray[T] = new Array[T](0)                          // does not compile
def myArray[T](implicit m: Manifest[T]) = new Array[T](0) // manifest as implicit parameter
def myArray[T: Manifest] = new Array[T](0)                // manifest as context bound

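  • The first line of code won't compile, as the compiler has no information about the runtime class of T. 
  • The second function recovers the erased type information by passing the manifest as an implicit parameter. 
  • The last function declares the manifest as a context bound, a shorthand for the implicit parameter. 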
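A runnable variant of the context-bound version, sketched under the assumption that the array length is passed as a parameter (the object ManifestDemo and its methods are hypothetical), shows that the manifest supplies the runtime class the compiler needs: myArray[Int] builds a primitive int array, and runtimeClass exposes the erasure of the type.

```scala
object ManifestDemo {
  // Context bound: the compiler passes an implicit Manifest[T],
  // which provides the runtime class needed to allocate the array
  def myArray[T: Manifest](n: Int): Array[T] = new Array[T](n)

  // The manifest exposes the erasure of T as a Class instance
  def runtimeClassOf[T](implicit m: Manifest[T]): Class[_] = m.runtimeClass
}
```

Note that runtimeClassOf[List[Int]] returns the class of List: the runtime class alone does not carry the type argument Int.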
Let's re-implement the compare method with a Manifest specified as an implicit parameter.

class ListCompare[U <: Item](
  xs: List[U]
)(implicit f: U => Ordered[U]) {

  def compare(
    xso: List[U]
   )(implicit u: Manifest[List[U]]): Boolean = {
    if( u <:< manifest[List[String]] ) 
      if( xs.size == xso.size)
        xs.zip(xso)
          .exists( x=> x._1.compareTo(x._2) != 0) 
      else false
     
    else if(u <:< manifest[List[Int]])  
      if( xs.size == xso.size) 
        xs.zip(xso).exists(x => x._1!=x._2) 
      else false
     
    else false
  }
}

The code compiles and executes as expected. It relies on the Manifest method "<:<", a type conformance test that has been deprecated since Scala 2.10. In any case, the code is far from elegant and quite difficult for an inexperienced Scala developer to grasp. There should be a better way.

Specialization

A better option is to have the compiler generate a separate class for primitive types such as Int, using the @specialized annotation. The annotation, applied to a type parameter, forces the compiler to generate bytecode for each primitive type listed as a specialized type. For instance, @specialized(Int, Double) T generates two extra, efficient classes for the primitive types Int and Double. Note that String is a reference type and therefore cannot be specialized. The original ListCompare class and its compare method are rewritten as ListComp using the annotation as follows:

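// U is specialized for the primitive Int; other types (e.g. String) use the generic class.
// The Item upper bound is dropped since Int is not a subtype of Item.
class ListComp[@specialized(Int) U](
   xs: List[U]
)(implicit f: U => Ordered[U]) {
   
  def compare(xso: List[U]): Boolean =
    if(xs.size == xso.size) 
      xs.zip(xso)
        .exists( x => x._1.compareTo(x._2) != 0)   
    else 
      false
}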

The code above compiles without any warning or error. However, there is no such thing as a free lunch: the compiler generates extra bytecode for the classes and methods associated with the specialized primitive types. The objective in this case is to trade higher memory consumption (larger bytecode) for a performance improvement (no boxing).
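The mechanism is easier to see on a minimal class. In this sketch, the class Box is hypothetical: with Scala 2.x, the annotation asks scalac to emit, next to the generic Box, a subclass dedicated to Int that stores the value as a primitive int instead of a boxed java.lang.Integer.

```scala
// Specialized for Int only: Box[Int] uses a generated subclass with a primitive field,
// while any other type argument (e.g. String) uses the generic, boxing class
class Box[@specialized(Int) T](val value: T) {
  // In the Int variant, this comparison operates on primitive ints
  def isSameAs(other: T): Boolean = value == other
}
```

With Scala 2.12, new Box[Int](3).getClass.getName should return the name of the generated subclass (ending in $mcI$sp), whereas new Box("a") is an instance of the generic Box class.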

Alternative mechanisms

There are alternative, more granular options to avoid type erasure errors when compiling Scala code containing generics:
  • T: ClassTag: provides the runtime class (erasure) of T, which is enough to create an Array[T] or to select the elements of type T from a Collection[Any]. It is not as powerful as TypeTag, as it does not capture type arguments.
  • T: TypeTag: captures the full static type, including type arguments, and can therefore differentiate parameterized types such as List[String] and List[Int]. It can only be materialized for concrete types.
  • T: WeakTypeTag: like TypeTag, but can also be materialized for abstract types, such as type parameters without a tag.
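As an illustration of the ClassTag option, the following sketch (object and method names are hypothetical) selects the elements of a given type from a List[Any]. With an implicit ClassTag[T] in scope, the type pattern t: T is checked against the runtime class of T instead of being erased.

```scala
import scala.reflect.ClassTag

object ClassTagDemo {
  // The ClassTag context bound lets the pattern `t: T` test the runtime class of T
  def filterByType[T: ClassTag](xs: List[Any]): List[T] =
    xs.collect { case t: T => t }
}
```

Without the context bound, the same pattern would compile with an "unchecked" warning and match every element.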



Sunday, July 19, 2020

Setup Tableau with Amazon EMR-Spark

Target audience: Intermediate
Estimated reading time: 3'

This article describes the configuration of the Amazon EMR service and the setup of Tableau Desktop to query and visualize Spark SQL datasets and content stored on S3. The installation process can be daunting, as documentation and useful tips are spread across various sites and chat rooms.



Overview

Tableau is a popular, powerful visualization platform that leverages a large variety of data sources from files, databases, applications to frameworks such as Apache Spark. Tableau is particularly suitable to visualize the results of queries to a Spark dataset.
Tableau Desktop is a chart and query builder that relies on simple drag and drop to render the results of queries. It supports a large variety (50+) of data source connectors, from files, databases, CRMs, enterprise and cloud applications to scalable frameworks such as Apache Spark. Tableau's powerful statistical and computational capabilities help data scientists and product managers spot critical data patterns.
There are a few options to query and visualize Apache Spark datasets in Tableau:
  1. Using an ODBC driver to access Spark data frames through the Spark SQL connector
  2. Through the Hive2 Thrift server, using the Amazon EMR Hadoop Hive connector

For this post, we select the second option and describe a common use case: installing and configuring the Thrift server, loading data from S3, applying a transformation to a Spark dataset and leveraging the Parquet format.

Setup

Starting configuration

We assume that the data has been previously stored on Amazon S3. The same procedure would apply to any other storage such as HDFS, a database or a local file. The other assumption is that Apache Spark has been deployed through Amazon EMR.

Steps

1- Download Tableau Desktop

2- Download the Simba ODBC driver for MacOS for the Amazon EMR Hadoop Hive connector

3- Implement the processing of the dataset loaded from S3 in CSV format and create a Hive table backed by Parquet files. The procedure (Scala code snippet below) can also be easily implemented using Python/PySpark.
import java.io.FileNotFoundException
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Record: application-specific case class with a factory Record(row) (not shown)
// logger: application logger (not shown)

final private val dataDir = "/tmp/records_table"
final private val s3Bucket = "...."
final private val s3CSVFile = "...."

// The LOCATION path is appended when the statement is executed
final val createTable = "CREATE EXTERNAL TABLE records_table(" +
  "id varchar(64), " +
  "description varchar(512), " +
  "value float, " +
  "unit char(8)) " +
  "STORED AS PARQUET LOCATION"


def loadRecords(implicit sparkSession: SparkSession): Unit = {
  import sparkSession.implicits._
  try {
    // Load the data frame from the CSV file (with header, multi-line records)
    val recordsDF = s3CSVToDataFrame(s3CSVFile, true, s3Bucket, true)
    // Convert the data frame into a typed data set
    val recordsDS = recordsDF.map(Record(_))

    // Generate and store the data in the Parquet columnar format
    recordsDS.write.mode(SaveMode.Overwrite).parquet(dataDir)

    // Create the Hive table pre-populated from the Parquet files stored on HDFS
    sparkSession.sql("DROP TABLE IF EXISTS records_table")
    sparkSession.sql(s"$createTable '$dataDir'")
    logger.info(s"New table for $dataDir was created")

    // Just a quick validation test
    sparkSession.sql("SELECT id, value FROM records_table")
                .show()
    sparkSession.close()
  }
  catch {
    case e: Exception => 
      logger.error(s"Failed to create Hive table ${e.toString}")
  }
}


@throws(clazz = classOf[IllegalStateException])
def s3CSVToDataFrame(
     s3CSVInputFile: String, 
     header: Boolean,
     s3Bucket: String,
     isMultiLine: Boolean
)(implicit sparkSession: SparkSession): DataFrame = {
  // Initialize the S3 access configuration for Hadoop
  val accessConfig = sparkSession.sparkContext.hadoopConfiguration

  try {
    accessConfig.set("fs.s3a.access.key", "xxxxxx")
    accessConfig.set("fs.s3a.secret.key", "xxxxxxx")

    // Read the content of the CSV file from S3 to generate a data frame
    sparkSession
          .read
          .format("csv")
          .option("header", header)
          .option("delimiter", ",")
          .option("multiLine", isMultiLine)
          .load(s"s3a://$s3Bucket/$s3CSVInputFile")
  } catch {
     case e: FileNotFoundException =>  
         throw new IllegalStateException(e.getMessage)
     case e: SparkException =>  
         throw new IllegalStateException(e.getMessage)
     case e: Exception => 
         throw new IllegalStateException(e.getMessage)
  }
}
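Assembling the table DDL by string concatenation makes it easy to drop a parenthesis or a separator. One possible alternative, sketched here as an assumption rather than part of the original procedure, is to generate the statement from a list of (column name, SQL type) pairs. The object HiveDDL and its method are hypothetical.

```scala
object HiveDDL {
  // Builds a CREATE EXTERNAL TABLE statement for a Parquet-backed table
  // from (column name, SQL type) pairs and the location of the Parquet files
  def createExternalParquetTable(
      table: String,
      columns: Seq[(String, String)],
      location: String): String = {
    val cols = columns.map { case (name, sqlType) => s"$name $sqlType" }.mkString(", ")
    s"CREATE EXTERNAL TABLE $table($cols) STORED AS PARQUET LOCATION '$location'"
  }
}
```

The generated string can then be passed to sparkSession.sql in place of the concatenated createTable statement.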
    


4- Log in to the target EMR instance and upload the jar file for execution

5- Add or edit a few Spark configuration parameters:

  • spark.sql.warehouse.dir=/hive/spark-warehouse 
  • spark.hadoop.hive.metastore.warehouse.dir=/hive/spark-warehouse 
  • spark.sql.hive.server2.thrift.port=10000 
  • spark.sql.hive.thriftServer.singleSession=true

6- Execute the code to generate the Hive table from the Spark dataset  

7- Set up/edit the Hive configuration file /usr/lib/spark/conf/hive-site.xml as a superuser (sudo). 

 

Note: The default Derby embedded driver is used for convenience, but it can easily be replaced by a MySQL or PostgreSQL driver by updating the javax.jdo.option.ConnectionURL and javax.jdo.option.ConnectionDriverName values. 

The default port of the Thrift server, 10000 (hive.server2.thrift.port), may have to be changed to avoid conflicts with other services.
<configuration>
  <property>
    <name>hive.metastore.connect.retries</name>
    <value>10</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby:;databaseName=metastore_db;create=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.apache.derby.jdbc.EmbeddedDriver</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>~/hive/warehouse</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>
  <property>
    <name>hive.server2.thrift.client.user</name>
    <value>root</value>
  </property>
  <property>
    <name>hive.server2.thrift.client.password</name>
    <value>xxxxxx</value>
  </property>
  <property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
  </property>
  <property>
    <name>hive.security.authorization.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>xxxxx</value>
  </property>
  <property>
    <name>hive.exec.local.scratchdir</name>
    <value>~/tmp/hive</value>
  </property>
</configuration>

Note: The configuration variables defined in the Spark configuration file override some of these entries. 

  • Remove potential locks in the metastore (locked access to the store generates an error when accessing and reading the table): rm -r metastore/*.lck
  • Stop the Hive2 Thrift server: sudo /usr/lib/spark/sbin/stop-thriftserver.sh 
  • Optionally, kill the two processes related to the Thrift server: ps -ef | grep RunJar, then sudo kill -9 {processId} 
  • Restart the Thrift server: sudo /usr/lib/spark/sbin/start-thriftserver.sh --master local 
  • Verify that the Parquet data is correctly stored on HDFS: hdfs dfs -ls /tmp/metrics/ 
  • Verify that the table is created and populated on the EMR instance: hive => show tables
  • Launch Tableau Desktop
  • Select the Amazon EMR-Hadoop connector
  • Configure the connection through the UI (see attached snapshot): 1) enter the public DNS URL of the EMR master instance, 2) select authentication = user name, 3) enter user name = hadoop, 4) select SSL required.

