Saturday, November 29, 2014

Apache Spark/MLlib for K-means

Target audience: Intermediate
Estimated reading time: 5'


This post illustrates the Apache Spark MLlib library with the plain-vanilla K-means clustering (unsupervised) algorithm.


Table of contents


Overview

Apache Spark attempts to address the limitation of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In case of iterative algorithms, the time per iteration can be reduced by a ratio of 1:10 or more.
The core element of Spark is Resilient Distributed Datasets (RDD), which is a collection of elements partitioned across the nodes of a cluster and/or CPU cores of servers. An RDD can be created from local data structures such as list, array or hash tables, from the local file system or the Hadoop distributed file system (HDFS).

Note: The code presented in this post uses Apache Spark version 1.3.1. There is no guarantee that the implementation of the K-means in this post will be compatible with future version of Apache Spark.

Apache Spark RDDs

The operations on an RDD in Spark are very similar to the Scala higher order methods. These operations are performed concurrently over each partition. Operations on RDD can be classified as:
* Transformation: convert, manipulate and filter the elements of an RDD on each partition
* Action: aggregate, collect or reduce the elements of the RDD from all partitions

An RDD can persist, be serialized and cached for future computation. Spark provides a large array of pre-built transforms and actions which go well beyond the basic map-reduce paradigm. Those methods on RDDs are a natural extension of the Scala collections making code migration seamless for Scala developers.

Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the file systems. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes and replication schemes of HDFS.
Spark is initialized through its context. For instance, a local Spark deployment on 8 cores, with 2 Gbytes allocated for data processing (RDDs) in memory only storage level and 512 Mbytes for the master process is defined by creating a spark configuration instance of type SparkConf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
 
val sparkConf = new SparkConf()
            .setMaster("local[8]")
            .setAppName("SparkKMeans")
            .set("spark.executor.memory", "2048m")
            .set("spark.storageLevel", "MEMORY_ONLY")
            .set("spark.driver.memory", "512M")
            .set("spark.default.parallelism", "16")
 
implicit val sc = new SparkContext(sparkConf))



Apache Spark MLlib

MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress. The main components of the library are:
  • Classification algorithms, including logistic regression, Naïve Bayes and support vector machines
  • Clustering limited to K-means in version 1.0
  • L1 & L1 Regularization
  • Optimization techniques such as gradient descent, logistic gradient and stochastic gradient descent and L-BFGS
  • Linear algebra such as Singular Value Decomposition
  • Data generator for K-means, logistic regression and support vector machines.
The machine learning byte code is conveniently included in the spark assembly jar file built with the simple build tool, sbt.
Let's consider the K-means clustering components bundled with Apache Spark MLlib. The K-means configuration parameters are:
  • K Number of clusters (line 4)
  • maxNumIters Maximum number of iterations for the minimizing the reconstruction error< (line 5)/li>
  • numRuns Number of runs or episode used for training the clusters (line 6)
  • caching Specify whether the resulting RDD has to be cached in memory (line 7)
  • xt The array of data points (type Array[Double]) (line 8)
  • sc Implicit Spark context
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
 
class SparkKMeans(
    K: Int, 
    maxNumIters: Int, 
    numRuns: Int,
    caching: Boolean,
    xt: Array[Array[Double]]) (implicit sc: SparkContext) {
   
 
  def train: Try[KMeansModel] = {
    val kmeans = new KMeans
    kmeans.setK(K)
    kmeans.setMaxIterations(maxNumIters)
    kmeans.setRuns(numRuns)
   
    val rdd = sc.parallelize(xt.map(new DenseVector(_)))
    rdd.persist(StorageLevel.MEMORY_ONLY)
    if( caching )
       rdd.cache
    kmeans.run(rdd)
  }
}

The clustering model is created by the train method (line 11). Once the Spark/MLlib K-means is instantiated and initialized (lined 12 -15), the ipnt data set xt is converted into a DenseVector then converted into a RDD (line 17). Finally the input RDD is fed to the Kmeans (kmeans.run)