Sunday, April 2, 2017

Recursive Minimum Spanning Tree in Scala

Target audience: Intermediate
Estimated reading time: 6'

Determining the best way to link nodes is frequently encountered in network design, transport ventures, and electrical circuitry. This piece explores and showcases an efficient computation for the minimum spanning tree (MST) through the use of Prim's algorithm, which is built on tail recursion.
This article assumes a very minimal understanding of undirected graphs.

Note: Implementation relies on Scala 2.11.8

Overview

Each connectivity in a graph is usually defined as a weight (cost, length, time...). The purpose is to compute the schema that connects all the nodes that minimize the total weight. This problem is known as the minimum spanning tree or MST related to the nodes connected through an un-directed graph [ref 1].

Several algorithms have been developed over the last 70 years to extract the MST from a graph. This post focuses on the implementation of the Prim's algorithm in Scala.

There are many excellent tutorials on graph algorithm and more specifically on the Prim's algorithm. I recommend Lecture 7: Minimum Spanning Trees and Prim’s Algorithm [ref 2].

Let's PQ is a priority queue, a Graph G(V, E) with n vertices V and E edges w(u,v). A Vertex v is defined by 
  • An identifier
  • A load factor, load(v)
  • A parent tree(v)
  • The adjacent vertices adj(v)
The Prim's algorithm can be easily expressed as a simple iterative process. It consists of using a priority queue of all the vertices in the graph and update their load to select the next node in the spanning tree. Each node is popped up (and removed) from the priority queue before being inserted in the tree.
PQ <- V(G)
foreach u in PQ
   load(u) <- INFINITY
 
while PQ nonEmpty
   do u <- v in adj(u)
      if v in PQ && load(v) < w(u,v)
      then
         tree(v) <- u
         load(v) <- w(u,v)
The Scala implementation relies on a tail recursion to transfer vertices from the priority queue to the spanning tree.

Graph definition

The first step is to define a graph structure with edges and vertices [ref 3]. The graph takes two arguments:
  • numVertices number of vertices
  • start index of the root of the minimum spanning tree
The vertex class has three attributes
  • id identifier (arbitrary an integer)
  • load dynamic load (or key) on the vertex
  • tree reference to the minimum spanning tree
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final class Graph(numVertices: Int, start: Int = 0) {
 
  class Vertex(val id: Int, 
     var load: Int = Int.MaxValue, 
     var tree: Int = -1) 

  val vertices = List.tabulate(numVertices)(new Vertex(_))
  vertices.head.load = 0
  val edges = new HashMap[Vertex, HashMap[Vertex, Int]]

  def += (from: Int, to: Int, weight: Int): Unit = {
    val fromV = vertices(from)
    val toV = vertices(to)
    connect(fromV, toV, weight)
    connect(toV, fromV, weight)
  }

  def connect(from: Vertex, to: Vertex, weight: Int): Unit = {
    if( !edges.contains(from))
      edges.put(from, new HashMap[Vertex, Int])    
    edges.get(from).get.put(to, weight)
  }   
  // ...
}

The vertices are initialized by with a unique identifier id, and a default load Int.MaxValue and a default depth tree (lines 3-5). The Vertex class resides within the scope of the outer class Graph to avoid naming conflict. The vertices are managed through a linked list (line 7) while the edges are defined as hash maps with a map of other edges as value (line 9). The operator += add a new edge between two existing vertices with a specified load (line 11) 
In most case, the identifier is a character string or a data structure. As described in the pseudo-code, the load for the root of the spanning tree is defined a 0.

The load is defined as an integer for performance's sake. It is recommended to convert (quantization) a floating-point value to an integer for the processing of very large graph, then convert back to a original format on the resulting minimum spanning tree.
The edges are defined as hash table with the source vertex as key and the hash table of destination vertex and edge weight as value. 


The graph is un-directed therefore the connection initialized in the method
+= are bi-directional.

Priority queue

The priority queue is used to re-order the vertices and select the next vertex to be added to the spanning tree.

Note: There are many different implementation of priority queues in Scala and Java. You need to keep in mind that the Prim's algorithm requires the queue to be re-ordered after its load is updated (see pseudo-code). The PriorityQueue classes in the Scala and Java libraries do not allow elements to be removed or to be explicitly re-ordered. An alternative is to use a binary tree, red-black tree for which elements can be removed and the tree re-balanced.

The implementation of the priority has an impact on the time complexity of the algorithm. The following implementation of the priority queue is provided only to illustrate the Prim's algorithm. 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class PQueue(vertices: List[Vertex]) {
   var queue = vertices./:(new PriorityQueue[Vertex])((pq, v) => pq += v)
    
   def += (vertex: Vertex): Unit = queue += vertex
   def pop: Vertex = queue.dequeue
   def sort: Unit = {}
   def push(vertex: Vertex): Unit = queue.enqueue(vertex)
   def nonEmpty: Boolean = queue.nonEmpty
}
  
type MST = ListBuffer[Int]
implicit def orderingByLoad[T <: Vertex]: Ordering[T] = Ordering.by( - _.load)  


The Scala PriorityQueue class required the implicit ordering of vertices using their load (line 2). This accomplished by defining an implicit conversion of a type T with upper-bound type Vertex to Ordering[T] (line 12).

Notes
  • The type T has to be a sub-class of Vertex. A direct conversion from Vertex type to Ordering[Vertex] is not allowed in Scala.
  • We use the PriorityQueue from the Java library as it provides more flexibility than the Scala TreeSet.

Prim's algorithm

This implementation is the direct translation of the pseudo-code presented in the second paragraph. It relies on the efficient Scala tail recursion (line 5).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def prim: List[Int] = {
  val queue = new PQueue(vertices)
   
  @scala.annotation.tailrec
  def prim(parents: MST): Unit = {
    if( queue.nonEmpty ) {
      val head = queue.pop
      val candidates = edges.get(head).get
          .filter{ 
            case(vt,w) => vt.tree == -1 && w <= vt.load
          }
 
      if( candidates.nonEmpty ) {
        candidates.foreach {case (vt, w) => vt.load = w }
        queue.sort
      }
      parents.append(head.id)
      head.tree = 1
      prim(parents)
    }
  }
  val parents = new MST
  prim(parents)
  parents.toList
}

As long as the priority queue is not empty (line 6), the next element is the priority queue is retrieved (line 7) for which is select the most appropriate candidate for the next vertex (line 8 - 11). The load of each candidate is updated (line 14) and the priority queue is re-sorted (line 15).
Although a tree set is a more efficient data structure for managing the set of vertices waiting to be weighted, it does not allow the existing priority queue to be resorted because of its immutability.

Time complexity

As mentioned earlier, the time complexity depends on the implementation of the priority queue. If E is the number of edges, and V the number of vertices:
  • Minimum spanning tree with linear queue: V2
  • Minimum spanning tree with binary heap: (E + V).LogV
  • Minimum spanning tree with Fibonacci heap: V.LogV
Note: See Summary of time complexity of algorithms for details.


References

[1Introduction to Algorithms Chapter 24 Minimum Spanning Trees - T. Cormen, C. Leiserson, R. Rivest - MIT Press 1989
[2Lecture 7: Minimum Spanning Trees and Prim’s Algorithm Dekai Wu, Department of Computer Science and Engineering - The Hong Kong University of Science & Technology
[3] Graph Theory Chapter 4 Optimization Involving Tree - V.K. Balakrishnan - Schaum's Outlines Series, McGraw Hill, 1997

    Sunday, February 26, 2017

    Implement Function Vectors in Scala

    Target audience: Intermediate
    Estimated reading time: 4'

    This article describes the definition and an implementation of the functions vector.

    Follow me on LinkedIn
    Note: Implementation relies on Scala 2.11.8

    Overview

    Space of functions are commonly used in machine learning (kernel functions) and differential geometry (tensors and manifolds).The most common technique to create a functions space is to define the space as a category and assign monadic operations to it.
    This post discusses an alternative, less elaborate approach to define a functions space as a container. The key operator on function is the composition as defined in the Function1[-T, +R] class:
        def andThen[A](g: (R) ⇒ A): (T) ⇒ A
         def compose[A](g: (A) ⇒ T): (A) ⇒ R
    
    Let's use andThen as the basic block to build our functions space.



    ... andThen

    The andThen is a member method of some of the Scala standard library collections such as List and HashMap.

    val xsInt = List[Int](4, 0, 9, 56, 11) 
     
    xsInt.andThen((n: Int) => n*n).mkString(",")
       // print 16,0,81,3136,121
    

    As mentioned in the introduction, the Function1 implements andThen to compose two functions as this o otherFunction where the o composition operator is defined as
         f o g:  (t: T) => f(g(t))
    
    The method is implemented using the andThen method

    val fg = Math.sin andThen (x: Double) => x*x
     
    Console.println(s"fg(5.0): ${fg(5.0)}")
    


    Function vectors space

    A function space is defined as a space of function vectors, similar to Euclidean space for which variables is defined as a vector or array of real values. A function vector is commonly used to convert a vector or tensor from one space to another non-euclidean space by transforming the original coordinates (x,y,z) to another coordinates in a different space (f(x,y,z), ..) using the function vector.

    Let's implement the functions vector as the class FunctionVector (line 1) and its two most important method, composition (line 2) and the dot product (tensor product) (line 3).

    1
    2
    3
    4
    5
    6
    class FunctionVector[T](fVec: List[T=>T]) {
       def andThen(op: T => T): T=>T
       def dot(opVec: List[T=>T])(implicit num: Numeric[T]): T => T
       def map(op: T => T): List[T=>T]
       ...
    }
    

    The dot product is defined as the product of two vectors functions. For example in a function vector spaces of dimension 3, v(f,g,h) and v'(f',g',h')
        dot: (x,y,z) -> f o f'(x,y,z) + g o g'(x,y,z) + h o h'(x,y,z)
    
    The method andThen composes this function vector with a function op and generates a 'cumulative' composed function as
        f o g o h o op
    
    The iteration along the function vector is implemented as a tail recursion. The method relies on the List[T=>T].andThen method (line 6).

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def andThen(g: T => T): T => T = {
    
       @scala.annotation.tailrec
       def andThen(xsf: List[T=>T])(op: T => T): T => T =
          if(xsf.size == 0) op 
          else andThen(xsf.drop(1))(xsf.head andThen op)
       
      andThen(fVec.reverse)(op)
    }
    

    The 'dot' product takes another function vector of same size as argument. The two vectors are zipped to generate a Iterable[T=>T] vector of composed function elements using a map. Finally the resulting dot function is computed by generating a new function with summation of the elements of the composed functions vector. The summation requires the instance of Numeric to be declared as an implicit argument.

    def dot(gVec: List[T=>T])(implicit num: Numeric[T]): T => T = { 
     
        val composed = fVec.zip(opVec).map(fg => fg._1.andThen(fg._2))
        (t: T) => composed.map( _(t)).sum
    }
    

    Contrary to the andThen method, the map convert a function vector to another function vector by applying a natural transformation.

      // return List[T => T]
    def map(op: T => T) = fVec.map( _ andThen op) 
     

    Finally, the definition of the dot product can be extended to any aggregation method aggr

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    def dot(
      opVec: List[T=>T], 
      aggr: (T =>T, T=>T) =>  (T=>T)): T => T = {
      
        val composed = fVec.zip(opVec).map( 
            fg => fg._1 andThen fg._2
        )
      
        (t: T) => composed./:((t: T) => t)(
            (h,f) =>aggr(h,f)
        )(t)
    }
    

    The aggregation method passed as argument of the dot method (line 3) is a simplified version of the aggregation defined in the Scala standard library. It is used to implement the dot method on the composed functions vectors (line 10). 

    Let's apply our new found knowledge about FunctionVector:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    val functionsList = List[Function1[Double,Double]](
       Math.sin, Math.sqrt
    )
       
    val vec = new FunctionVector[Double](functionsList)
    val output1 = vec.andThen((x: Double) => x*x)
    val opVec = List[Double=>Double](
     (x: Double)=> x+ 1.0, 
     (x: Double)=> Math.exp(-x)
    )
    val output2 = vec.dot(opVec)
    

    For evaluating our Function vectors classes, we used a list of functions of type Double => Double (line 1): a sinusoidal and a square root functions (line 2).
    Once the function vector is created (line 5), it is available to be composed with the existing list of functions of the class FunctionVector (lines 5 and 6).


    References

    Monday, January 16, 2017

    Histogram-Based Approximation Using Apache Spark

    Target audience: Intermediate
    Estimated reading time: 5'

    This post illustrates the histogram-based function approximation for very large data sets using Apache Spark and Scala.

    Follow me on LinkedIn

    Note: Implementation relies on Scala 2.11.8, Spark 2.0

    Overview

    The previous post introduces and describes a model to approximate or fit a function to a given data set, using an histogram implemented in Scala. (see Histogram-based approximation using Scala).
    This post re-implements the dynamic resizable histogram using Apache Spark with limited changes in the original code base.


    Histogram

    The Apache Spark version of histogram-based approximation reuses the classes Bin and Histogram described in the previous post. We made some minor changes to the constructors of the Histogram. As a reminder the Histogram class was defined as follows:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    final class Histogram(var step: Double, 
         private var values: mutable.ArrayBuffer[Bin]) {
      var min = values.head._x - 0.5*step
      var max = values.last._x + 0.5*step
    
      private val maxNumBins = (values.length+1)<<RADIX
    
      def this(first: Double, step: Double, initNumBins: Int) = 
          this(step, Range(0, initNumBins-1)
          .scanLeft(first)((s, n) => s + step)
          ./:(new mutable.ArrayBuffer[Bin])(
                  (vals, x) => vals += (new Bin(x)) )
           )
    
      def this(params: HistogramParams) = 
          this(params.first, params.step, params.initNumBins)
      
      def train(x: Double, y: Double): Unit
      final def predict(x: Double): Double
       // ….
    }
    

    The class Histogram has three constructors, for convenience's sake: 
    • Instantiation of an histogram using an estimate of the _x value of the first bin, the width of the bins, step and the initial number of bins, initNumBins. This constructor is invoked for the first batch of data .
    • Instantiation of an histogram using an initial given step and an existing array of bins/buckets of width step. This constructor is invoked for subsequent batch of data (line 8-12).
    • Instantiation of an histogram using its parameters params (line 15).
    The parameters for the histogram are encapsulated in the following HistogramParams class described in a previous post.

    I added an implementation of the concatenation of array bins which is required by the reducer implemented by a fold action on Spark data nodes (line 6)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    object Histogram {
      final val EPS = 1e-12
      final val RADIX = 8
      final def sqr(x: Double): Double = x*x
    
      def +(itBins1: Array[Bin], itBins2: Array[Bin]): Array[Bin]={
        val histBins = new mutable.ArrayBuffer[Bin]
        itBins1.scanLeft(histBins)((newBins, bin) => newBins +=bin)
        itBins2.scanLeft(histBins)((newBins, bin) => newBins +=bin)
        histBins.toArray
      }
    }
    


    Test data generation

    For testing purpose, we need to generate a data set of very large (or even infinite) size. For this purpose we load an existing data set from a file which is duplicated then modified with a random uniform noise to create a large number of batch to be recursively processed by Spark data nodes. 

    The amplitude of the randomization (synthetic noise) is significant enough to generate different data point while preserving the original data pattern so the algorithm can be easily validated.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    final class DataGenerator(sourceName: String, nTasks: Int) {
      private final val DELIM = " "
      private final val RATIO = 0.05
      var datasetSize: Int = _
    
        // Generate the RDD from an existing data set template
        //  and a noise function
      def apply(sc: SparkContext): RDD[(Float, Float)] = {
        val r = new Random(System.currentTimeMillis + 
               Random.nextLong)
        val src = Source.fromFile(sourceName)
        val input = src.getLines.map(_.split(DELIM))
          ./:(new mutable.ArrayBuffer[(Float, Float)])
            ((buf, xy) => {
             val x = addNoise(xy(0).trim.toFloat, r)
             val y = addNoise(xy(1).trim.toFloat, r)
             buf += ((x, y))
        })
    
        datasetSize = input.size
        val data_rdd = sc.makeRDD(input, nTasks)
        src.close
        data_rdd
      }
    
      private def addNoise(value: Float, r: Random): Float = 
          value*(1.0 + RATIO*(r.nextDouble - 0.5)).toFloat
    }
    

    The data is generated for a specific number of tasks nTasks and loading the data from a local file sourceName (line 1). The generation of the histogram is implemented by the method apply. Once loaded from file (line 12), the input is generated locally as an array of (Float, Float) (line 12 -13).
    Next, the algorithm adds randomization addNoise (line 15, 16) before converting into a RDD (line 21).


    Apache Spark client

    The first step is to create a Spark configuration and context for a given number of concurrent tasks.

    1
    2
    3
    4
    5
    6
    val numTasks: Int = 12
    
    val conf = new SparkConf().setAppName("Simple Application")
       .setMaster(s"local[$numTasks]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    

    The main goal of the Spark driver is to apply a tail recursive function across all the batches of data. The histogram is computed/updated on the Spark data node, then reduced to a single histogram which is then broadcasted to all the nodes to be updated with the next batch of data.


    The RDDs are generated by an instance of the DataGenerator class as input (line 1). The input is loaded recursively into multiple partitions using the Spark method mapPartitionsWithIndex (line 11). Each partition generates a new array of bins (line30) once the class Histogram is instantiated (line 17) then trained (line 24). The data is then grouped by partition index (line 28). finally the fold method is invoked to generate the array of bins (line 30).

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    val input = new DataGenerator("small.data", numTasks)
    
    @scala.annotation.tailrec
    def execute(iter: Int, 
        initParam_brdcast: Broadcast[HistogramParams]): ArrayBuffer[Bin]={
      val initParam = initParam_brdcast.value
    
      if( iter <= 0)
        initParam.values
      else {
        val hist_rdd = input(sc).mapPartitionsWithIndex(
         (idx: Int,  it: Iterator[(Float, Float)]) => {
    
          // The first invocation used the broadcasted parameters for
          //  the histogram and the subsequent invocation update 
          // the distribution of histogram bins
          val histogram =
            if (initParam.values.length == 0) 
               new Histogram(initParam)
            else 
               new Histogram(step, initParam.values)
    
          // Train the histogram with a new batch of data
         it.foreach { case (x, y) => histogram.train(x, y) }
             
         histogram.getValues.map((idx, _)).iterator
         })
         .groupBy(_._1)
         .map { case (n, itBins) => itBins.map(_._2).toArray }
         .fold(Array[Bin]())(Histogram +)
    
        execute(iter-1, sc.broadcast(
         HistogramParams(0.0, initParam.step, 0, new ArrayBuffer[Bin]++hist_rdd)
        )
      }
    }
    

    Note:
    The invocation of the recursive method execute on the Apache Spark driver (line 4) broadcasts the current (recently updated) parameters of the histogram initParms_brdcast as one of the arguments of the recursive call (line 5). Upon receiving the broadcast values, the partitions load the next batch of data, generates and processes the RDD input.apply(sc) and update the histogram parameters (line 11).

    Evaluation

    Let's write a test driver to generate the Histogram using Spark RDD. After their initialization (line 1-4), the parameters of the histogram are simply broadcasted to the remote data/worker nodes (line 7,8).

    1
    2
    3
    4
    5
    6
    7
    8
    9
    val initNumBins = 2048
    val range = (-1.0, 2.0)
    var step = (range._2 - range._1)/initNumBins
    val maxRecursions = 4
       
    val hist = execute(maxRecursions, 
        sc.broadcast(
             HistogramParams(range._1 + 0.5 * step, step, initNumBins))
      )
    


    References