Sunday, July 19, 2020

Setup Tableau with Amazon EMR-Spark

Target audience: Intermediate
Estimated reading time: 3'

This article describes the configuration of the Amazon EMR service and the setup of Tableau Desktop to query and visualize Spark SQL datasets and content stored on S3. The installation process can be daunting, as documentation and useful tips are spread across various sites and chat rooms.



Overview

Tableau is a popular, powerful visualization platform that leverages a large variety of data sources, from files, databases and applications to frameworks such as Apache Spark. It is particularly well suited to visualizing the results of queries against a Spark dataset.
Tableau Desktop is a chart/query builder that relies on simple drag and drop to render the results of a query. It supports a very large variety (50+) of data source connectors, from files, databases, CRMs, and enterprise and cloud applications to scalable frameworks such as Apache Spark. Tableau's powerful statistical and computational capabilities help data scientists and product managers spot critical data patterns.
There are a few options to query and visualize Apache Spark datasets in Tableau:
  1. Using an ODBC driver to access Spark data-frame using the Spark SQL connector
  2. Through Hive2 thrift server using the Amazon EMR Hadoop Hive connector

For this post we select the second option and describe a common use case: installation and configuration of the Thrift server, loading data from S3, transformations applied to a Spark dataset, and leveraging the Parquet format.

Setup

Starting configuration

We assume that the data has been previously stored on Amazon S3. The same procedure applies to any other storage such as HDFS, a database or a local file. The other assumption is that Apache Spark has been deployed through an Amazon EMR cluster.

Steps

1- Download Tableau Desktop.

2- Download the Simba ODBC driver for macOS for the Amazon EMR Hadoop Hive connector (Driver Download).

3- Implement the processing of the dataset loaded from S3 in CSV format and the Hive table using Parquet. The procedure (Scala code snippet below) can also be easily implemented using Python/PySpark.
    import java.io.FileNotFoundException
    import org.apache.spark.SparkException
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    
    final private val dataDir = "/tmp/records_table"
    final private val s3Bucket = "...."
    final private val s3CSVFile = "...."
    
    final val createTable = "CREATE EXTERNAL TABLE records_table(" +
     "id varchar(64), " +
     "description varchar(512), " +
     "value float, " +
     "unit char(8)) " +
     "STORED AS PARQUET LOCATION"
    
    
    def loadRecords(implicit sparkSession: SparkSession): Unit = {
      import sparkSession.implicits._
      try {
        // Load the data frame from the CSV file stored on S3
        val recordsDF = s3CSVToDataFrame(s3CSVFile, s3Bucket, header = true, isMultiLine = true)
        // Convert the data frame into a typed data set
        val recordsDS = recordsDF.map(Record(_))
    
        // Generate and store the data in the Parquet columnar format
        recordsDS.write.mode(SaveMode.Overwrite).parquet(dataDir)
    
        // Create the HIVE table pre-populated from the Parquet structure stored on HDFS
        sparkSession.sql("DROP TABLE IF EXISTS records_table")
        sparkSession.sql(s"${createTable} '$dataDir'")
        logger.info(s"New table for $dataDir was created")
    
        // Just a quick validation test
        sparkSession.sql("SELECT id, value FROM records_table")
                    .show
        sparkSession.close
      }
      catch {
        case e: Exception =>
          logger.error(s"Failed to create HIVE table ${e.toString}")
      }
    }
    
    
    @throws(classOf[IllegalStateException])
    def s3CSVToDataFrame(
         s3CSVInputFile: String,
         s3Bucket: String,
         header: Boolean,
         isMultiLine: Boolean
    )(implicit sparkSession: SparkSession): DataFrame = {
     // Initialize the S3 access configuration for Hadoop
     val accessConfig = sparkSession.sparkContext.hadoopConfiguration
    
     try {
       accessConfig.set("fs.s3a.access.key", "xxxxxx")
       accessConfig.set("fs.s3a.secret.key", "xxxxxxx")
       
       val headerStr = if (header) "true" else "false"
       // Read the content of the CSV file from S3 to generate a data frame
       sparkSession
             .read
             .format("csv")
             .option("header", headerStr)
             .option("delimiter", ",")
             .option("multiLine", isMultiLine)
             .load(path = s"s3a://${s3Bucket}/${s3CSVInputFile}")
      } catch {
         case e: FileNotFoundException =>
             throw new IllegalStateException(e.getMessage)
         case e: SparkException =>
             throw new IllegalStateException(e.getMessage)
         case e: Exception =>
             throw new IllegalStateException(e.getMessage)
      }
    }
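The snippet above also assumes a Record case class and a logger that are not shown. Below is a minimal sketch of these missing pieces, consistent with the schema of records_table; the names, the logging library and the column order are assumptions, not the author's original code.

    import org.apache.spark.sql.Row
    import org.slf4j.{Logger, LoggerFactory}
    
    // Hypothetical logger assumed by loadRecords
    val logger: Logger = LoggerFactory.getLogger("RecordsLoader")
    
    // Hypothetical typed record matching the columns of records_table
    case class Record(id: String, description: String, value: Float, unit: String)
    
    object Record {
      // Build a Record from a raw CSV row (CSV columns are read as strings by default)
      def apply(row: Row): Record = Record(
        row.getString(0),            // id
        row.getString(1),            // description
        row.getString(2).toFloat,    // value
        row.getString(3)             // unit
      )
    }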
    


4- Log into the target EMR instance and upload the jar file for execution

5- Add or edit a few Spark configuration parameters (see the sketch after the list below):

  • spark.sql.warehouse.dir=/hive/spark-warehouse 
  • spark.hadoop.hive.metastore.warehouse.dir=/hive/spark-warehouse 
  • spark.sql.hive.server2.thrift.port=10000 
  • spark.sql.hive.thriftServer.singleSession=true
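
These parameters are typically added to /usr/lib/spark/conf/spark-defaults.conf on the EMR master. As an alternative, here is a minimal Scala sketch, mirroring the values listed above, that applies the same settings when a Spark session is created programmatically (the application name is arbitrary):

    import org.apache.spark.sql.SparkSession
    
    // Minimal sketch: same settings as listed above, applied at session creation
    val spark = SparkSession.builder
      .appName("EMR-Spark-Thrift")
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "/hive/spark-warehouse")
      .config("spark.hadoop.hive.metastore.warehouse.dir", "/hive/spark-warehouse")
      .config("spark.sql.hive.server2.thrift.port", "10000")
      .config("spark.sql.hive.thriftServer.singleSession", "true")
      .getOrCreate()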

6- Execute the code to generate the Hive table from the Spark dataset.

7- Set up/edit the HIVE configuration file /usr/lib/spark/conf/hive-site.xml as a super user (sudo).

 

Note: The default Derby embedded driver is used for convenience but can easily be replaced by a MySQL or PostgreSQL driver by updating the javax.jdo.option.ConnectionURL and javax.jdo.option.ConnectionDriverName values (a sketch of the MySQL variant follows the configuration listing below).

The default port for the Thrift server, 10000 (hive.server2.thrift.port), may have to be changed to avoid conflicts with other services.
    <configuration>
      <property>
        <name>hive.metastore.connect.retries</name>
        <value>10</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:derby:;databaseName=metastore_db;create=true</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.apache.derby.jdbc.EmbeddedDriver</value>
      </property>
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>~/hive/warehouse</value>
      </property>
      <property>
        <name>hive.server2.authentication</name>
        <value>NONE</value>
      </property>
      <property>
        <name>hive.server2.thrift.client.user</name>
        <value>root</value>
      </property>
      <property>
        <name>hive.server2.thrift.client.password</name>
        <value>xxxxxx</value>
      </property>
      <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
      </property>
      <property>
        <name>hive.security.authorization.enabled</name>
        <value>true</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>xxxxx</value>
      </property>
      <property>
        <name>hive.exec.local.scratchdir</name>
        <value>~/tmp/hive</value>
      </property>
    </configuration>
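
As a sketch of the MySQL variant mentioned in the note above, only two properties need to change; the host, port and database name below are placeholders to adapt:

    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://your-metastore-host:3306/metastore?createDatabaseIfNotExist=true</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
    </property>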

    Note: The configuration variables defined in the Spark configuration file override some of these entries.

    • Remove potential locks in the metastore: rm -r metastore/*.lck (locked access to the store would generate an error when accessing and reading the table)
    • Stop the Hive2 Thrift server: sudo /usr/lib/spark/sbin/stop-thriftserver.sh
    • Optionally kill the two processes related to the Thrift server: ps -ef | grep RunJar then sudo kill -9 {processId}
    • Restart the Thrift server: sudo /usr/lib/spark/sbin/start-thriftserver.sh --master local
    • Verify that the Parquet data is correctly stored on HDFS: hdfs dfs -ls /tmp/metrics/
    • Verify that the table is created and populated in the EMR instance: hive=> show tables
    • Launch Tableau Desktop
    • Select the Amazon EMR-Hadoop connector
    • Configure the connection through the UI (see the attached snapshot): 1) enter the public DNS URL of the EMR master instance, 2) select authentication = username, 3) enter username = hadoop, 4) select SSL required.


Thank you for reading this article. For more information ...


    References


    ---------------------------
    Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
    He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

    Sunday, April 26, 2020

    Handling Scala 2.x Option Elegantly

    Target audience: Beginner
    Estimated reading time: 3'

    This post reviews the different mechanisms available in Scala to handle errors and illustrates the applicability of the Option monad.


    The Option monad is a great tool for handling errors in Scala: developers do not have to worry about NullPointerException or handling a typed exception as in Java and C++.

    Notes:
    • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted 
    • The code associated with this article is written using Scala 2.12.4

    Use case

    Let's consider the simple square root computation, which throws an exception if the input value is strictly negative (line 2). The most common, "Java-like" approach is to wrap the computation in a try-catch construct. In Scala, catching exceptions can be implemented through the Try monad (lines 7-9).

     1  def sqrt(x: Double): Double = 
     2      if(x < 0.0) 
     3         throw MathException(s"sqrt: Incorrect argument $x")
     4      else 
     5         Math.sqrt(x)
     6  
     7  Try ( sqrt(a)) match {
     8      case Success(x) => {}
     9      case Failure(e) => Console.println(e.toString)
    10  }
    

    This type of implementation puts the burden on the client code to handle the exception. The Option monad provides developers an elegant way to control the computation flow.

    Handling Option values

    The most common way to handle a Scala Option is to unwrap it. Let's consider the function y = sin(sqrt(x)). Clearly, there is no need to compute y if x is negative.
    def sqrt(x: Double): Option[Double] = {
        if(x < 0.0) None
        else Some(Math.sqrt(x))
    }
     
     
    def y(x: Double): Option[Double] = sqrt(x) match {
        // Propagate the unwrapped square root value to sin
        case Some(sqrtX) => Some(Math.sin(sqrtX))
        case None => None
    }
    

    The computation of the square root is implemented by the method sqrt while the final computation of sin(sqrt(x)) is defined by the method y.

    This implementation is quite cumbersome because the client code has to process an extra Option. An alternative is to provide a default value (i.e., 0.0) if the first computational step fails.

     
    def y(x: Double): Double = Math.sin(sqrt(x).getOrElse(0.0))
    

    A more functional and elegant approach uses the map higher-order function to propagate the value of the Option.

     
    def y(x: Double): Double =  sqrt(x).map(Math.sin(_)).getOrElse(0.0)
    

    What about a sequence of nested options? Let's consider the function y = 1/sqrt(x). There are two types of errors:
    • x < 0.0 for sqrt
    • x == 0.0 for 1/x
    Another solution consists of applying the single test x > 0.0 to meet the two conditions at once.

     
    def y(x: Double): Option[Double] = 
        if(x < 1e-30) None else Some(1.0/Math.sqrt(x))
    


    For comprehension for Options

    However, anticipating the multiple complex conditions on the argument is not always possible. The for comprehension is an elegant approach to handle a sequence of options.

     1  def inv(x: Double): Option[Double] = {
     2       if(Math.abs(x) < 1e-30) None
     3       else Some(1.0/x)
     4  }
     5  
     6  def log(x: Double): Option[Double] = {
     7       if(x < 1e-30) None
     8       else Some(Math.log(x))
     9  }
    10  
    11  def compose(x: Double): Double =
    12     (for {
    13         y <- sqrt(x)
    14         z <- inv(y)
    15         t <- log(z)
    16      } yield t).getOrElse(0.0)
    

    The objective is to compose the computation of the square root with the inverse function inv (line 1) and the natural logarithm log (line 6). The for comprehension construct (lines 11-15) propagates the result of each function to the next one in the pipeline through the automatic unwrapping of each option into its value. In case of an error (None), the for construct exits before completion.
    A for comprehension is syntactic sugar that composes (cascades) multiple flatMap invocations with a final map method.
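
    To make this concrete, here is a hand-written equivalent of compose using explicit flatMap and map calls; a minimal sketch assuming the sqrt, inv and log methods defined above:

    def composeDesugared(x: Double): Double =
       sqrt(x).flatMap(y => 
           inv(y).flatMap(z => 
               log(z).map(t => t)))
         .getOrElse(0.0)

    A None returned at any step short-circuits the remaining calls, exactly as in the for comprehension.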

    Thank you for reading this article. For more information ...

    References



    Sunday, February 23, 2020

    Implement Kernel Density Estimator in Spark

    Target audience: Intermediate
    Estimated reading time: 4'

    This article introduces a very powerful, non-parametric method to extract an empirical continuous probability density function from a dataset: Multivariate Kernel Density Estimation (KDE), also known as Parzen's window. At its core, KDE is a smooth approximation of a histogram [ref 1].


    Notes
    • This article requires a basic knowledge of the Apache Spark MLlib framework and an understanding of statistics and/or machine learning.
    • This implementation relies on Spark 2.3.1 and Scala 2.12.4

    Introduction

    KDE is one of the most well-known approaches to estimating the underlying probability density function of a dataset. KDE learns the shape of the density from the data automatically. This flexibility, arising from its non-parametric nature, makes KDE a very popular approach for data drawn from a complicated distribution.

    The KDE algorithm takes a parameter, the bandwidth, that affects how “smooth” the resulting curve is. For a set of observations y, a kernel function K and a bandwidth h, the estimation of the density function f can be expressed as follows.
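    For reference, the kernel density estimator is commonly written in the following standard form, for n observations y_i, a kernel K and a bandwidth h:

        \hat{f}_h(y) = \frac{1}{n\,h} \sum_{i=1}^{n} K\!\left(\frac{y - y_i}{h}\right)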


    This post addresses the limitations of the current implementation of KDE in Apache Spark for multivariate features.

    Spark implementation

    Apache Spark is a fast and general-purpose cluster computing solution that provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
    The Apache Spark ecosystem includes a machine learning library, MLlib.

    The implementation of the kernel density estimation in the current version of the Apache Spark MLlib library, 2.3.1 
    [ref 2], org.apache.spark.mllib.stat.KernelDensity, has two important limitations:
    • It is a univariate estimation
    • The estimation is performed on a sequence of observations, not an RDD or dataset, putting the computation load on the Spark driver.
    An example of application of KDE using Apache Spark MLlib 2.3.1 ... 

    val sample = sparkSession.sparkContext.parallelize(data) 
     
    val kd = new KernelDensity().setSample(sample).setBandwidth(3.0)
     
    val densities = kd.estimate(Array(-2.0, 5.0))
    

    The method setSample specifies the training set but the KDE is actually trained when the method estimate is invoked on the driver. 

    Multivariate KDE

    The purpose of this post is to extend the current functionality of the KDE by supporting multi-dimensional features and allowing developers to apply the estimation to a dataset. This implementation is restricted to the Normal distribution, although it can easily be extended to other kernel functions. 
    We assume that:
    • The reference to the current Spark session is implicit (line 1)
    • The encoding of a row for the serialization of the task is provided (line 2)
    The method estimate has 3 arguments: 
    • trainingDS: the training dataset (line 10)
    • validationDS: the validation dataset (line 11)
    • bandwidth: the size of the Parzen window (line 12)
    The validation set has to be broadcast to each worker node (line 14). This should not be a problem, as the validation set is expected to be of reasonable size. 
    The training set is passed to each partition as an iterator through mapPartitions (line 17). The probability densities and the count are computed through a Scala aggregate method with a zero value of type (Array[Double], Long) (line 23). The sequence operator invokes the multivariate normal distribution (line 29). 
      
    The combiner (the third argument of aggregate) relies on the BLAS level 1 vectorization z <- a.x + y, daxpy (line 38). The BLAS library [ref 3] has 3 levels (1D, 2D and 3D arrays).
    The vector of densities is scaled with invCount using the dscal BLAS level 1 method (line 45).
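
    For readers less familiar with these two BLAS level 1 routines, the tiny standalone sketch below, assuming the netlib-java wrapper (com.github.fommil.netlib) bundled with Spark MLlib, shows what daxpy and dscal actually compute; the listing below presumably relies on a similar import for blas:

    import com.github.fommil.netlib.BLAS.{getInstance => blas}
    
    object BlasLevel1Demo extends App {
      val u = Array(1.0, 2.0, 3.0)      // y in y <- a.x + y
      val v = Array(10.0, 20.0, 30.0)   // x in y <- a.x + y
    
      // daxpy: u <- 1.0*v + u  => [11.0, 22.0, 33.0]
      blas.daxpy(u.length, 1.0, v, 1, u, 1)
    
      // dscal: u <- 0.5*u      => [5.5, 11.0, 16.5]
      blas.dscal(u.length, 0.5, u, 1)
    
      println(u.mkString(", "))
    }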

     1  final class KDE(implicit sparkSession: SparkSession, 
     2        encoder: Encoder[Row]) {
     3    /**
     4      * Apply the trained KDE to a set of validation data
     5      * @param trainingDS  Training data set
     6      * @param validationDS Validation data set
     7      * @return Dataset of probability densities
     8      */
     9    def estimate(
    10      trainingDS: Dataset[Obs], 
    11      validationDS: Dataset[Obs], 
    12      bandwidth: Double = 1.0): Dataset[Double] = {
    13      import math._, sparkSession.implicits._
    14      val validation_brdcast = sparkSession.sparkContext
    15              .broadcast[Array[Obs]](validationDS.collect)
    16  
    17      trainingDS.mapPartitions((iter: Iterator[Obs]) => {
    18         val seqObs = iter.toArray
    19         val scale = 0.5 * seqObs.size * log(2 * Pi)
    20         val validation = validation_brdcast.value
    21  
    22         val (densities, count) = seqObs.aggregate(
    23           (new Array[Double](validation.length), 0L) ) (
    24             {        // seqOp (U, T) => U
    25               case ((x, z), y) => {
    26                  var i = 0
    27                  while (i < validation.length) {
    28                     // Call the pdf function for the normal distribution
    29                     x(i) += multiNorm(y, bandwidth, scale, validation(i))
    30                     i += 1
    31                  }
    32                  (x, z + 1)   // Update the densities and the count
    33               }
    34            },
    35            {         // combOp: (U, U) => U
    36               case ((u, z), (v, t)) => { 
    37                  // Combiner invokes the BLAS vectorization z <- a.x + y
    38                  blas.daxpy(validation.length, 1.0, v, 1, u, 1)
    39                  (u, z + t)
    40               }
    41            }
    42        )
    43  
    44        val invCount: Double = 1.0 / count
    45        blas.dscal(validation.length, invCount, densities, 1)
    46            // Rescale the densities using BLAS z <- a.x
    47        densities.iterator
    48      })
    49    }
    50  }
    

    The companion singleton is used to define the multivariate normal distribution (line 5). The type of the observations (features) is Array[Double].

     1  final object KDE {
     2      import math._
     3      type Obs = Array[Double]
     4      @throws(classOf[IllegalArgumentException])
     5      def multiNorm(
     6           means: Obs, 
     7           bandWidth: Double, 
     8           scale: Double, 
     9           x: Obs): Double = {
    10        require(x.length == means.length, 
    11             "Dimension of means and observations differs")
    12        exp(
    13           -scale - (0 until means.length).map(
    14              n => {
    15                 val sx = (means(n) - x(n)) / bandWidth
    16                 0.5 * sx * sx
    17              }
    18           ).sum
    19        )
    20      }
    21  }
    

    Application

    This simple application requires the Spark context (SparkSession) to be defined as well as an explicit encoding of Row using the Kryo serializer. The implicit conversions are made available by importing sparkSession.implicits._.
    The training set is a sequence of key-value pairs (lines 6-12). The validation set is synthetically generated by multiplying the values of the training set by 2.0 (lines 14-15).

     1  implicit val sparkSession: SparkSession =    
     2         confToSessionFromFile(LocalSparkConf)
     3  implicit val encoder = Encoders.kryo[Row]
     4  import sparkSession.implicits._
     5  
     6  val trainingData = Seq[(String, Array[Double])](
     7       ("A", Array[Double](1.0, 0.6)), ("B", Array[Double](2.0, 0.6)), 
     8       ("C", Array[Double](1.5, 9.5)), ("D", Array[Double](4.5, 0.7)), 
     9       ("E", Array[Double](0.4, 1.5)), ("F", Array[Double](2.1, 0.6)),
    10       ("G", Array[Double](0.5, 6.3)), ("H", Array[Double](1.5, 0.1)), 
    11       ("I", Array[Double](1.2, 3.0)), ("B", Array[Double](3.1, 1.1))
    12    ).toDS
    13  
    14  val validationData = trainingData
    15      .map { case (key, values) => values.map(_ * 2.0) }
    16  
    17  val kde = new KDE
    18  val result = kde.estimate(trainingData.map(_._2), validationData)
    19  
    20  println(s"result: ${result.collect.mkString(", ")}")
    21  
    22  sparkSession.close
    23  
    24  // For comparison: the univariate KDE from Spark MLlib shown earlier
    25  val data = Seq[Double](1.0, 5.6)
    26  val sample = sparkSession.sparkContext.parallelize(data)
    27  val kd = new KernelDensity().setSample(sample).setBandwidth(3.0)
    28  val densities = kd.estimate(Array(-2.0, 5.0))


    Note: There are excellent research papers highlighting the statistical foundation behind KDE as well as recent advances [ref 4].

    Thank you for reading this article. For more information ...

    References

    [3] BLAS
    Environment: Scala 2.12.4, Java JDK 1.8, Apache Spark 2.3.1, OpenBLAS 0.3.4

