
Monday, January 24, 2022

Distributed Bloom Filter

Target audience: Intermediate
Estimated reading time: 5'

Have you ever needed an effective, low-latency method for checking whether a particular object or piece of data belongs to a very large dataset?
In this article, we explore the concept of a distributed Bloom filter using Apache Spark and cryptographic digests, and estimate its rate of false positives.


Table of contents
       Use case
       Implementation


What you will learn: Creating and analyzing a Bloom filter for processing extremely large datasets with Apache Spark and cryptographic hashes, with a focus on understanding and managing false positives.


Notes: 
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.
  • The code associated with this article is written using Scala 2.12.4 and Apache Spark 3.4.0.
  • Source code available on GitHub: Patrick Nicolas Bloom Filter

Overview

The Bloom filter is a popular probabilistic data structure for membership queries (does object x belong to set or category Y?). Its main benefit is a much smaller memory footprint: unlike a HashSet or HashMap, it does not store the objects themselves in memory. The compact representation comes with a trade-off: although the filter does not allow false negatives, it does not guarantee the absence of false positives.

In other words, a query returns either:
  • a very high probability that an object belongs to a set, or
  • the certainty that an object does not belong to a set.
A Bloom filter is quite often used as a front end to a deterministic algorithm.
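
The front-end role can be sketched as follows. This is only an illustration, not code from the article's repository: the names FrontEndSketch, GuardedLookup, mightContain and slowLookup are hypothetical, and the filter is passed in as a plain predicate so that any Bloom filter implementation could be plugged in.

```scala
// Hypothetical sketch: a probabilistic filter placed in front of an exact,
// expensive lookup (for instance a database or a remote key-value store).
object FrontEndSketch {

  final class GuardedLookup[K, V](
    mightContain: K => Boolean,   // filter query: false positives possible, no false negatives
    slowLookup: K => Option[V]    // deterministic but costly lookup
  ) {
    private var slowCalls = 0

    // Number of times the expensive lookup was actually invoked
    def slowCallCount: Int = slowCalls

    def get(key: K): Option[V] =
      if (!mightContain(key)) None   // definite miss: the store is never touched
      else {
        slowCalls += 1
        slowLookup(key)              // possible hit: confirmed or refuted by the exact store
      }
  }
}
```

Under these assumptions, a key rejected by the filter never reaches the store, while a false positive only costs one extra exact lookup that returns None.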

Theory

Let's consider a set A = {a0, ..., an-1} of n elements for which a membership query is executed. The data structure consists of a bit vector V of m bits and h completely independent hash functions, each of which maps an element to a position in the bit vector. The mapping of hash functions to bits has to follow a uniform distribution [ref 1].

The diagram below illustrates the basic mechanism behind the Bloom filter. The set A is defined by the pair a1 and a2. The hash functions h1 and h2 map each element to bit positions (bits set to 1) in the bit vector. The element b has one of its positions set to 0 and therefore does not belong to the set. The element c is reported as belonging to the set because all of its associated positions have bits set to 1.

However, the algorithm does not prevent false positives. For instance, all the bits associated with an element may have been set to 1 during the insertion of previous elements, in which case the query erroneously reports that the element belongs to the set.
The insertion of an element depends on the h hash functions; therefore, the time needed to add a new element is proportional to h (the number of hash functions) and independent of the size of the bit vector: asymptotic insertion time = O(h). However, the filter requires up to h bits for each element and is less effective than a traditional bit array for small sets.
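
The mechanism described above can be sketched in a few lines of Scala. This is a minimal illustration under stated assumptions, not the digest-based implementation presented later: the h hash functions are simulated by seeding scala.util.hashing.MurmurHash3 with the values 0 until h, elements are restricted to String, and the names MinimalBloomFilter and Filter are hypothetical.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Minimal sketch: a bit vector of m bits and h seeded hash functions.
object MinimalBloomFilter {

  final class Filter(m: Int, h: Int) {
    private val bits = new BitSet(m)

    // Bit positions associated with an element, one per hash function
    private def positions(element: String): Seq[Int] =
      (0 until h).map { seed =>
        // floorMod keeps the index in [0, m) even for negative hash values
        Math.floorMod(MurmurHash3.stringHash(element, seed), m)
      }

    // Insertion sets at most h bits: O(h), independent of m
    def add(element: String): Unit =
      positions(element).foreach(i => bits.set(i))

    // True only if all h bits are set; a single bit at 0 proves absence
    def mightContain(element: String): Boolean =
      positions(element).forall(i => bits.get(i))
  }
}
```

An inserted element always yields true, since its bits are never cleared. An element that was never inserted yields false unless all of its positions happen to have been set by other insertions.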

The probability of false positives decreases as the number n of inserted elements decreases and as the size m of the bit vector increases. The number of hash functions that minimizes the probability of false positives is h = (m/n) ln 2.
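
For reference, the standard approximation behind these statements can be written as follows. It assumes that the h hash functions are independent and uniformly distributed over the m bits; the symbol p for the false-positive probability is introduced here for convenience and is not used elsewhere in the article.

```latex
\begin{align*}
P(\text{bit still } 0 \text{ after } n \text{ insertions})
  &= \left(1 - \frac{1}{m}\right)^{hn} \approx e^{-hn/m} \\
p &\approx \left(1 - e^{-hn/m}\right)^{h} \\
h_{\text{opt}} &= \frac{m}{n}\ln 2
  \quad\Longrightarrow\quad
  p \approx \left(\frac{1}{2}\right)^{h_{\text{opt}}} \approx 0.6185^{\,m/n}
\end{align*}
```

Read this way, doubling the number of bits per element m/n roughly squares the false-positive probability obtained with the optimal number of hash functions.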


Digest-based filter

Scala implementation

The approach relies on cryptographic hash functions [ref 2] and on the MessageDigest class from the Java library [ref 3] to generate hash codes. Details such as auxiliary methods and conditions on method parameters are omitted for simplicity.
The first step is to define the DigestBloomFilter class along with its properties:
  • length: Number of entries in the filter
  • numHashFunctions: Number of hash functions
  • hashingAlgo: Hashing algorithm, with SHA1 as default
  • set: Array of bytes for entries in the Bloom filter
  • digest: Digest used to generate hash values

class DigestBloomFilter[T: ClassTag](
  length: Int,             // Length or capacity of the Bloom filter
  numHashFunctions: Int,   // Number of hash functions
  hashingAlgo: HashingAlgo = SHA1Algo()  // Hashing algorithm SHA1, MD5, ..
) extends BloomFilter[T] {

  private[this] val set: Array[Byte] = new Array[Byte](length)
  private[this] val digest = Try(MessageDigest.getInstance(hashingAlgo.toString))
  private[this] var size: Int = 0

  // Add a new element of type T to the set of the Bloom filter
  override def add(t: T): Unit = {
     hashToArray(t).foreach(set(_) = 1)
     size += 1
  }

  // Add an array of elements of type T to the filter
  override def addAll(ts: Array[T]): Unit =
     if(ts.nonEmpty)
       digest.foreach(_ => ts.foreach(add))

  // Test whether the filter might contain a given element
  override def mightContain(t: T): Boolean =
     digest.map(_ => hashToArray(t).forall(set(_) == 1)).getOrElse(false)

  // The private methods hashToArray and hash, described below, complete the class
}



The digest is created with the java.security.MessageDigest class of the Java library.
The next step consists of defining the methods to add a single generic element, add(t: T), and an array of elements, addAll(ts: Array[T]).
The method mightContain evaluates whether an element is contained in the filter. The method returns
  • true if the filter very likely contains the element
  • false if the filter DOES NOT contain this element
The add and mightContain methods rely on the private method hashToArray to compute the set of entries (indices) associated with an element. The first index is the hash of the hashCode of the element, and each subsequent index is the hash of the previous one.

def hashToArray(t: T): Array[Int] = 
   (0 until numHashFunctions).foldLeft(new Array[Int](numHashFunctions))(
     (buf, idx) => {
       val value = if(idx > 0) hash(buf(idx -1)) else hash(t.hashCode)
       buf.update(idx, value)
       buf
     }
  )

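The hash method is the core of the Bloom filter: it computes the index of an entry.

def hash(value: Int): Int = digest.map(
  d => {
    d.reset()
    d.update(value)    // relies on the implicit conversion Int => Array[Byte]
    // Reduce the non-negative digest value to a valid index in [0, set.length)
    new BigInteger(1, d.digest).mod(BigInteger.valueOf(set.length)).intValue
  }
).getOrElse(-1)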

The instance of the MessageDigest class, digest, generates a hash value using either the MD5 or the SHA-1 algorithm. A fold (foldLeft) is used in hashToArray as an alternative to an explicit loop to generate the set of indices.
The next code snippet implements a very simple implicit conversion from Int to Array[Byte].
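
To experiment with the digest-to-index mapping outside the filter class, a standalone sketch along the following lines may help. It is an assumption-laden illustration rather than the article's code: digestIndex and indexChain are hypothetical helpers, java.nio.ByteBuffer replaces the implicit Int-to-bytes conversion, and the reduction uses BigInteger.mod over the full length.

```scala
import java.math.BigInteger
import java.nio.ByteBuffer
import java.security.MessageDigest

// Sketch: map an Int to an index in [0, length) through a cryptographic digest.
object DigestIndexSketch {

  def digestIndex(value: Int, length: Int, algorithm: String = "SHA-1"): Int = {
    val md = MessageDigest.getInstance(algorithm)
    // Big-endian encoding of the 32-bit value into 4 bytes
    val bytes = ByteBuffer.allocate(4).putInt(value).array()
    // Interpret the digest as a non-negative integer, then reduce it modulo length
    new BigInteger(1, md.digest(bytes)).mod(BigInteger.valueOf(length.toLong)).intValue
  }

  // Chain of indices: each index is the digest of the previous one,
  // starting from the digest of the seed (for instance an element's hashCode)
  def indexChain(seed: Int, numIndices: Int, length: Int): Seq[Int] =
    Iterator
      .iterate(digestIndex(seed, length))(i => digestIndex(i, length))
      .take(numIndices)
      .toVector
}
```

For example, DigestIndexSketch.indexChain(t.hashCode, numHashFunctions, length) would produce a sequence of indices in the same spirit as hashToArray, and passing "MD5" as the algorithm switches the digest.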

val numBytes: Int = 4
val lastByte: Int = numBytes - 1

implicit def int2Bytes(value: Int): Array[Byte] = Array.tabulate(numBytes)(
    n => {
      // Shift by a multiple of 8 bits so that byte n (most significant first) is extracted
      val offset = (lastByte - n) * 8
      ((value >>> offset) & 0xFF).toByte
    }
  )

The conversion extracts the 4 bytes of a 32-bit integer through bit manipulation. Alternatively, you may consider a conversion from a long value to an 8-byte array.
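
The 8-byte variant could look like the sketch below. The names LongBytesSketch, long2Bytes and long2BytesNio are hypothetical and not part of the article's code; bytes are produced in big-endian order (most significant first), and a java.nio.ByteBuffer version is included only as a convenient cross-check.

```scala
import java.nio.ByteBuffer

// Sketch of a Long to Array[Byte] conversion (8 bytes, big-endian).
object LongBytesSketch {
  val numBytes: Int = 8

  def long2Bytes(value: Long): Array[Byte] = Array.tabulate(numBytes) { n =>
    // Number of bits to shift so that byte n ends up in the lowest 8 bits
    val offset = (numBytes - 1 - n) * 8
    ((value >>> offset) & 0xFFL).toByte
  }

  // Reference conversion relying on the JDK (ByteBuffer is big-endian by default)
  def long2BytesNio(value: Long): Array[Byte] =
    ByteBuffer.allocate(numBytes).putLong(value).array()
}
```

Both functions are expected to return the same array for any input, so either one can feed MessageDigest.update.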

Use case

This simple test checks whether a couple of values are contained in the set. A rejection of 22 is definitive, whereas the acceptance of 5 and 97 only indicates that they very likely belong to the set. If the objective is to confirm with certainty that 5 and 97 belong to the set, then a full-fledged hash table would have to be used.

val filter = new DigestBloomFilter[Long](100, 100)

val newValues = Array[Long](5L, 97L, 91L, 23L, 67L, 33L)
filter.addAll(newValues)

assert(filter.mightContain(5))
assert(filter.mightContain(97))
assert(!filter.mightContain(22))

Performance evaluation

Let's look at the behavior of the Bloom filter under load. The test consists of adding 100,000,000 new random values, then testing 10,000 times whether the filter contains a value. The test is run 10 times after a warm-up of the JVM.

The first performance test evaluates the average time required to insert a new element into a Bloom filter whose size ranges from 100 million to 1 billion entries.
The second test evaluates the average search/query time for Bloom filters with the same range of sizes.
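
A timing harness for this kind of measurement might be sketched as follows. The names TimingSketch, averageNanos, warmUps and runs are hypothetical, and this is not the harness used to produce the article's results; for rigorous numbers, a dedicated benchmarking tool such as JMH would be preferable.

```scala
// Sketch of a timing harness: warm up the JVM, then average several timed runs.
object TimingSketch {

  def averageNanos(warmUps: Int, runs: Int)(op: () => Unit): Double = {
    require(runs > 0, "runs must be positive")
    // Untimed executions that give the JIT compiler a chance to optimize the hot path
    (0 until warmUps).foreach(_ => op())
    val timings = (0 until runs).map { _ =>
      val start = System.nanoTime()
      op()
      System.nanoTime() - start
    }
    // Average duration of one run, in nanoseconds
    timings.sum.toDouble / runs
  }
}
```

The operation passed as op would be, for instance, the insertion of a batch of random values or a batch of mightContain queries. Dividing the result by the batch size gives an average time per element.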




As expected, the average time to load a new set of values and to check whether the filter contains a specific value is fairly constant.


Spark-based filter

Apache Spark

Apache Spark is a free, open-source framework for cluster computing, specifically designed to process data in real time via distributed computing [ref 4]. Its primary applications include:
  • Analytics: Spark's capability to quickly produce responses allows for interactive data handling, rather than relying solely on predefined queries.
  • Data Integration: Often, the data from various systems is inconsistent and cannot be combined for analysis directly. To obtain consistent data, processes like Extract, Transform, and Load (ETL) are employed. Spark streamlines this ETL process, making it more cost-effective and time-efficient.
  • Streaming: Managing real-time data, such as log files, is challenging. Spark excels in processing these data streams and can identify and block potentially fraudulent activities.
  • Machine Learning: The growing volume of data has made machine learning techniques more viable and accurate. Spark's ability to store data in memory and execute repeated queries swiftly facilitates the use of machine learning algorithms.

Implementation

Apache Spark includes a Bloom filter implementation, BloomFilter, that's suitable for handling large datasets within data frames [ref 5].
The wrapper class SparkBloomFilter is parameterized with the type T of the elements to be inserted and searched.

import org.apache.spark.util.sketch._



class SparkBloomFilter[T](bloomFilter: BloomFilter)  extends TBloomFilter[T] {
  
  def getExpectedFPRate: Double = bloomFilter.expectedFpp()

  override def mightContain(t: T): Boolean = bloomFilter.mightContain(t)

  override def add(t: T): Unit = bloomFilter.put(t)

  override def addAll(ts: Array[T]): Unit =
    if(ts.nonEmpty)
      ts.foreach(add)
}

The 3 methods are:
  • add: Insert a new element into the filter
  • addAll: Insert a set of elements into the filter
  • mightContain: Test if the filter may contain a given item, with a degree of certainty associated with the rate of false positives.
A generic constructor allows the customization of the Bloom filter on Spark with the following attributes:
  • capacity: The expected number of items to be inserted into the filter, which determines the size of the bit array and the number of hash functions.
  • targetFPRate: The target false positive rate, which is the frequency at which the filter incorrectly identifies an item as present in the set.

def apply[T](capacity: Int, targetFPRate: Float): SparkBloomFilter[T] =
    new SparkBloomFilter[T]( BloomFilter.create(capacity, targetFPRate))

Impact of capacity on expected false positives

The anticipated false-positive rate indicates the probability of hash function collisions. As the filter's capacity increases, it becomes less probable that a new item will clash with one already in the filter. We'll delve into how the filter's capacity influences the collision rate within the hashing mechanism.

def computeExpectedFPRate(capacity: Int, input: Array[Long]): Double = {
   val filter = SparkBloomFilter[Long](capacity, 0.05F)

   // Insert all the values, then retrieve the expected false-positive rate
   input.foreach(n => filter.add(n))
   filter.getExpectedFPRate
}


val input = Array[Long](5L, 97L, 91L, 23L, 67L, 33L) ++
      Array.tabulate(10000)(n => n.toLong+100L)
 
(1000 until 12000 by 500).foreach(
    capacity => println(s"$capacity ${computeExpectedFPRate(capacity, input)}")
)

In this experiment, we chose capacity values from 1,000 up to 12,000 for a Bloom filter containing 10,006 entries. The expected false-positive rate decreases from 1.0 towards nearly 0.0. Specifically, at a capacity of 10,006, which matches the number of entries, the expected rate of false positives of 0.0509 aligns with the predetermined target of 0.05.
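
The trend can be cross-checked without Spark using the textbook approximation of the false-positive rate. The sketch below is an assumption, not Spark's implementation: the names FalsePositiveSketch, numBits, numHashFunctions and estimatedFPRate are hypothetical, the sizing formulas are the classical ones, and Spark's own sizing and its expectedFpp value (which depends on the bits actually set) may differ slightly.

```scala
// Sketch: theoretical false-positive rate of a filter sized for `capacity` items
// at a target rate, after `inserted` items have been added.
object FalsePositiveSketch {
  private val ln2 = math.log(2.0)

  // Number of bits m for a given capacity and target false-positive rate
  def numBits(capacity: Int, targetFPRate: Double): Long =
    math.ceil(-capacity * math.log(targetFPRate) / (ln2 * ln2)).toLong

  // Number of hash functions h = (m / capacity) ln 2, rounded, at least 1
  def numHashFunctions(capacity: Int, bits: Long): Int =
    math.max(1, math.round(bits.toDouble / capacity * ln2).toInt)

  // Approximation: (1 - e^(-h * inserted / m))^h
  def estimatedFPRate(capacity: Int, inserted: Int, targetFPRate: Double = 0.05): Double = {
    val m = numBits(capacity, targetFPRate)
    val h = numHashFunctions(capacity, m)
    math.pow(1.0 - math.exp(-h.toDouble * inserted / m), h)
  }
}
```

With 10,006 inserted items and a target of 0.05, this estimate is close to 1.0 for a capacity of 1,000 and close to 0.05 for a capacity of 10,006, which is consistent with the behavior reported above.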


Application to datasets

A Bloom filter can be utilized on a specific column, referred to as columnName, within a dataset containing a vast number of values, named dataSet. This application requires setting a defined capacity and aiming for a certain false positive rate.

def apply[T](
  dataSet: Dataset[T],
  columnName: String,
  capacity: Int,
  targetFPRate: Double)(implicit sparkSession: SparkSession): SparkBloomFilter[T]= {
    val filter = dataSet.stat.bloomFilter(columnName, capacity, targetFPRate)
    new SparkBloomFilter[T](filter)
}

Finally, we can apply this constructor to create a Bloom filter on a very large Spark Dataset.

case class TEntry(id: String, value: Float)

val dataSize = 1000000
val dataSet = Seq.tabulate(dataSize)(
    n => TEntry(n.toString, Random.nextFloat())
).toDS()

val filter = SparkBloomFilter(dataSet, "id", dataSize, 0.05)



Thank you for reading this article. For more information ...

References

[1] Bloom filter Wikipedia
[4] Apache Spark 3.4.0
[5] Spark BloomFilter


---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3