Overview
- Craft an implementation of the Kullback-Leibler divergence in Scala, leveraging the Apache Spark framework for distributed computation.
- Showcase how the Kullback-Leibler divergence can be utilized to contrast various continuous probability density functions.
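For reference, the implementation below accumulates the discrete form of the Kullback-Leibler divergence of an observed distribution Q from a candidate distribution P:

D_{KL}(P \,\|\, Q) = \sum_{x} p(x) \log \frac{p(x)}{q(x)}

Each data point contributes one term p(x) log(p(x)/q(x)) to the sum.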
Scala implementation
import scala.math._

object KullbackLeibler {
  final val EPS = 1e-10
  type DATASET = Iterator[(Double, Double)]

  def execute( // #1
      xy: DATASET,
      f: Double => Double): Double = {
    // Discard the data points for which the reference value y is ~0
    val z = xy.filter { case (_, y) => abs(y) > EPS }
    z.foldLeft(0.0) { case (s, (x, y)) =>
      val px = f(x)
      s + px*log(px/y)
    }
  }

  def execute( // #2
      xy: DATASET,
      fs: Iterable[Double => Double]): Iterable[Double] = {
    // Materialize the iterator: it can be traversed only once,
    // but each pdf requires a full pass over the data
    val seq = xy.toSeq
    fs.map(f => execute(seq.iterator, f))
  }
}
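As a minimal sanity check of the first execute method (the names pts and kl0 below are illustrative, not part of the original listing), the divergence of a distribution from itself should be zero:

// Uniform density 1.0 on (0, 1) compared against data drawn from that same density
val pts = (1 until 100).map(n => (n / 100.0, 1.0))
val kl0 = KullbackLeibler.execute(pts.iterator, (_: Double) => 1.0) // 0.0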
// Normalization constant for the standard normal density
final val INV_SQRT_2PI = 1.0/sqrt(2.0*Pi)

// Gaussian distribution (mean 0, variance 1)
val gauss = (x: Double) => INV_SQRT_2PI*exp(-x*x/2.0)

// Uniform distribution over [0, 1]
val uniform = (x: Double) => 1.0

// Log normal distribution (mu = 0, sigma = 1)
val logNormal = (x: Double) => {
  val lx = log(x)
  INV_SQRT_2PI/x*exp(-lx*lx/2.0)
}
// Gamma (Erlang) distribution with integer shape n + 1 and unit scale
val gamma = (x: Double, n: Int) =>
  exp(-x)*pow(x, n)/fact(n)
// Log Gamma distribution
val logGamma = (x: Double, alpha: Int, beta: Int) =>
exp(beta*x)*exp(-exp(x)/alpha)/(pow(alpha, beta)*fact(beta-1))
// Simple computation of m! (used by the gamma, beta and chi-square normalizations)
def fact(m: Int): Int = if (m < 2) 1 else m*fact(m-1)
// Normalization factor for Beta
val cBeta = (n: Int, m: Int) => {
val f = if(n < 2) 1 else fact(n-1)
val g = if(m < 2) 1 else fact(m-1)
f*g/fact(n + m - 1).toDouble
}
// Beta distribution
val beta = (x: Double, alpha: Int, beta: Int) =>
  pow(x, alpha-1)*pow(1.0-x, beta-1)/cBeta(alpha, beta)
// Chi-Square distribution (even degrees of freedom k)
val chiSquare = (x: Double, k: Int) => {
  val k_2 = k >> 1
  pow(x, k_2-1)*exp(-0.5*x)/((1 << k_2)*fact(k_2-1))
}
val gamma2 = gamma(_: Double, 2)
val beta25 = beta(_: Double, 2, 5)
val chiSquare4 = chiSquare(_: Double, 4)
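The broadcast map further below also references gamma1, gamma4, logGamma12, logGamma22, beta22 and chiSquare2. Their definitions do not appear in the listing, but they are presumably the analogous partial applications:

// Presumed partial applications for the remaining entries of the pdfs map
val gamma1 = gamma(_: Double, 1)
val gamma4 = gamma(_: Double, 4)
val logGamma12 = logGamma(_: Double, 1, 2)
val logGamma22 = logGamma(_: Double, 2, 2)
val beta22 = beta(_: Double, 2, 2)
val chiSquare2 = chiSquare(_: Double, 2)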
- The implementations of the probability density functions in the code snippet are not optimized for performance.
- Please refer to your favorite Statistics handbook to learn more about these probability distributions.
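Before moving to Spark, the second execute method can be exercised locally. A quick sketch, where the evaluation grid over (0, 1) and the choice of the Gaussian as the reference density are assumptions:

// Sample the reference density on a grid over (0, 1)
val sample = (1 until 1000).map { n =>
  val x = n / 1000.0
  (x, gauss(x))
}
// Divergence of each candidate density from the reference data
val kls = KullbackLeibler.execute(sample.iterator, Seq(logNormal, gamma2, beta25, chiSquare4))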
Spark to the rescue
- Analytics: Spark's ability to produce responses quickly enables interactive data exploration, rather than relying solely on predefined queries.
- Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
- Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
- Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.
The distributed computation of the divergence proceeds in four steps, implemented in the snippets that follow:
- Segment the primary dataset.
- Distribute the initial sequence of probability density functions.
- Apply the Kullback-Leibler formula to each segment using mapPartitions.
- Retrieve and combine the divergence value from every segment.
final val NUM_DATA_POINTS = 10000000 // #1
val numTasks: Int = 128 // #2
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf() // #3
  .setAppName("Kullback-Leibler")
  .setMaster(s"local[$numTasks]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
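The snippets below operate on master_rdd, which is not defined in the original listing. A plausible construction, assuming the input pairs each point x with the value of a reference density (here gauss) and is sliced into numTasks partitions:

import org.apache.spark.rdd.RDD

// Hypothetical input RDD of (x, q(x)) pairs; the evaluation grid and the
// choice of gauss as the reference density are assumptions
val data = (0 until NUM_DATA_POINTS).map { n =>
  val x = n.toDouble/NUM_DATA_POINTS
  (x, gauss(x))
}
val master_rdd: RDD[(Double, Double)] = sc.makeRDD(data, numTasks)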
Next, let's implement the broadcasting mechanism for the sequence of probability density functions.
lazy val pdfs = Map[Int, Double => Double](
1 -> uniform,
2 -> logNormal,
3 -> gamma1,
4 -> gamma2,
5 -> gamma4,
6 -> logGamma12,
7 -> logGamma22,
8 -> beta22,
9 -> beta25,
10 -> chiSquare2,
11 -> chiSquare4
)
val pdfs_broadcast = sc.broadcast[Iterable[Int]](pdfs.keys)
// execute and DATASET are defined in the KullbackLeibler object
import KullbackLeibler._

val kl_rdd = master_rdd.mapPartitions(
  (it: DATASET) => {
    // Rebuild the list of pdfs from the broadcast keys; the pdfs map
    // itself is serialized with the closure
    val pdfsList = pdfs_broadcast.value.map(n => pdfs(n))
    execute(it, pdfsList).iterator
  }
)
val kl_master = kl_rdd.collect
// Accumulate the partial divergence computed by each partition for
// each probability density function, then average over the partitions
val divergences = (0 until kl_master.size by pdfs.size)
  .foldLeft(Array.fill(pdfs.size)(0.0))( (s, n) => {
    (0 until pdfs.size).foreach(j =>
      s.update(j, s(j) + kl_master(n + j)))
    s
  }).map(_ / (kl_master.size / pdfs.size))
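The resulting divergences can then be associated with their distributions; pdfs.keys iterates in the same order used to build the broadcast list, so keys and values line up. A small reporting sketch:

pdfs.keys.zip(divergences).foreach { case (k, kl) =>
  println(f"pdf #$k%2d KL divergence = $kl%.6f")
}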
Conclusion
This article showed how to implement the Kullback-Leibler divergence in Scala and how Spark's partitioning, broadcast variables, and mapPartitions method distribute its computation across a large dataset of probability density values.
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design, and end-to-end deployment and support, with extensive knowledge in machine learning.
He has been director of data engineering at Aideo Technologies since 2017 and is the author of Scala for Machine Learning (Packt Publishing, ISBN 978-1-78712-238-3).