Wednesday, October 10, 2018

K-Means Clustering in Java: Components

Target audience: Advanced
Estimated reading time: 5'

K-means clustering stands as a prominent unsupervised learning technique, aiming to classify unlabelled data into distinct categories. Its primary objective is to identify inherent groupings within the dataset. To achieve this, the algorithm operates in cycles, designating each data entry to a specific group based on its defining features.

This introductory piece in our series delves into the implementation of K-means' fundamental components.


Table of contents
Follow me on LinkedIn
 

Overview

Among the clustering methods have been developed over the years from Spectral clustering, Non-negative Matrix factorization, Canopy to Hierarchical and K-means clustering. The K-means algorithm is by far the easiest to implement. This simplicity comes with a high price in terms of scalability and even reliability. However, as an unsupervised learning technique, K-means is still valuable for reducing the number of model features or detecting anomalies.

The objective is to classify observations or data points by groups that share common attributes or features. The diagram below illustrates the clustering of observations (x,y) for a simple 2-feature model.



Each cluster has a mean or centroid, m = ( .. m..). First we need to define a distance between an observation  X = (...x ..) and c. The Manhattan and Euclidean distances are respectively defined as:  \[d_{M} = \sum_{i=0}^{n}\left | x_{i} - m_{i}\right |\,\,\,,\,\,d_{E}= \sum_{i=0}^{n} (x_{i} - m_{i})^{2}\] The loss function for N cluster Cj is defined by \[W(C)=\frac{1}{2}\sum_{k=0}^{N-1}\sum_{c_{i}=k}\sum_{C_{j}} d(x_{i},x_{j})\]  The goal is to find the centroid m, and clusters C, that minimize the loss function as: \[C^{*}\left (i \right ) = arg\min _{k\in [0,N-1]}d (x_{i}, m_{k})\]

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.


Distances and observations

First we need to define the distance between each observation and the centroid of a cluster. The class hierarchy related to the distance can be implemented as nested classes as there is no reason to "expose" to client code. The interface, Distance, defines the signature of the computation method. For sake of simplicity, the sample code implements only the Manhattan and Euclidean distances.  Exceptions, validation of method arguments, setter and getter methods are omitted for the sake of simplicity.

protected interface Distance {
    public double compute(double[] x, Centroid centroid);
}

    // Defintion of d(x,y) =|x-y|
protected class ManhattanDistance implements Distance {
   
   public double compute(double[] x, Centroid centroid) {
       double sum = 0.0, xx = 0.0;
       for( int k = 0; k< x.length; k++) {
           xx = x[k] - centroid.get(k);
           if( xx < 0.0) {
              xx = -xx;
           }
           sum += xx;
       }
       return sum;
    }
}

  // Definition d(x,y)= sqr(x-y) 
protected class EuclideanDistance implements Distance {
  
    public double compute(double[] x, Centroid centroid) {
        double sum = 0.0, xx = 0.0;
        for( int k = 0; k < x.length; k++) {
            xx = x[k] - centroid.get(k);
            sum += xx*xx;
        } 
        return Math.sqrt(sum);
    } 
}

Next, we define an observation (or data point) as a vector or array of floating point values, in our example.  An observation can support heterogeneous types (boolean, integer, float point,..) as long as they are normalized to [0,1]. In our example we simply normalized over the maximum values for all the observations.

public final class Observation {

    // use Euclidean distance that is shared between all the instances
   private static Distance metric = new EuclideanDistance();

   public static void setMetric(final Distance metric) {
      this.metric = metric;
   }
 
   private double[] _x  = null;
   private int  _index  = -1;

   public Observation(double[] x, int index) { 
       _x = x; 
       _index = index; 
   }
   
    // compute distance between each point and the centroid
   public double computeDistance(final Centroid centroid) {
       return metric.compute(_x, centroid);
   }

    // normalize the value of data points.
   public void normalize(double[] maxValues) {
      for( int k = 0; k < _x.length; k++) {
         _x[k] /= maxValues[k];
      }
   }
}


Clustering

Centroid for each cluster are computed iteratively to reduce the loss function.  The centroid values are computed using the mean of each feature across all the observations. The method Centroid.compute initialize the data points belonging to a cluster with the list of observations and compute the centroid values _x by normalizing with the number of points. 

protected class Centroid {
   private double[] _x = null;       
       
   protected Centroid() {}
   protected Centroid(double[] x) {
       Array.systemCopy(_x, x, 0, x.length, sizeOf(double));
   }

    // Compute the centoid values _x by normalizing with the number of points.
   protected void compute(final List<Observation> observations)  {
       double[] x = new double[_x.length];
       Arrays.fill(x, 0.0);
           
      for( Observation point : observations ) {
         for(int k =0; k < x.length; k++) {
            x[k] += point.get(k);
         }
      }
    
      int numPoints = observations.size();
      for(int k =0; k < x.length; k++) {
         _x[k] = x[k]/numPoints;
      }
   }
}

A cluster, KmeansCluster is defined by its label (_index in this example) a centroid, _centroid, the list of observations, _observations it contains and the current loss associated to the cluster (sum of the distance between all observations and the centroid).
The cluster behavior is defined by the following methods:
  • computeCentroid: compute the sum of the distance between all the point in this cluster and the centroid.
  • attach: Attach or add a new observation to this cluster
  • detach: Remove an existing observations from this cluster.

public final class KmeansCluster {
   private int       _index   = -1;
   private Centroid  _centroid  = null; 
   private double    _sumDistances  = 0.0;
   private List<observation> _observations = new ArrayList<Observation>()

   public void computeCentroid() {
      _centroid.compute( _observations );
      for( Observation point : _observations  ) {
          point.computeDistance(_centroid);
      }
      computeSumDistances();
   }

     // Attach a new observation to this cluster.
   public void attach(final Observation point) { 
      point.computeDistance(_centroid);
      _observations.add(point);
      computeSumDistances();
   }

   public void detach(final Observation point) {
      _observations.remove(point);
      computeSumDistances();
   }
           
   private void computeSumDistances() { 
      _sumDistances = 0.0;     
      for( Observation point : _observations) {
        _sumDistances += point.computeDistance(_centroid);
      }
   }
      //....
}

Finally, the clustering class implements the training and run-time classification. The train method iterates across all the clusters and for all the observations to reassign the observations to each cluster. The iterative computation ends when either the loss value converges or the maximum number of iterations is reached. 

If the algorithm use K clusters with M observations with N variables the execution time for creating the clusters is K*M*N. If the algorithm converges after T iterations then the overall execution is T*K*M*N. For instance, the K-means classification of 20K observations and data with 25 dimension, using 10 clusters, converging after 50 iterations requires  250,000,000 evaluations! The constructor create the clustering algorithm with a predefined number of cluster, K, and a set of observations.
The method getCentroids retrieves the current list of centroids (value of centroid vectors)

public final class KmeansClustering { 
   private KmeansCluster[] _clusters = null;
   private Observation[] _obsList = null; 
   private double _totalDistance  = 0.0;
   private Centroid[] _centroids = null;
   
   public KmeansClustering(int numClusters, final Observation[] obsList) {   
      _clusters = new KmeansCluster[numClusters];
      for (int i = 0; i < numClusters; i++) {
         _clusters[i] = new KmeansCluster(i);
      }
      _obsList = obsList;
   }

 
   public final List<double[]> getCentroids() {
       List<double[]> centroidDataList = null;

       if(_clusters != null &&; _clusters.length < 0) {
           centroidDataList = new LinkedList<double[]>();
           for( KmeansCluster cluster : _clusters) {
               centroidDataList.add(cluster.getCentroid().getX());
           }
       }
       return centroidDataList;
   }
}

The next article, K-means clustering in Java: Classification  describes the implementation of the training and classification tasks.

Thank you for reading this article. For more information ...

References

  • The Elements of Statistical Learning   - T. Hastie, R.Tibshirani, J. Friedman  - Springer 2001
  • Machine Learning: A Probabilisitc Perspective 11.4.2.5 K-means algorithm - K. Murphy - MIT Press 2012
  • Pattern Recognition and Machine Learning: Chap 9 "Mixture Models and EM: K-means Clustering" C.Bishop - Springer Science 2006 
  • github.com/patnicolas


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Saturday, September 8, 2018

Scala 2.x Implicit Classes to Extend Libraries

Target audience: Beginner
Estimated reading time: 3'

Implicit methods offer significant utility in establishing global type conversions, provided their semantics are clearly grasped. But where do implicit classes stand in this spectrum?
This article delves into the theory and practicality of Scala's implicit classes, highlighting their potential to enhance existing Scala standard libraries.

Follow me on LinkedIn
Notes
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.
  • The code associated with this article is written using Scala 2.11.8

Overview

Implicit methods are quite useful in defining global type conversion (as long as the semantic is clearly understood). But what about implicit classes?
Implicit classes can be used to extend existing Scala standard library classes. Most of Scala classes are declared final or implement a sealed trait. Composition is a viable alternative to Inheritance in term of re-usability: the class to extend is wrapped into a helper or utility class. However, a helper/container class adds an indirection and "pollute" the name space.

 
Let's look at an example of extension of standard library.

Example

The use case consists of extending the Try class, scala.util.Try with a Either semantic: Execute a function if the code throws an exception, and another function if the computation succeeds. Such simple design allows computation flows to be routed depending on unexpected state.


The main construct is the declaration of a implicit class that wraps Try. A recovery function rec is called if an exception is thrown, a computation f is performed otherwise.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import scala.util._
 
implicit class Try2Either[T](_try: Try[T]) {
    
  def toEither[U](rec: ()=>U)(f: T=>T): Either[U,T] = 
     _try match {
         case Success(res) => Right(f(res))
         case Failure(e) => 
             println(e.toString)
             Left(rec())  
     }
}

You may notice that the method toEither is curried to segregate the two parameterized type T and U. It also comply with the semantic of Either with Left element for error (and recovery) and Right element dedicated to the successful outcome. 

Let's take the example of the normalization of a large array of floating point values by their sum. A small value will generate a rounding error during the normalization and an exception is thrown. However we do not want to burden the client code with handling the exception (the client method may not know the context of the exception after all). In this example the recovery function rec instantiates the class Recover which is responsible for a retry, potentially from a different state.


Implicit classes have an important limitation in terms of re-usability. You cannot override a default method without having to sub-class the original Scala standard library class, which is not an option in our example because Try is a sealed abstract class.
As with implicit methods, the name of the class is never actually used in the code but need to reflect the intention of the developer. Let's apply this implicit class

type DVector = Array[Double]

  // Arbitrary recovery class
class Recover {
     def reTry(x: DVector): DVector  = Array[Double](0.0)
}
  
  // Normalization of a vector. Proceeds to the
  // next computation (log) if the normalization succeeds
  // or invoke the recovery procedure otherwise
def normalize(x: DVector ): Either[Recover, DVector] = Try {
    val sum = x.sum 

    if(sum < 0.01) 
        throw new IllegalStateException(s"sum $sum")
    x.map( _ / sum) 
}
 .toEither(() => new Recover)( (v: DVector) => v.map( Math.log(_))

The implementation is very elegant. There is no need for new semantic and naming convention and the return type Either is very well understood in the Scala development community.

Thank you for reading this article. For more information ...

References

Monday, July 9, 2018

Speed up Fibonacci Sequence with Tail Recursion

Target audience: Intermediate
Estimated reading time: 5'

The Fibonacci sequence is characterized by each number being the summation of its two immediate predecessors. In this article, we delve into an examination and comparison of tail recursion's efficiency in Scala, specifically when applied to the Fibonacci sequence.


Table of contents
Follow me on LinkedIn

There are many ways to skin a cat and implement the Fibonacci recursion. This post illustrates the power of tail recursion in Scala, relative to alternative solution such as recursion without tail elimination and iterations. This post compares the performance of the simple "out-of-the-box" implementation of the Fibonacci recursion with an Scala implementation using tail-recursion.

Note: This post relies on Scala 2.11.8


Non-tail recursive Fibonacci

The Fibonacci algorithm is defined as
   f(n) = f(n-1) + f(n-2)
   f(0) = 0
   f(1) = 1
Let's look at text book recursive implementation.

 
def fibonacci(n: Int): Long =  {
    if(n == 0) 0
    else if(n == 1) 1
    else fibonacci1(n-1) + fibonacci1(n-2)
}

The duration of execution is recorded for values between 2 and 50, run 10,000 times.
The profile of the performance of the non-tail recursion of Fibonacci should not be a surprise. After all, the time complexity of this implementation is a staggering O(2n)


Tail recursion

Let's look at one example of implementation of the Fibonacci formula using the Scala tail recursion.

def fibonacci(n: Int): Long = {
   
   @tailrec
   def _fibonacci(i: Int, next: Long, sum: Long): Long = 
       if(i ==0) sum
       else fibonacci(i-1, sum+ next, next)

   fibonacci(n, 1, 0)
}

The performance of the tail recursion is measure against a plain-vanilla iterative execution. The duration of execution is recorded for values between 5 and 200, run 100,000 times.

 
def fibonacci(n: Int): Long = {
    var s1 = 0
    var s2 = 1 
 
    (n-1 to 0 by -1).foreach(
       _ => {
           val tmp = s2 + s1
           s1 = s2
           s2 = tmp    
      }
   )
   
   s1
}

The following chart illustrates that the performance of the implementation of Fibonacci using the tail recursion and the iterative execution is almost identical.

Iterative computation

For good measure, I thought it would be interesting to measure the performance of an iterative approach using a fold.

def fibonacci(n: Int): Long = 
 
  (n-1 to 0 by -1)./:((0, 1)) {  
      case ((s1, s2), n) => { 
          val tmp = s2 + s1
          (s2, tmp) 
      }
   }._1

Here is the performance profile for the iterative computation.
Note: The fold (:/) can be implemented as a closure using a recursive formula for the function argument.

Conclusion

Scala tail recursion is indeed as effective as the iterative implementation at processing the Fibonacci formula. The performance of the recursion without tail elimination is extremely poor. Aggregators or reducers such as fold, reduce and scan add some overhead to the computation although the performance is still decent.

Thank you for reading this article. For more information ...

References



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Thursday, May 24, 2018

Streaming Moving Averages with Apache Spark

Target audience: Intermediate
Estimated reading time: 5'

Spark Streaming offers an abstraction called Discretized Stream, or DStream, which signifies a continuous flow of data. This can either stem from an initial source or emerge as a result of transforming the input data. Essentially, a DStream is characterized by a consistent sequence of Resilient Distributed Datasets (RDD) [1].


Follow me on LinkedIn
Important notes
  • This post describes the streaming features as implemented in Apache Spark 2.0.x 
  • Stream should not confused with structured streaming [2].

Use case

The creation of models through supervised learning from a given training sets usually requires the application of some pre-processing algorithm such as smoothing, filtering or extrapolating existing data for missing values. 

Computing the moving average of a time series is a simple and convenient technique used to filter out noise and reduce the impact of outliers on trend lines. For this post, we consider the simplest form of moving average over a period of p values, as defined in the following formula \[\tilde{x_{t}}= \frac{1}{p}\sum_{j=t-p+1}^{t}x_{j}\] 
The iterative (or recursive) form is defined as \[\tilde{x_{t}}=\tilde{x_{t-1}}+\frac{1}{p}(x_{t}-x_{t-p})\] 

Spark DStreams

This section refers to discretized streams that were introduced in Spark 1.x. Structured streaming will be discussed in a future post.



Thank you for reading this article. For more information ...

References



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3