Showing posts with label Akka future. Show all posts

Friday, April 8, 2022

Implement Non-blocking MongoDB Client in Scala

Target audience: Intermediate
Estimated reading time: 4'

MongoDB, a popular NoSQL (non-relational) database, is a document-oriented storage system available in a free Community edition. Its flexible document schema and its support for horizontal scaling (sharding) make it well suited to store large data science datasets, for which it can be a better fit than a traditional RDBMS.
This piece describes the configuration and implementation of a non-blocking Scala client for MongoDB.


Table of contents
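Setup
    Installation
    MongoDB libraries
MongoDB client
    Mongo file descriptor
    Parameterized instantiation
    Non-blocking operations
    Retrieving collections
References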
Important notes
  • This post assumes that the reader is familiar with the MongoDB architecture and its key components. The formal documentation of the MongoDB database, client libraries and connectors is available at MongoDB docs. YouTube.com offers a few introductory videos on MongoDB [ref 1].
  • Implementation relies on Scala 2.12.15

Setup 

Installation

The installation procedure for the community version of MongoDB depends on the environment and operating system [ref 2]. Here is an overview:

MacOS
  • brew tap mongodb/brew
  • brew install mongodb-community@7.0
with the default paths for the MongoDB system files:
  • Configuration file $BIN_DIRECTORY/etc/mongod.conf
  • Log directory  $BIN_DIRECTORY/var/log/mongodb
  • Data directory  $BIN_DIRECTORY/var/mongodb
where BIN_DIRECTORY is /usr/local on Intel processors and /opt/homebrew on Apple M1/M2 processors.

Linux
  • sudo yum install -y mongodb-org
  • sudo yum install -y mongodb-org-7.0 mongodb-org-database-7.0 mongodb-org-server-7.0 mongodb-mongosh-7.0 mongodb-org-mongos-7.0 mongodb-org-tools-7.0
with the default paths for the MongoDB system files:
  • Configuration file /etc/mongod.conf
  • Log directory  /var/log/mongodb
  • Data directory /var/lib/mongo

Docker
  • docker pull mongodb/mongodb-community-server
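  • docker run --name mongo -p 27017:27017 -d mongodb/mongodb-community-server:latest (-p publishes the default port 27017 to the host so that a local client can connect)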

Binary components
  • The server mongod
  • The sharded cluster query router mongos
  • The MongoDB Shell, mongosh

Configuration
The configuration file, mongod.conf, defines among other settings the data directory (storage.dbPath), the log file (systemLog.path) and the network settings (net.port, net.bindIp). Their default values depend on the installation, as listed above for MacOS and Linux.

Note: MongoDB Atlas is a hosted MongoDB service option in the cloud which requires no installation overhead and offers a free tier to get started.


MongoDB libraries

  • MongoDB Java driver (mongodb-driver-sync) for synchronous access to MongoDB collections from a Java client [ref 3].
  • MongoDB Scala driver for Scala 2.12 (mongo-scala-driver_2.12) provides asynchronous, non-blocking access to collections through observables, which can be converted into Scala futures with toFuture() [ref 4].
  • Mongo Scala BSON library (mongo-scala-bson_2.12) is a Scala wrapper/extension of the BSON library.
  • Spark connector for MongoDB (mongo-spark-connector_2.12) provides integration between MongoDB and Apache Spark, including Spark Structured Streaming [ref 5].
  • ReactiveMongo for Scala (reactivemongo): with a classic synchronous database driver, each operation blocks the current thread until a response is received. ReactiveMongo is a Scala driver that provides fully non-blocking and asynchronous I/O operations [ref 6].
  • Reactive Streams driver for MongoDB (mongodb-driver-reactivestreams) provides asynchronous stream processing with non-blocking back pressure for MongoDB [ref 7].

<!-- MongoDB Java driver for synchronous operations -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongodb-driver-sync</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- MongoDB driver/client API for Scala 2.12 -->
<dependency>
   <groupId>org.mongodb.scala</groupId>
   <artifactId>mongo-scala-driver_2.12</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- Scala BSON formatter (read/write) for MongoDB -->
<dependency>
   <groupId>org.mongodb.scala</groupId>
   <artifactId>mongo-scala-bson_2.12</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- MongoDB connector for Apache Spark running Scala 2.12 -->
<dependency>
   <groupId>org.mongodb.spark</groupId>
   <artifactId>mongo-spark-connector_2.12</artifactId>
   <version>10.1.1</version>
</dependency>

<!-- Non-blocking query/insert ... to MongoDB collections -->
<dependency>
   <groupId>org.reactivemongo</groupId>
   <artifactId>reactivemongo_2.12</artifactId>
   <version>1.1.0-noshaded-RC10</version>
</dependency>

<!-- Asynchronous stream processing with non-blocking back pressure -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongodb-driver-reactivestreams</artifactId>
   <version>4.10.0</version>
</dependency>

Note: Not all these libraries are required to run the following use case; the client described below relies on the MongoDB Scala driver (mongo-scala-driver_2.12).


MongoDB client

Mongo file descriptor

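MongoDB stores documents in the BSON (binary JSON) format. The size of a BSON document, and therefore of any binary data stored inline, is limited to 16 MB. MongoDB defines a file storage specification, GridFS, that gets around this limitation by splitting a file into chunks stored as separate documents.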
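To put numbers on this limit, the following sketch mimics in plain Scala how GridFS splits a large binary payload into fixed-size chunks, each small enough to fit in its own document. The object GridFSChunks and its methods are hypothetical helpers, not part of the MongoDB driver; the constants assume the 16 MB document limit and the default GridFS chunk size of 255 KB.

```scala
object GridFSChunks {
  // Maximum size of a single BSON document: 16 MB
  val MaxBsonDocumentSize: Int = 16 * 1024 * 1024
  // Default GridFS chunk size: 255 KB
  val DefaultChunkSize: Int = 255 * 1024

  // Number of chunks required to store contentLength bytes (ceiling division)
  def numChunks(contentLength: Long, chunkSize: Int = DefaultChunkSize): Long =
    (contentLength + chunkSize - 1) / chunkSize

  // Split a payload into (chunk index, chunk bytes) pairs, the last chunk being shorter
  def split(payload: Array[Byte], chunkSize: Int = DefaultChunkSize): Vector[(Int, Array[Byte])] =
    payload.grouped(chunkSize).zipWithIndex.map { case (bytes, n) => (n, bytes) }.toVector
}
```

For instance, numChunks(20L * 1024 * 1024) returns 81: a 20 MB file exceeds the document limit but fits in 81 chunks of 255 KB.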
Files stored this way need a descriptor: we define MongoFile as a Scala trait that specifies the minimum set of attributes required to identify a file and its type.

import java.time.ZonedDateTime
import java.util.UUID

trait MongoFile {
   val fileId: UUID
   val source: String
   val path: String
   val fileType: String
   val createdAt: ZonedDateTime
   val contentLength: Long
}

Parameterized instantiation

The MongoDB client, MongoDBClient, is parameterized with a type implementing the MongoFile descriptor trait. This design allows future client implementations to extend the descriptor with contextual information, if needed.
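As an illustration, the sketch below extends the descriptor with a page count for PDF documents and sums the content length of any sequence of descriptors through the same bound, T <: MongoFile, used by the client. PdfFile, numPages and MongoFileOps.totalContentLength are hypothetical names; the trait is repeated so that the snippet compiles on its own.

```scala
import java.time.ZonedDateTime
import java.util.UUID

trait MongoFile {
  val fileId: UUID
  val source: String
  val path: String
  val fileType: String
  val createdAt: ZonedDateTime
  val contentLength: Long
}

// Hypothetical descriptor adding contextual information (number of pages)
final case class PdfFile(
  fileId: UUID,
  source: String,
  path: String,
  fileType: String,
  createdAt: ZonedDateTime,
  contentLength: Long,
  numPages: Int
) extends MongoFile

object MongoFileOps {
  // Accepts any descriptor type T <: MongoFile, as MongoDBClient does
  def totalContentLength[T <: MongoFile](files: Seq[T]): Long =
    files.map(_.contentLength).sum
}
```

The case class parameters implement the abstract values of the trait, so a PdfFile can be used wherever a MongoFile is expected while keeping its extra attributes.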

The client is fully defined by the arguments of its constructor:
  • host: Local or remote host of the MongoDB server
  • port: Port the daemon (mongod) is listening on
  • dbName: Name of the database
  • isFromFile: Boolean flag that specifies whether the documents reside in files (true) or in database collections (false)

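import com.mongodb.MongoClientException
import org.bson.codecs.configuration.{CodecProvider, CodecRegistry}
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.mongodb.scala.{ConnectionString, MongoClient, MongoClientSettings, ServerApi, ServerApiVersion}
import org.mongodb.scala.MongoClient.DEFAULT_CODEC_REGISTRY

// classOf[T] is not available for the abstract type T, so the codec provider of the
// concrete descriptor (e.g. Macros.createCodecProvider[MyFile]() for a case class
// MyFile) is passed implicitly
final class MongoDBClient[T <: MongoFile] private (
   host: String, 
   port: Int, 
   dbName: String, 
   isFromFile: Boolean)(implicit codecProvider: CodecProvider) {

  private lazy val mongoDBClient = try {
    val connectionString = s"mongodb://${host}:${port}"

    // [1]: Full definition of Mongo client with connection and server API settings
    val serverApi = ServerApi.builder.version(ServerApiVersion.V1).build()

    val settings = MongoClientSettings
         .builder()
         .applyConnectionString(ConnectionString(connectionString))
         .serverApi(serverApi)
         .build()
    val _mongoClient = MongoClient(settings)

    // [2]: Instantiation of mongo database using custom CODEC
    val _mongoDatabase =
      if(isFromFile) {
        val customCodecs: CodecRegistry = fromProviders(codecProvider)
        val codecRegistry = fromRegistries(
          customCodecs,
          DEFAULT_CODEC_REGISTRY  // [3]: Default CODEC
        )
        _mongoClient.getDatabase(dbName).withCodecRegistry(codecRegistry)
      }
      else
        _mongoClient.getDatabase(dbName)

    Some((_mongoClient, _mongoDatabase))
  }
  catch {
    case e: MongoClientException =>
      logger.error(e.getMessage)
      None
  }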

The lazy value mongoDBClient first builds the configuration settings, with the server API version and the connection string, used to instantiate the MongoDB client, _mongoClient [1].
The instantiation of the database, _mongoDatabase, for documents stored as files relies on a codec for the file descriptor type, T <: MongoFile [2]. This codec is combined with the default codec registry, which is appropriate for most use cases [3].

The tuple (Mongo client, Mongo database) is wrapped into an option, which converts a client exception (MongoClientException) into None.
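The same conversion of a failure into None can be expressed with scala.util.Try. This sketch replaces the driver types with hypothetical stand-ins (Client, Database) and simulates a failing client creation with an invalid port, so that it runs without a MongoDB server; note that Try captures any non-fatal exception, not only MongoClientException.

```scala
import scala.util.{Failure, Success, Try}

object SafeConnection {
  // Hypothetical stand-ins for the driver's MongoClient and MongoDatabase
  final case class Client(connectionString: String)
  final case class Database(client: Client, name: String)

  def connect(host: String, port: Int, dbName: String): Option[(Client, Database)] =
    Try {
      // An invalid port simulates a failure during the client creation
      require(port > 0 && port < 65536, s"Invalid port $port")
      val client = Client(s"mongodb://$host:$port")
      (client, Database(client, dbName))
    } match {
      case Success(pair) => Some(pair)
      case Failure(e) =>
        System.err.println(e.getMessage) // stands in for logger.error
        None
    }
}
```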

Non-blocking operations

MongoDB is often used to store and query large data sets, especially when interactions occur at infrequent intervals. Under these conditions, client operations should proceed without being paused while data insertions or queries complete.

In Scala, this is supported by futures: the driver executes the query (for instance, collection.find) asynchronously and exposes its result as a future through toFuture. The caller can either compose the future or, as in the snippet below, wait for its completion by invoking scala.concurrent.Await, which blocks the calling thread until the result is available.

    Await.result(
       mongoCollectionFile.find.toFuture(),
       Duration.Inf
    )
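Duration.Inf makes the caller wait indefinitely if the server never answers. A minimal sketch, assuming a bounded wait is acceptable for the application: with a finite timeout, Await.result throws a java.util.concurrent.TimeoutException, converted here into None. slowQuery is a hypothetical stand-in for find().toFuture() that sleeps before returning fixed documents.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoundedWait {
  // Hypothetical stand-in for a MongoDB query: returns documents after a delay
  def slowQuery(delayMs: Long): Future[Seq[String]] = Future {
    Thread.sleep(delayMs)
    Seq("doc1", "doc2")
  }

  // Blocks the calling thread for at most `timeout`; None if the query is too slow
  def fetch(delayMs: Long, timeout: FiniteDuration): Option[Seq[String]] =
    try Some(Await.result(slowQuery(delayMs), timeout))
    catch {
      case _: TimeoutException => None
    }
}
```

Here fetch(10L, 2.seconds) returns the two documents while fetch(2000L, 50.millis) returns None. The abandoned query keeps running in the background: Await only stops waiting for it.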

The diagram below illustrates the interaction and the control of execution during a query to a MongoDB collection using a Scala future.

Illustration of non-blocking query to MongoDB


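The getCollectionFiles method delegates the query to the driver, which executes it asynchronously, then waits for its completion with Await.result. Await.result blocks the calling thread (indefinitely with Duration.Inf), so the method returns only once the documents are retrieved or an exception is caught.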

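import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import org.mongodb.scala.MongoCollection

// The ClassTag of T is required by the driver to decode documents into T
def getCollectionFile(collectionName: String)(implicit ct: ClassTag[T]): Option[MongoCollection[T]] =
  mongoDBClient.map { case (_, db) => db.getCollection[T](collectionName) }


def getCollectionFiles(collectionName: String)(implicit ct: ClassTag[T]): Option[Seq[T]] = try {

  getCollectionFile(collectionName).map(
     mongoCollectionFile => 
       // Block the calling thread until the MongoDB response is received
       Await.result( mongoCollectionFile.find().toFuture(), Duration.Inf )
  )
}
catch {
  case e: Exception =>
    logger.error(e.getMessage)
    None
}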
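A client that never blocks would instead return the future and let the caller compose or consume it. The sketch below illustrates this alternative with hypothetical in-memory collections in place of MongoDB; getCollectionFilesAsync and queryCollection are illustrative names, not part of the original client or of the driver.

```scala
import scala.concurrent.{ExecutionContext, Future}

object NonBlockingClient {
  // Hypothetical in-memory collections standing in for MongoDB
  private val collections: Map[String, Seq[String]] =
    Map("reports" -> Seq("report-1", "report-2"))

  // Stand-in for collection.find().toFuture(): fails if the collection is unknown
  private def queryCollection(name: String)(implicit ec: ExecutionContext): Future[Seq[String]] =
    Future {
      collections.getOrElse(name, throw new NoSuchElementException(s"Collection $name not found"))
    }

  // Returns immediately; a failure is logged and replaced by an empty sequence
  def getCollectionFilesAsync(name: String)(implicit ec: ExecutionContext): Future[Seq[String]] =
    queryCollection(name).recover {
      case e: NoSuchElementException =>
        System.err.println(e.getMessage) // stands in for logger.error
        Seq.empty[String]
    }
}
```

The caller registers a callback, for instance NonBlockingClient.getCollectionFilesAsync("reports").foreach(files => println(files.size)), and proceeds without waiting for the query. The trade-off is that timeouts and error handling move to the caller.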


Retrieving collections

Let's look at the implementation of two methods that retrieve all the documents of a MongoDB collection, collectionName.

The first method, getBSONDocuments, retrieves the documents from the collection collectionName.
The second method, getTextDocuments, retrieves the documents and converts them into readable text by decoding each BSON type.

def getBSONDocuments(collectionName: String): Seq[Document] =

  mongoDBClient.map {
    case (_, db) =>
       val collection = db.getCollection(collectionName)
       // Block until all the documents of the collection are retrieved
       Await.result(collection.find().toFuture(), Duration.Inf)
  }.getOrElse({
     // mongoDBClient is None if the client could not be created
     logger.warn(s"Mongo client unavailable, cannot read collection ${collectionName}")
     Seq.empty[Document]
  })



import scala.collection.mutable.ListBuffer
import org.bson.{BsonType, BsonValue}

def getTextDocuments(collectionName: String): Seq[String] = {
  val documents = getBSONDocuments(collectionName)

  if(documents.nonEmpty) {
    //  [1] Accumulate text representation of MongoDB collections
    documents.foldLeft(ListBuffer[String]())(
      (accumulator, document) => {
        val fieldIterator = document.iterator
        val strBuf = new StringBuilder()

        // [2] iterates across the fields in the document...
        while (fieldIterator.hasNext) {
          val (key, value): (String, BsonValue) = fieldIterator.next()

          // [3] Uses the BSON type of the value to decode it into a string
          val output = value.getBsonType match {
            case BsonType.STRING => s"${key}:${value.asString().getValue}"
            case BsonType.DOUBLE => s"${key}:${value.asDouble().getValue}"
            case BsonType.ARRAY => s"${key}:${value.asArray()}"
            case BsonType.DOCUMENT => s"${key}:${value.asDocument().toString}"
            case BsonType.INT32 =>  s"${key}:${value.asInt32().getValue}"
            case BsonType.INT64 =>  s"${key}:${value.asInt64().getValue}"
            case BsonType.BOOLEAN => s"${key}:${value.asBoolean().getValue}"
            case BsonType.BINARY => s"${key}:${value.asBinary()}"
            case BsonType.TIMESTAMP => s"${key}:${value.asTimestamp().getValue}"
            case BsonType.DATE_TIME => s"${key}:${value.asDateTime().getValue}"
            case BsonType.REGULAR_EXPRESSION => s"${key}:${value.asRegularExpression().toString}"
            case _ => ""   // Unsupported types are skipped
          }
          if (output.nonEmpty) strBuf.append(output).append(", ")
        }
        accumulator += strBuf.toString()
      }
    ).toList
  }
  else {
    logger.warn(s"Mongo collection $collectionName is empty")
    Seq.empty[String]
  }
}

The method getTextDocuments iterates through the BSON documents retrieved from the Mongo collection [1]. Each BSON document is a list of (key, value) fields.

The method then retrieves the iterator, fieldIterator, from each BSON document [2]. The BSON type of each value (getBsonType) is matched to decode the value and convert it, prefixed by its key, into a string [3] (e.g., a field active holding the boolean value true, of type BsonType.BOOLEAN, is converted into active:true).
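The type-driven decoding can be tried without a server using a small ADT in place of org.bson.BsonValue. In this sketch, FieldValue and its cases are hypothetical and cover only a few BSON types; joining the fields with mkString is a design choice that avoids the trailing separator produced by appending ", " after each field.

```scala
object BsonTextSketch {
  // Hypothetical, simplified stand-in for a subset of BSON value types
  sealed trait FieldValue extends Product with Serializable
  final case class StringValue(value: String) extends FieldValue
  final case class Int32Value(value: Int) extends FieldValue
  final case class DoubleValue(value: Double) extends FieldValue
  final case class BooleanValue(value: Boolean) extends FieldValue
  final case class ArrayValue(values: Seq[FieldValue]) extends FieldValue

  // Decode a value according to its type, as getTextDocuments does with getBsonType
  def decode(value: FieldValue): String = value match {
    case StringValue(s)  => s
    case Int32Value(n)   => n.toString
    case DoubleValue(d)  => d.toString
    case BooleanValue(b) => b.toString
    case ArrayValue(vs)  => vs.map(decode).mkString("[", ", ", "]")
  }

  // Convert a document, an ordered list of (key, value) fields, into key:value text
  def toText(document: Seq[(String, FieldValue)]): String =
    document.map { case (key, value) => s"$key:${decode(value)}" }.mkString(", ")
}
```

Because the trait is sealed, the compiler warns if a new value type is added without a matching case, which the catch-all case _ of the original match cannot do.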

Thank you for reading this article. For more information ...


References



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Tuesday, August 22, 2017

Scala Futures with Callbacks

Target audience: Intermediate
Estimated reading time: 6'

Futures are a critical concept in Scala and parallel computing. This post describes and illustrates the futures in Scala and Akka as a non-blocking mechanism in distributed computing.


Notes
  • Implementation relies on Scala 2.11.8
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers or imports is omitted.

Overview

Akka supports futures for which the requester never blocks waiting for a response. A message or request is sent for execution and the expected response (success or failure/exception) is delivered asynchronously back to the client controller.
In a previous post, we looked into Akka futures for which the client is blocked until all the concurrent tasks (futures) have completed. In this post, we look into the non-blocking approach that relies on callback handlers: onSuccess and onFailure in Scala 2.11, deprecated since Scala 2.12 in favor of onComplete. We reuse the same example as with the blocking design.
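The callback mechanism can be sketched with the standard library alone. Here the requester registers onComplete, which handles both success and failure, and returns immediately; a Promise, used in this sketch only to expose the reply to the caller, receives the message built by the callback. CallbackDemo and request are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object CallbackDemo {
  // Sends a task for execution and returns without waiting for the response.
  // The outcome (success or failure) is delivered asynchronously to `reply`.
  def request(task: () => Double, reply: Promise[String])(implicit ec: ExecutionContext): Unit =
    Future(task()).onComplete {
      case Success(value) => reply.success(s"Success: $value")
      case Failure(e)     => reply.success(s"Failure: ${e.getMessage}")
    }
}
```

Unlike Await.result, request does not hold the calling thread: the callback runs on a thread of the execution context once the task completes.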


Futures callback design

The first step is to create a controller, FuturesController, to manage the concurrent tasks. The controller takes the input time series xt and a list of model candidates xs. As the time series uses a single variable, the models are simply defined as one-variable functions (x: Double) => f(x).

import akka.actor.Actor
import akka.util.Timeout
import scala.concurrent.Future

case class Launch()
 
final class FuturesController(
    xt: Array[Double],
    xs: List[Double => Double])
    (implicit timeout: Timeout) extends Actor {

    // Execution context used by the futures created by this actor
    import context.dispatcher

    // Event/message handling loop implemented as a partial function
    override def receive: Receive = {
        case _: Launch =>
            sender() ! processFutures(createFutures)
    }

    // Both methods are implemented below
    def createFutures: List[Future[Double]] = ???
    def processFutures(futures: List[Future[Double]]): Double = ???
}

The event handler receive processes the message Launch: it creates as many futures as there are models to evaluate (createFutures), then passes them to the method processFutures, which handles the completion of each future and computes the response sent back to the requester.
The futures are created with their computation (or execution) unit. In this particular case, the task computes the least squares error of each model relative to the input dataset: the square root of the sum of the squared differences between the observations and the values of the model.

def createFutures: List[Future[Double]] = 
   xs.map(f => Future[Double] { 
       // Values of the model f at each index of the time series
       val yt = Range(0, xt.size).map(i => f(i.toDouble))
       val r = xt.zip(yt).foldLeft(0.0)((sum, z) => {
           val diff = z._1 - z._2
           sum + diff*diff
       })
       Math.sqrt(r)
   })
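The error computed by each task can be checked on a tiny series. The standalone function below, ModelError.modelError (a hypothetical name), reproduces the arithmetic of a single task without futures: with xt = (0, 3, 6), the models 3x, x*x + x and x*x*x + x*x + x have errors 0, 1 and 8.

```scala
object ModelError {
  // Square root of the sum of squared differences between the time series xt
  // and the values of the model f at indices 0, 1, ..., xt.length - 1
  def modelError(xt: Array[Double], f: Double => Double): Double = {
    val sumOfSquares = xt.indices.foldLeft(0.0) { (sum, i) =>
      val diff = xt(i) - f(i.toDouble)
      sum + diff * diff
    }
    Math.sqrt(sumOfSquares)
  }
}
```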

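The method processFutures handles the outcome, success or failure, of each future [1]: a failed task is logged and its error replaced by the marker value -1, which cannot be a valid error since errors are square roots. It then filters out the failed tasks [2] and selects the lowest least squares error, that is the error of the best model [3]. Note that Await.result blocks the actor until all the futures have completed or the timeout expires.

def processFutures(futures: List[Future[Double]]): Double = {

   // [1] Replace the error of a failed task by the marker value -1.0
   val safeFutures = futures.map(_.recover {
       case e: Exception =>
           println(e.toString)
           -1.0
   })
   // Wait for all the tasks to complete, within the actor's timeout
   val errors = scala.concurrent.Await.result(Future.sequence(safeFutures), timeout.duration)

   errors.filter(_ != -1.0) // [2] discard the failed tasks
         .min               // [3] lowest least squares error
}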
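A fully non-blocking variant of the selection would return a future of the index of the best model, to be consumed by a callback. In this sketch, ModelSelection.bestModel is a hypothetical name; a failed task is mapped to None rather than to a marker value, and the index refers to the position of the model in the original list.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ModelSelection {
  // Index of the model with the lowest error, None if every task failed.
  // Returns immediately: the result is available once all the futures complete.
  def bestModel(errors: List[Future[Double]])(implicit ec: ExecutionContext): Future[Option[Int]] = {
    // Turn each failure into None instead of a marker value such as -1.0
    val safeErrors = errors.map(_.map(Option(_)).recover { case _: Exception => None })

    Future.sequence(safeErrors).map { results =>
      val succeeded = results.zipWithIndex.collect { case (Some(error), index) => (error, index) }
      if (succeeded.isEmpty) None else Some(succeeded.minBy(_._1)._2)
    }
  }
}
```

An actor could forward this result to the requester from an onComplete callback instead of blocking in receive.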


Evaluation

Let's consider a very simple application which consists of distributing the following computation:
   s(x) = f(x) + g(x) + h(x)  // ff(0), ff(1), ff(2)
Each computation relies on a different polynomial function of ff, with an increasing order of complexity. Each of the three computations is executed by a future scheduled on the execution context, so that the computations can run concurrently on different CPU cores.

val ff = List[Double => Double](
   (x: Double) => 3.0*x,
   (x: Double) => x*x + x,
   (x: Double) => x*x*x + x*x + x
)
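As a reference for the expected result, the three components of s(x) can be evaluated sequentially, without futures. At x = 2.0 they yield 3*2 = 6, 2*2 + 2 = 6 and 2*2*2 + 2*2 + 2 = 14, so s(2.0) = 26. SequentialCheck and sequentialSum are hypothetical names for a helper that can validate the concurrent versions.

```scala
object SequentialCheck {
  val ff = List[Double => Double](
    (x: Double) => 3.0 * x,
    (x: Double) => x * x + x,
    (x: Double) => x * x * x + x * x + x
  )

  // Sequential reference implementation of s(x) = f(x) + g(x) + h(x)
  def sequentialSum(x: Double): Double = ff.map(f => f(x)).sum
}
```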

The compute method creates a list of futures from the list ff of computational functions, then aggregates their results and delivers the sum through a callback.
The first version is a detailed implementation of the creation and processing of futures.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def compute(x: Double): Future[Double] = {
   // [1] Create a future for each of the component of ff
   val futures = ff.map(f => Future[Double] { f(x) })

   // [2] Aggregate the results of all the futures into a single future
   Future.sequence(futures).map(_.sum)
}
 
val x = 2.0
// [3] Callback invoked asynchronously once the sum is available (or a task failed)
compute(x).onComplete {
   case Success(sum) => println(s"s($x) = $sum")
   case Failure(e) => println(e.toString)
}

The Future type and the global execution context need to be imported. A future is created for each of the three polynomial functions of ff [1]. The outputs of the futures are then aggregated into a single future [2], whose result is delivered to the caller by the onComplete callback [3]; onComplete handles both success and failure and replaces onSuccess and onFailure, deprecated since Scala 2.12. Accumulating the results in a local variable from within the callbacks would not work: compute would return before the callbacks are executed. This version relies on a two-step process:
  • Creation of futures [1]
  • Computation and aggregation [2], [3]
A more compact version relies on the foldLeft higher-order method to chain the futures into a single future of the sum, so that the caller is never blocked; a failed computation contributes 0.0 to the sum.

def compute(x: Double): Future[Double] =
   ff.map(f => Future[Double] { f(x) })
     .foldLeft(Future.successful(0.0))((s, f) =>
        for {
          acc <- s
          // A failed computation leaves the partial sum unchanged
          y <- f.recover { case e: Exception => 0.0 }
        } yield acc + y
     )


References