
Friday, April 8, 2022

Implement Non-blocking MongoDB Client in Scala

Target audience: Intermediate
Estimated reading time: 4'

MongoDB is a widely used, open-source, document-oriented NoSQL ('non-relational') database. Its flexible schema and support for horizontal scaling make it well suited to storing large data science datasets, a workload for which a traditional RDBMS is often a poorer fit.
This post describes the configuration and implementation of a non-blocking Scala client for MongoDB.


Table of contents
Setup
     Installation
      MongoDB libraries     
Important notes
  • This post assumes that the reader is familiar with the MongoDB architecture and its key components. The official documentation for the MongoDB database, client libraries and connectors is available at MongoDB docs. YouTube.com offers a few introductory videos on MongoDB [ref 1].
  • Implementation relies on Scala 2.12.15

Setup 

Installation

The installation procedure for the community version of MongoDB depends on the environment and operating system [ref 2]. Here is an overview:

macOS
  • brew tap mongodb/brew
  • brew install mongodb-community@7.0
with the default paths for the MongoDB system files:
  • Configuration file $BIN_DIRECTORY/etc/mongod.conf
  • Log directory  $BIN_DIRECTORY/var/log/mongodb
  • Data directory  $BIN_DIRECTORY/var/mongodb
where BIN_DIRECTORY is /usr/local on Intel processors and /opt/homebrew on Apple silicon (M1/M2).

Linux (yum-based distributions)
  • sudo yum install -y mongodb-org
or, to install the components of a specific release:
  • sudo yum install -y mongodb-org-7.0 mongodb-org-database-7.0 mongodb-org-server-7.0 mongodb-mongosh-7.0 mongodb-org-mongos-7.0 mongodb-org-tools-7.0
with the default paths for the MongoDB system files:
  • Configuration file /etc/mongod.conf
  • Log directory  /var/log/mongodb
  • Data directory /var/lib/mongo

Docker
  • docker pull mongodb/mongodb-community-server
  • docker run --name mongo -d mongodb/mongodb-community-server:latest

Binary components
  • The server mongod
  • The sharded cluster query router mongos
  • The MongoDB Shell, mongosh

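Configuration
The server, mongod, reads its settings from the configuration file mongod.conf. For a Homebrew installation on macOS, the default paths for the MongoDB system files are:
  • Configuration file $BIN_DIRECTORY/etc/mongod.conf
  • Log directory  $BIN_DIRECTORY/var/log/mongodb
  • Data directory  $BIN_DIRECTORY/var/mongodb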

Note: MongoDB Atlas is a hosted MongoDB service option in the cloud which requires no installation overhead and offers a free tier to get started.


MongoDB libraries

  • MongoDB Java driver (mongodb-driver-sync) for synchronous access to MongoDB collections using a Java client [ref 3].
  • MongoDB Scala driver for Scala 2.12 (mongo-scala-driver_2.12) returns query results as observables that can be converted to Scala futures for non-blocking access to collections [ref 4].
  • Mongo Scala BSON library (mongo-scala-bson_2.12) is a Scala wrapper / extension to the bson library.
  • Spark connector for MongoDB (mongo-spark-connector_2.12) provides integration between MongoDB and Apache Spark, including Spark structured streaming [ref 5].
  • Reactive Mongo for Scala (reactivemongo_2.12): With a classic synchronous database driver, each operation blocks the current thread until a response is received. This library is a Scala driver that provides fully non-blocking and asynchronous I/O operations [ref 6].
  • Reactive streams for MongoDB (mongodb-driver-reactivestreams) provides asynchronous stream processing with non-blocking back pressure for MongoDB [ref 7].

<!-- MongoDB Java driver for synchronous operations -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongodb-driver-sync</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- MongoDB driver/client API for Scala 2.12 -->
<dependency>
   <groupId>org.mongodb.scala</groupId>
   <artifactId>mongo-scala-driver_2.12</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- Scala BSON formatter (read/write) for MongoDB -->
<dependency>
   <groupId>org.mongodb.scala</groupId>
   <artifactId>mongo-scala-bson_2.12</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- MongoDB connector for Apache Spark running Scala 2.12 -->
<dependency>
   <groupId>org.mongodb.spark</groupId>
   <artifactId>mongo-spark-connector_2.12</artifactId>
   <version>10.1.1</version>
</dependency>

<!-- Non-blocking query/insert ... to MongoDB collections -->
<dependency>
   <groupId>org.reactivemongo</groupId>
   <artifactId>reactivemongo_2.12</artifactId>
   <version>1.1.0-noshaded-RC10</version>
</dependency>

<!-- Asynchronous stream processing with non-blocking back pressure -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongodb-driver-reactivestreams</artifactId>
   <version>4.10.0</version>
</dependency>

Note: Not all the libraries are required to run the following use case
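
For a project built with sbt rather than Maven, the same artifacts could be declared as sketched below. The use of sbt is an assumption (the article only lists Maven coordinates); the versions are copied from the Maven entries above, and the %% operator appends the Scala binary version (_2.12) to the artifact name.

```scala
// build.sbt (sketch, assuming an sbt build; versions copied from the Maven entries)
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.mongodb"       %  "mongodb-driver-sync"            % "4.10.0",
  "org.mongodb.scala" %% "mongo-scala-driver"             % "4.10.0",
  "org.mongodb.scala" %% "mongo-scala-bson"               % "4.10.0",
  "org.mongodb.spark" %% "mongo-spark-connector"          % "10.1.1",
  "org.reactivemongo" %% "reactivemongo"                  % "1.1.0-noshaded-RC10",
  "org.mongodb"       %  "mongodb-driver-reactivestreams" % "4.10.0"
)
```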


MongoDB client

Mongo file descriptor

MongoDB stores objects as BSON documents. Binary data is stored as a byte array within a document, and the size of a BSON document is limited to 16 MB. MongoDB defines a file storage specification, GridFS, to get around this limitation.
Therefore, we define a descriptor, MongoFile, as a Scala trait that specifies the minimum set of attributes required to identify a file and its type.

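import java.time.ZonedDateTime
import java.util.UUID

// Minimum set of attributes describing a file stored in MongoDB
trait MongoFile {
   val fileId: UUID
   val source: String
   val path: String
   val fileType: String
   val createdAt: ZonedDateTime
   val contentLength: Long
}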
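
As an illustration, a concrete descriptor could be written as a case class whose fields implement the abstract values of the trait. The name TextFile and its extra attribute, encoding, are hypothetical and only show how a descriptor might be extended; the trait is repeated so that the sketch compiles on its own.

```scala
import java.time.ZonedDateTime
import java.util.UUID

// Descriptor from the article, repeated so the sketch is self-contained
trait MongoFile {
  val fileId: UUID
  val source: String
  val path: String
  val fileType: String
  val createdAt: ZonedDateTime
  val contentLength: Long
}

// Hypothetical descriptor for text files: adds an 'encoding' attribute
final case class TextFile(
  fileId: UUID,
  source: String,
  path: String,
  fileType: String,
  createdAt: ZonedDateTime,
  contentLength: Long,
  encoding: String
) extends MongoFile

object TextFileExample extends App {
  val file = TextFile(
    UUID.randomUUID(), "local", "/data/notes.txt", "txt", ZonedDateTime.now(), 1024L, "UTF-8"
  )
  println(s"${file.path} (${file.fileType}, ${file.contentLength} bytes)")
}
```

A case class is a natural fit here because the codec macros of the Scala driver are designed for case classes.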

Parameterized instantiation

The MongoDB client, MongoDBClient, is parameterized with a type that implements the MongoFile descriptor trait. This design allows future client implementations to extend the descriptor with contextual information, if needed.

The client is fully defined by the arguments of its constructor:
  • host: Local or remote host for MongoDB server
  • port: Port the daemon is listening on
  • dbName: Name of the database
  • isFromFile: Boolean flag that specifies if the document resides in files (true) or database collections (false)

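import com.mongodb.{MongoClientException, ServerApi, ServerApiVersion}
import org.bson.codecs.configuration.{CodecProvider, CodecRegistry}
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.mongodb.scala.{ConnectionString, MongoClient, MongoClientSettings}
import org.mongodb.scala.MongoClient.DEFAULT_CODEC_REGISTRY
import org.slf4j.LoggerFactory

// classOf[T] does not compile for an abstract type parameter, so the codec
// provider for T is supplied implicitly by the caller, for instance with
// org.mongodb.scala.bson.codecs.Macros.createCodecProvider for a case class.
final class MongoDBClient[T <: MongoFile] private (
   host: String, 
   port: Int, 
   dbName: String, 
   isFromFile: Boolean)(implicit fileCodecProvider: CodecProvider) {

  // Assumption: SLF4J is the logging API
  private val logger = LoggerFactory.getLogger(classOf[MongoDBClient[_]])

  private lazy val mongoDBClient = try {
    val connectionString = s"mongodb://$host:$port"

    // [1]: Full definition of Mongo client with connection and server API settings
    val serverApi = ServerApi.builder.version(ServerApiVersion.V1).build()

    val settings = MongoClientSettings
         .builder()
         .applyConnectionString(ConnectionString(connectionString))  
         .serverApi(serverApi)
         .build()
    val _mongoClient = MongoClient(settings)
   
    // [2]: Instantiation of mongo database using custom CODEC
    val _mongoDatabase =
      if(isFromFile) {
        val customCodecs: CodecRegistry = fromProviders(fileCodecProvider)
        val codecRegistry = fromRegistries(
          customCodecs,
          DEFAULT_CODEC_REGISTRY  // [3]: Default CODEC
        )
        _mongoClient.getDatabase(dbName).withCodecRegistry(codecRegistry)
      }
      else
        _mongoClient.getDatabase(dbName)

    Some((_mongoClient, _mongoDatabase))
  }
  catch {
    case e: MongoClientException =>
      logger.error(e.getMessage)
      None
  }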

The constructor first builds the configuration settings from the server API and the connection string, then uses them to instantiate the MongoDB client, _mongoClient [1].
The instantiation of the database, _mongoDatabase, from encoded files relies on the file format, T <: MongoFile [2]. This implementation combines the codec for the file format with the default CODEC registry, which is appropriate for most use cases [3].

The tuple (Mongo client, Mongo database) is wrapped into an option, which converts an exception into None.
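
The pattern of deferring a fallible initialization and exposing its outcome as an option can be sketched with the standard library alone. The names LazyHandle and connect are hypothetical and stand in for the MongoDB client and its construction; scala.util.Try is used here in place of the explicit try/catch block and, unlike it, captures any non-fatal exception.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical wrapper: 'connect' stands in for building the client and database
final class LazyHandle[R](connect: () => R) {
  // Evaluated once, on first access; a failure is reported and mapped to None
  lazy val handle: Option[R] = Try(connect()) match {
    case Success(resource) => Some(resource)
    case Failure(e) =>
      Console.err.println(s"Initialization failed: ${e.getMessage}")
      None
  }
}

object LazyHandleExample extends App {
  val ok = new LazyHandle(() => "connected")
  val ko = new LazyHandle[String](() => throw new IllegalStateException("host unreachable"))

  println(ok.handle)  // Some(connected)
  println(ko.handle)  // None
}
```

Callers then handle the absence of a connection with map or getOrElse instead of catching exceptions at every call site.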

Non-blocking operations

MongoDB is often chosen for storing and querying large data sets, especially when interactions occur at infrequent intervals. Under these conditions, the client should be able to continue its own processing rather than sit idle while a data insertion or a query completes.

In Scala, this is achieved with futures. A call such as collection.find().toFuture() launches the query and returns immediately, so the client can carry on with other work. The result is collected later by invoking scala.concurrent.Await.result, which blocks the calling thread only at that point, until the future completes.

    Await.result(
       mongoCollectionFile.find.toFuture(),
       Duration.Inf
    )
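
The sequence of launching a query, continuing with local work, then collecting the result can be reproduced with the standard library alone. In this sketch, slowQuery is a hypothetical stand-in for collection.find().toFuture(), and the 5-second limit is an arbitrary choice that replaces Duration.Inf so that an unresponsive server cannot block the caller forever.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitExample extends App {
  // Hypothetical stand-in for collection.find().toFuture()
  def slowQuery(): Future[Seq[String]] = Future {
    Thread.sleep(200)   // Simulates the latency of the database
    Seq("doc1", "doc2")
  }

  val pending = slowQuery()        // Returns immediately
  val localWork = (1 to 1000).sum  // The caller keeps working in the meantime

  // Blocks the calling thread, for at most 5 seconds, until the result is available
  val documents = Await.result(pending, 5.seconds)
  println(s"Local work: $localWork, documents: ${documents.mkString(",")}")
}
```

If the limit expires first, Await.result throws a java.util.concurrent.TimeoutException.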

The diagram below illustrates the interaction and control of execution during a query to MongoDB collection, using Scala future.

Illustration of non-blocking query to MongoDB


The getCollectionFiles method launches the query as a future, then waits for its safe completion.

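import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import org.mongodb.scala.MongoCollection

// The driver requires a ClassTag to resolve the codec for the type T
def getCollectionFile(collectionName: String)
                     (implicit ct: ClassTag[T]): Option[MongoCollection[T]] =
  mongoDBClient.map { case (_, db) => db.getCollection[T](collectionName) }


def getCollectionFiles(collectionName: String)
                      (implicit ct: ClassTag[T]): Option[Seq[T]] = try {

  getCollectionFile(collectionName).map(
     mongoCollectionFile => 
       // Launch the query as a future, then wait for the MongoDB response
       Await.result(mongoCollectionFile.find().toFuture(), Duration.Inf)
  )
}
catch {
  case e: Exception =>
    logger.error(e.getMessage)
    None
}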
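
An alternative, which is not the approach taken in this post, is to hand the future back to the caller so that no thread is blocked inside the client. The sketch below relies on the standard library only: findAll is a hypothetical stand-in for mongoCollectionFile.find().toFuture(), and getCollectionFilesAsync is an illustrative name.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AsyncQueryExample {
  // Hypothetical stand-in for mongoCollectionFile.find().toFuture()
  def findAll(collectionName: String)
             (implicit ec: ExecutionContext): Future[Seq[String]] =
    if (collectionName.nonEmpty) Future(Seq(s"$collectionName-1", s"$collectionName-2"))
    else Future.failed(new IllegalArgumentException("Undefined collection name"))

  // Returns immediately; a failed query is reported and mapped to an empty sequence
  def getCollectionFilesAsync(collectionName: String)
                             (implicit ec: ExecutionContext): Future[Seq[String]] =
    findAll(collectionName).recover { case e: Exception =>
      Console.err.println(e.getMessage)
      Seq.empty[String]
    }
}

object AsyncQueryApp extends App {
  import scala.concurrent.Await
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Transformations are chained on the future without blocking
  val count = AsyncQueryExample.getCollectionFilesAsync("papers").map(_.size)

  // Awaiting only once, at the edge of the application
  println(Await.result(count, 5.seconds))
}
```

The trade-off is that every caller has to work with Future[Seq[T]] instead of a plain sequence.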


Retrieving collections

Let's look at the implementation of two methods that retrieve all the documents associated with a MongoDB collection, collectionName.

The first method, getBSONDocuments, retrieves the documents from the collection in their BSON format.
The second method, getTextDocuments, retrieves the documents and converts them into readable text by decoding the BSON type of each field.

def getBSONDocuments(collectionName: String): Seq[Document] =

  mongoDBClient.map {
    case  (_, db) => 
       val collection = db.getCollection(collectionName)
       // Wait for the completion of the query
       Await.result(collection.find().toFuture(),  Duration.Inf)
  }.getOrElse({
     // The option is empty only if the client could not be initialized
     logger.warn(s"Mongo client undefined: cannot access collection ${collectionName}")
     Seq.empty[Document]
  })



import org.bson.BsonType

def getTextDocuments(collectionName: String): Seq[String] = {
  val documents = getBSONDocuments(collectionName)
    
  if(documents.nonEmpty)
    //  [1] Accumulate text representation of the documents in the MongoDB collection
    documents.map { document =>
      // [2] Iterates across the fields in the document...
      val fieldIterator = document.iterator

      fieldIterator.map { case (key, value) =>
        // [3] Uses the BSON type of the value to decode it into the appropriate type...
        value.getBsonType match {
          case BsonType.STRING => s"${key}:${value.asString().getValue}"
          case BsonType.DOUBLE => s"${key}:${value.asDouble().getValue}"
          case BsonType.ARRAY => s"${key}:${value.asArray()}"
          case BsonType.DOCUMENT => s"${key}:${value.asDocument().toString}"
          case BsonType.INT32 =>  s"${key}:${value.asInt32().getValue}"
          case BsonType.INT64 =>  s"${key}:${value.asInt64().getValue}"
          case BsonType.BOOLEAN => s"${key}:${value.asBoolean().getValue}"
          case BsonType.BINARY => s"${key}:${value.asBinary()}"
          case BsonType.TIMESTAMP => s"${key}:${value.asTimestamp().getValue}"
          case BsonType.DATE_TIME => s"${key}:${value.asDateTime().getValue}"
          case BsonType.REGULAR_EXPRESSION => s"${key}:${value.asRegularExpression().toString}"
          case _ => ""   // Other types are skipped
        }
      }.filter(_.nonEmpty).mkString(", ")
    }
  else {
    logger.warn(s"Mongo collection $collectionName is empty")
    Seq.empty[String]
  }
}

The method getTextDocuments iterates through the BSON formatted documents retrieved from the Mongo collection [1]. Each BSON document is a list of fields with the format (key:value).

The method then retrieves the iterator, fieldIterator, from each BSON document [2]. The BSON type of each field value is matched to select the appropriate conversion of the value to a string [3]. For example, a field with the key isActive and a value true of type BsonType.BOOLEAN is represented as isActive:true.
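
The decoding step can be tried in isolation on a document built in memory, without a running server. This sketch uses the org.bson classes directly (they ship with the driver) and handles only three BSON types; the names BsonToTextExample, fieldToText and toText, as well as the sample fields, are hypothetical.

```scala
import org.bson.{BsonBoolean, BsonDocument, BsonInt32, BsonString, BsonType, BsonValue}
import scala.collection.JavaConverters._

object BsonToTextExample {
  // Decodes a subset of the BSON types; fields of any other type are skipped
  def fieldToText(key: String, value: BsonValue): Option[String] =
    value.getBsonType match {
      case BsonType.STRING  => Some(s"$key:${value.asString().getValue}")
      case BsonType.INT32   => Some(s"$key:${value.asInt32().getValue}")
      case BsonType.BOOLEAN => Some(s"$key:${value.asBoolean().getValue}")
      case _                => None
    }

  // Converts the fields to text, preserving their order in the document
  def toText(document: BsonDocument): String =
    document.entrySet().asScala.toSeq
      .flatMap(entry => fieldToText(entry.getKey, entry.getValue))
      .mkString(", ")
}

object BsonToTextApp extends App {
  val document = new BsonDocument()
    .append("name", new BsonString("MongoDB"))
    .append("major", new BsonInt32(7))
    .append("isActive", new BsonBoolean(true))

  println(BsonToTextExample.toText(document))  // name:MongoDB, major:7, isActive:true
}
```

Returning an Option for each field avoids emitting empty strings for the types that are not decoded.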

Thank you for reading this article. For more information ...


References



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3