Wednesday, November 21, 2018

Integrate 3rd Party Services to Spark

Target audience: Beginner
Estimated reading time: 10'

Apache Spark is a go-to framework for distributing computational tasks at scale. At times, these tasks need to interact with external or third-party services, in domains such as natural language processing, image classification, reporting, or data cleansing.

In this article, we delve into the intricacies of integrating third-party microservices hosted on AWS into the Apache Spark framework.



Introduction

Apache Spark is a commonly used framework to distribute large-scale computational tasks. Some of these tasks may involve accessing external or 3rd party remote services such as natural language processing, image classification, reporting, or data cleansing.
These remote services or microservices are commonly accessed through a REST API and are deployable as clusters of nodes (or instances) to improve scalability and high availability.
Load balancing solutions have been used to address scalability challenges since the early days of the internet.

Is there an alternative to load balancers for scaling remote web services?
We compare two approaches to connect Spark workers to 3rd party service nodes:
  • Deploy a load balancer between the Spark executors and the remote service
  • Apply hash partitioning, in which the IP of each service instance is assigned to a given Spark partition.
Note: This post assumes the reader is somewhat familiar with load balancers and has a rudimentary knowledge of the Apache Spark framework.

Load balancers

Load balancers are commonly used to route requests to web services or microservices according to a predefined policy such as CPU load, average processing time, or downtime. They gained acceptance in the late 1990s with the explosion of the internet and web sites.
A load balancer is a simple solution for deploying microservices at scale: it is self-contained and does not require any significant architecture or coding changes to the underlying application (business/logic or data layers).

In a typical Apache Spark deployment, the Spark context splits data sets into partitions. Each partition pre-processes data to generate the request to the service, then initiates the connection to the service cluster through the load balancer.
Deployment using Apache Spark with load balanced services


The data processing steps are (a minimal sketch follows the list):
  1. The master splits the input data over a given set of partitions
  2. Worker nodes pre-process and cleanse the data if necessary
  3. Requests are dynamically generated
  4. Each partition establishes and manages the connection to the load balancer
  5. Finally, worker nodes process the response and payload
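The sketch below illustrates this flow under a few assumptions: requests are plain strings, the service exposes a simple HTTP GET endpoint, and the load balancer address (loadBalancerUrl) is hypothetical. It is a minimal illustration, not a production-ready client.

import java.net.{HttpURLConnection, URL}
import scala.io.Source
import org.apache.spark.sql.Dataset

// Minimal sketch: every partition sends its requests to the same load balancer
// address, which in turn routes them to one of the service nodes.
def processThroughLoadBalancer(
    inputs: Dataset[String],       // pre-processed requests (assumption)
    loadBalancerUrl: String        // e.g. "http://lb.internal/endpoint" (hypothetical)
): Dataset[String] = {
  import inputs.sparkSession.implicits._

  inputs.mapPartitions { iter =>
    iter.map { request =>
      // One HTTP GET per request; a real client would POST the payload,
      // reuse connections and handle time-outs, retries and failures.
      val connection = new URL(s"$loadBalancerUrl?payload=$request")
        .openConnection
        .asInstanceOf[HttpURLConnection]
      try Source.fromInputStream(connection.getInputStream).mkString
      finally connection.disconnect()
    }
  }
}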
Load balancers provide an array of features, such as throttling, persistent sessions, or stateful packet inspection, that may not be needed in a Spark environment. Moreover, the load balancing scheme is at odds with a core concept of big data: data partitioning.

Let's consider an alternative approach: assigning (mapping) one or two service nodes in the cluster to each partition.


Partitioning service nodes

The first step is to select a scheme to assign a given set of service nodes, identified by their IP addresses, to each partition. Spark supports several mechanisms to distribute functions across partitions:
  • Range partitioning
  • Hash partitioning
  • Custom partitioning
In this study, we use a simple partitioning algorithm that hashes the set of IP addresses of the service nodes, as illustrated in the following block diagram (a generic example of Spark's built-in hash partitioning is sketched after the diagram).
Deployment using Apache Spark and hashed IP partitioning
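For reference, the sketch below shows Spark's built-in HashPartitioner applied to a pair RDD. It is a generic, hypothetical illustration of hash partitioning (the IP addresses are made up), not the IP-hashing scheme described in this post.

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .appName("hash-partitioning-demo")
  .master("local[4]")
  .getOrCreate()

// Illustrative key-value records keyed by a (fictitious) service node IP
val records = sparkSession.sparkContext.parallelize(
  Seq(("10.0.0.1", "req-1"), ("10.0.0.2", "req-2"), ("10.0.0.3", "req-3"))
)

// Redistribute the records over 4 partitions according to the hash of the key
val hashedRecords = records.partitionBy(new HashPartitioner(4))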


The data pipeline is similar to the one used with a load balancer (a simplified sketch follows the list):
  1. The master splits the input data over a given set of partitions
  2. The IP addresses of all service nodes are hashed and assigned to each partition
  3. Worker nodes pre-process and cleanse the data if necessary
  4. Requests to the service are dynamically generated
  5. Each partition establishes and manages the connection to a subset of service nodes
  6. Finally, worker nodes process the response and payload
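Before looking at the element-level hashing used in this post, the following simplified, hypothetical sketch illustrates steps 2 and 5: each partition derives its target service node from its partition id, so every partition talks to a fixed subset of nodes. The request format and the helper name are assumptions.

import org.apache.spark.TaskContext
import org.apache.spark.sql.Dataset

// Hypothetical binding of service nodes to Spark partitions through the partition id
def bindNodesToPartitions(
    inputs: Dataset[String],        // pre-processed requests (assumption)
    serverAddresses: Seq[String]
): Dataset[String] = {
  import inputs.sparkSession.implicits._

  inputs.mapPartitions { iter =>
    // Each partition deterministically selects one service node
    val partitionId = TaskContext.get.partitionId
    val targetIp = serverAddresses(partitionId % serverAddresses.length)
    // Only the target URL is generated here; the HTTP exchange itself is omitted
    iter.map(request => s"http://$targetIp:80/endpoint?payload=$request")
  }
}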
The implementation of the hashing algorithm in each partition is quite simple: a hash code derived from the input element is used as the seed for the random selection of the service node. The last step consists of building the request, establishing the connection to the target service node, and processing the response.

def hashedEndPoints(input: Input, timeout: Int, serverAddresses: Seq[String]): Output = {
    // Derive a positive seed from the input element and the current time
    val hashedCode = input.hashCode + System.currentTimeMillis
    val seed = if (hashedCode < 0) -hashedCode else hashedCode

    // Randomly select one of the service nodes
    val random = new scala.util.Random(seed)
    val serverHash = random.nextInt(serverAddresses.length)
    val targetIp = serverAddresses(serverHash)

    // Build the request and connect to the selected service node
    val url = s"http://${targetIp}:80/endpoint"
    val httpConnector = HttpConnector(url, timeout)
    // Execute the request and return a response of type Output
}

The function, hashedEndPoints, executed within each partition, is invoked from the master:

def process(
    inputs: Dataset[Input],
    timeout: Int,
    serverAddresses: Seq[String]
)(implicit sparkSession: SparkSession): Dataset[Output] = {
    import sparkSession.implicits._

    if (serverAddresses.isEmpty)
        throw new IllegalStateException("No service node addresses provided")

    // Each partition invokes the service nodes selected by the hashing scheme
    inputs.map(input => hashedEndPoints(input, timeout, serverAddresses))
}
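For completeness, a hypothetical invocation could look as follows. The Input and Output case classes, the IP addresses and the time-out value are stand-ins for illustration; HttpConnector, used by hashedEndPoints, is assumed to be defined elsewhere.

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical stand-ins for the Input and Output types used above
case class Input(text: String)
case class Output(result: String)

implicit val sparkSession: SparkSession = SparkSession.builder
  .appName("hashed-service-endpoints")
  .master("local[4]")
  .getOrCreate()
import sparkSession.implicits._

val inputs: Dataset[Input] = Seq(Input("first document"), Input("second document")).toDS()
val serverAddresses = Seq("10.0.0.1", "10.0.0.2", "10.0.0.3")

// Each partition hashes its inputs to select the target service nodes
val outputs: Dataset[Output] = process(inputs, 5000, serverAddresses)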

Besides ensuring consistency and avoiding an additional component (the load balancer) in the architecture, the direct assignment of service nodes to Spark partitions has some performance benefits.


Performance evaluation

Let's evaluate the latency for processing a corpus of text through an NLP algorithm deployed over a variable number of worker nodes. The following chart plots the average latency for a given NLP engine to process documents, as a function of the number of nodes.


Hash partitioning does not improve performance significantly for a small number of nodes. However, it outperforms the traditional load balancing deployment by as much as 20% for a larger number of nodes (or instances).

---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and is the author of "Scala for Machine Learning", Packt Publishing, ISBN 978-1-78712-238-3.

Wednesday, October 31, 2018

K-Means Clustering in Java: Classification

Target audience: Intermediate
Estimated reading time: 6'

This article serves as the continuation of our journey into the Java-based implementation of K-means clustering. Building upon the components outlined in the preceding post, we now focus on constructing a classifier.


Overview

The basic components of the implementation of the K-means clustering algorithm were introduced in the previous post, K-means clustering in Java: Components.
This section describes the implementation of the training and inference tasks for the model:
  • training: executed off-line during the analysis of historical data
  • classification: executed at run-time to classify new observations
Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, or imports is omitted.


Training

The learning method, train, implements the clustering algorithm. It iterates to minimize the sum of distances between the data points of each cluster and its centroid.
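In other words, train minimizes the within-cluster sum of squared distances, the standard K-means objective:

\[ J = \sum_{k=1}^{K} \sum_{x_i \in C_k} \lVert x_i - \mu_k \rVert^2 \]

where \( \mu_k \) denotes the centroid of cluster \( C_k \) and K the number of clusters.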
For each iteration (or epoch), the train method:
  1. assigns observations to each cluster
  2. computes the centroid for each cluster, computeCentroid
  3. computes the total distance of all the observations to their respective centroids, computeTotalDistance
  4. estimates the closest cluster for each observation
  5. re-assigns the observation, updateCentroids
public int train() {
   int numIters = _maxIterations, k = 0;
   boolean inProgress = true;
   
   initialize();  
   while(inProgress) {
      for(KmeansCluster cluster : _clusters ) {
          cluster.attach(_obsList[k]);
          if( ++k >= _obsList.length) {
              inProgress = false;
              break;
          }
      }
   }
               
   computeTotalDistance();
   for(KmeansCluster cluster : _clusters ) {
        cluster.computeCentroid();
   }           
   computeTotalDistance();
  
   List<Observation> obsList = null; 
   KmeansCluster closestCluster = null;

    // main iterative method, that traverses all the clusters
    // computes the distance of observations relative to their centroid
    // and re-assign the observations.
   for(int i = 0; i < _maxIterations; i++) {

      for(KmeansCluster cluster : _clusters ) { 
          obsList = new ArrayList<Observation>();
          for( Observation point : cluster.getDataPointsList()) {
             obsList.add(point);
          }
   
          for( Observation point : obsList) {
             double minDistance = Double.MAX_VALUE, distance = 0.0;
             closestCluster = null;
     
             for(KmeansCluster cursor : _clusters ) {
                 distance =  point.computeDistance(cursor.getCentroid());
                 if( minDistance >  distance) {
                    minDistance = distance;
                    closestCluster = cursor;
                 }
             }
             updateCentroids(point, cluster, closestCluster);
          }
      }

     // Simple convergence criteria              
     if( _convergeCounter >= _minNumConvergeIters ) {
        numIters = i;
        break;
     }
 
   }
   return numIters;
}


Classification

The classification of a new observation is simple: it consists of evaluating the distance between the new data point and each centroid, and selecting the cluster with the shortest distance. The classify method extracts the index (or label) of the cluster that is closest to the new observation.

public int classify(double[] obs) {
   double bestScore = Double.MAX_VALUE, distance = 0.0;
   int clusterId = -1;
       
   for(int k = 0; k < _centroids.length; k++) {
       distance = _centroids[k].computeDistance(obs);
       if(distance < bestScore) {
           bestScore = distance;
           clusterId = k;
       }
   }
   return clusterId;
}

The code snippet below implements some of the supporting methods to:
  • compute the loss function value (total distance)
  • initialize the centroid for each cluster
  • update the values of the centroids


private void computeTotalDistance() {
   float totalDistance = 0.0F;
     
   for(KmeansCluster cluster : _clusters ) {
        totalDistance += cluster.getSumDistances();
    }
  
   double error = Math.abs(_totalDistance - totalDistance);
   _convergeCounter = (error < _convergeCriteria) ? _convergeCounter + 1 : 0;
   _totalDistance = totalDistance;
}


// Initialize the cluster centroids at regular intervals across the range defined by getParameters()
private void initialize() {
    double[] params = getParameters();
    int numVariables = params.length >> 1;
      
    double[] range = new double[numVariables];
    for( int k = 0, j = numVariables; k <numVariables; k++, j++ ) {
        range[k] = params[k] - params[j];
    }
        
    double[] x = new double[numVariables];
    int sz_1 = _clusters.length+1,  m = 1;
      
    for(KmeansCluster cluster : _clusters) {
        for( int k = 0, j = numVariables; k <numVariables; k++, j++ ) {
           x[k] = ((range[k]/sz_1)*m) + params[j];
        }
        cluster.setCentroid(x);
        m++;
    }
}
 


private void updateCentroids(Observation point,
                             KmeansCluster cluster,
                             KmeansCluster bestCluster) {
   boolean update = bestCluster != null && bestCluster != cluster; 

   if( update ) {
       bestCluster.attach(point);
       cluster.detach(point);
       for(KmeansCluster cursor : _clusters ) {
          cursor.computeCentroid();
       }
       computeTotalDistance();
   }
}


Thank you for reading this article.


References

  • The Elements of Statistical Learning - T. Hastie, R. Tibshirani, J. Friedman - Springer 2001
  • Machine Learning: A Probabilistic Perspective, 11.4.2.5 K-means algorithm - K. Murphy - MIT Press 2012
  • Pattern Recognition and Machine Learning, Chap 9 "Mixture Models and EM: K-means Clustering" - C. Bishop - Springer Science 2006
  • github.com/patnicolas