I delve into a diverse range of topics, spanning programming languages, machine learning, data engineering tools, and DevOps. Our articles are enriched with practical code examples, ensuring their applicability in real-world scenarios.
Showing posts with label NoSQL. Show all posts
Showing posts with label NoSQL. Show all posts
Monday, December 16, 2024
Posts History
Labels:
Airflow,
BERT,
Big Data,
ChatGPT,
Data pipelines,
Deep learning,
Docker,
Genetic Algorithm,
Java,
Kafka,
Machine learning,
Monads,
NoSQL,
Python,
PyTorch,
Reinforcement Learning,
Scala,
Spark,
Streaming
Friday, April 8, 2022
Implement Non-blocking MongoDB Client in Scala
Target audience: Intermediate
Estimated reading time: 4'
MongoDB, a renowned NoSQL ('non-relational') database, stands out as an open-source, document-centric storage system. Its inherent adaptability and capability for horizontal expansion make it especially adept for housing voluminous data science datasets, outperforming traditional RDBMS.
This piece outlines the configuration and realization of a non-blocking Scala client for MongoDB.
Table of contents
SetupInstallation
Important notes:
- This post assumes that the reader is familiar with MongoDB architecture and key components. The formal documentation regarding MongoDB database, client libraries and connectors is available at MongoDB docs. YouTube.com offers few videos on introducing MongoDB [ref 1].
- Implementation relies on Scala 2.12.15
Setup
Installation
The installation procedure for the community version of MongoDB depends on the environment and operating system [ref 2]. Here is an overview:
MacOS
- brew tap mongodb/brew
- brew install mongodb-community@7.0
with the default paths for the MongoDB system files:
- Configuration file $BIN_DIRECTORY/etc/mongod.conf
- Log directory $BIN_DIRECTORY/mongodb
- Data directory $BIN_DIRECTORY/var/mongodb
BIN_DIRECTORY = /usr/local for Intel
/opt/homebrew for Apple M1/M2
Linux
- sudo yum install -y mongodb-org
- sudo yum install -y mongodb-org-7.0 mongodb-org-database-7.0 mongodb-org-server-7.0 mongodb-mongosh-7.0 mongodb-org-mongos-7.0 mongodb-org-tools-7.0
with the default paths for the MongoDB system files:
- Configuration file /etc/mongod.conf
- Log directory /var/log/mongodb
- Data directory /var/lib/mongo
Docker
- docker pull mongodb/mongodb-community-server
- docker run --name mongo -d mongodb/mongodb-community-server:latest
Binary components
- The server mongod
- The sharded cluster query router mongos
- The MongoDB Shell, mongosh
Configuration
The default paths for the MongoDB system files:
- Configuration file $BIN_DIRECTORY/mongod.conf
- Log directory $BIN_DIRECTORY/mongodb
- Data directory $BIN_DIRECTORY/var/mongodb
Note: MongoDB Atlas is a hosted MongoDB service option in the cloud which requires no installation overhead and offers a free tier to get started.
MongoDB libraries
- MongoDB Java driver (mongodb-driver-sync) for synchronous access to MongoDB collection using a Java client [ref 3].
- MongoDB 2.12 Scala driver (mongodb-scala-driver) relies on Scala futures for unblocked access to collections [ref 4].
- Mongo Scala BSON Library (mongo-scala-bson_2.12) is a Scala wrapper / extension to the bson library.
- Spark connector for MongoDB (mongo-spark-connector_2.12) provides integration between MongoDB and Apache Spark including Spark structured streaming [ref 5].
- Reactive Mongo for Scala (reactivemonogo): With a classic synchronous database driver, each operation blocks the current thread until a response is received. This library is a Scala driver that provides fully non-blocking and asynchronous I/O operations [ref 6].
- Reactive stream for MongoDB (mongodb-driver-reactivestreams) provides asynchronous stream processing with non-blocking back pressure for MongoDB [ref 7].
# MongoDB Java driver for synchronous operations
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.10.0</version>
</dependency>
# MongoDB driver/client API for Scala 2.12
<dependency>
<groupId>org.mongodb.scala</groupId>
<artifactId>mongo-scala-driver_2.12</artifactId>
<version>4.10.0</version>
</dependency>
# Scala BSON formatter (read/write) for MongoDB
<dependency>
<groupId>org.mongodb.scala</groupId>
<artifactId>mongo-scala-bson_2.12</artifactId>
<version>4.10.0</version>
</dependency>
# MongoDB connector for Apache Spark running Scala 2.12
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>10.1.1</version>
</dependency>
# Non blocking query/insert ... to MongoDB collections
<dependency>
<groupId>org.reactivemongo</groupId>
<artifactId>reactivemongo_2.12</artifactId>
<version>1.1.0-noshaded-RC10</version>
</dependency>
# Asynchronous streams processing with non-blocking back pressure
<dependency>
<groupId>org.mongodb</groupId> <artifactId>mongodb-driver-reactivestreams</artifactId> <version>4.10.0</version> </dependency>
Note: Not all the libraries are required to run the following use case
MongoDB client
Mongo file descriptor
MongoDB stores objects using BSON scheme. Binary data is stored as a binary byte array which limited to 16 Mbytes. MongoDB defines a files system that allowed to get around this limitation.
Therefore, we define a descriptor, MongoFile, as a Scala trait to specify the minimum set of attributes required to identify its type.
trait MongoFile {
val fileId: UUID
val source: String
val path: String
val fileType: String
val createdAt: ZonedDateTime
val contentLength: Long
}
Parameterized instantiation
The MongoDB client, MongoDBClient is parameterized with type implementing MongoFile trait descriptor. This design allows future client implementation to extend the descriptor with contextual information, if needed.
The client is fully defined by the arguments of its constructor:
- host: Local or remote host for MongoDB server
- port: Port the daemon is listening
- dbName: Name of the database
- isFromFile: Boolean flag that specifies if the document resides in files (true) or database collections (false)
final class MongoDBClient[T <: MongoFile] private (
host: String,
port: Int,
dbName: String,
isFromFile: Boolean) {
private lazy val mongoDBClient = try {
val connectionString = s"mongodb://${host}:${port.toString}"
// [1]: Full definition of Mongo client with connection and server API settings
val serverApi = ServerApi.builder.version(ServerApiVersion.V1).build()
val settings = MongoClientSettings
.builder()
.applyConnectionString(ConnectionString(connectionString))
.serverApi(serverApi)
.build()
val _mongoClient = MongoClient(settings)
// [2]: Instantiation of mongo database using custom CODEC
val _mongoDatabase =
if(isFromFile) {
val customCodecs: CodecRegistry = fromProviders(classOf[T])
val codecRegistry = fromRegistries(
customCodecs,
DEFAULT_CODEC_REGISTRY // [3]: Default CODEC
)
_mongoClient.getDatabase(dbName).withCodecRegistry(codecRegistry)
}
else
_mongoClient.getDatabase(dbName)
Some((_mongoClient, _mongoDatabase))
}
catch {
case e: MongoClientException =>
logger.error(e.getMessage)
None
}
The construct initially the configuration settings with server API and the connection string to instantiate the MongoDB client, _mongoClient [1].
The instantiation of the database, _mongoDatabase from encoded files relies on the file format, T <: MongoFile [2]. This implementation used the default CODEC registry which is appropriate for most of use cases [3].
The instantiation of the database, _mongoDatabase from encoded files relies on the file format, T <: MongoFile [2]. This implementation used the default CODEC registry which is appropriate for most of use cases [3].
The tuple, (Mongo client, Mongo database) is wrapped into an option which convert a exceptions into None.
Non-blocking operations
MongoDB is often the go-to choice for housing and querying large data sets, especially when interactions occur at less frequent intervals. Under these conditions, it's imperative that client operations continue seamlessly without being paused, pending the finalization of data insertions or query processes.
In the Scala landscape, this uninterrupted flow is facilitated by futures. They kickstart the query process (for instance, collection.find), momentarily step back, and then resume command by invoking scala.concurrent.Await.
Await.result(
mongoCollectionFile.find.toFuture(),
Duration.Inf
)
mongoCollectionFile.find.toFuture(),
Duration.Inf
)
The diagram below illustrates the interaction and control of execution during a query to MongoDB collection, using Scala future.
Illustration of non-blocking query to MongoDB
The getCollectionFiles method relinquishes execution resources while waiting for safe completion of the query.
def getCollectionFile(collectionName: String): Option[MongoCollection[T]] =
mongoDatabase.map(_.getCollection[MongoFile](collectionName))
def getCollectionFiles(collectionName: String): Option[Seq[T]] = try {
getCollectionFile(collectionName).map(
mongoCollectionFile =>
// Asynchronously wait for the MongoDB response
Await.result( mongoCollectionFile.find.toFuture(), Duration.Inf )
)
}
catch {
case e: Exception =>
logger.error(e.getMessage)
None
}
Retrieving collections
Let's look as the implementation of two methods to retrieve all the documents associated to a MongoDB collection, collectionName.
The first method, getBSONDocuments, retrieves the documents from a collection, collectionName.
The second method, getTextDocuments, retrieves the documents and converts them into readable text by decoding the BSON type.
def getBSONDocuments(collectionName: String): Seq[Document] =
mongoDBClient.map {
case (_, db) =>
val collection = db.getCollection(collectionName)
Await.result(collection.find.toFuture(), Duration.Inf)
}.getOrElse({
logger.warn(s"Mongo collection ${collectionName} not found")
Seq.empty[Document]
})
def getTextDocuments(collectionName: String): Seq[String] = {
val documents = getBSONDocuments(collectionName)
if(documents.nonEmpty) {
// [1] Accumulate text representation of MongoDB collections
documents.foldLeft(ListBuffer[String]())(
(accumulator, document) => {
val fieldIterator = document.iterator
val strBuf = new StringBuilder()
// [2] iterates across the fields in the document...
while (fieldIterator.hasNext) {
val (key, value): (String, bson.BsonValue) = fieldIterator.next()
// [3] Uses the type-key to decode the value into appropriate type....
val output = value.getBsonType match {
case BsonType.STRING => s"${key}:${value.asString().getValue}"
case BsonType.DOUBLE => s"${key}:${value.asDouble().getValue}"
case BsonType.ARRAY => s"${key}:${value.asArray()}"
case BsonType.DOCUMENT => s"${key}:${value.asDocument().toString}"
case BsonType.INT32 => s"${key}:${value.asInt32().getValue}"
case BsonType.INT64 => s"${key}:${value.asInt64().getValue}"
case BsonType.BOOLEAN => s"${key}:${value.asBoolean().getValue}"
case BsonType.BINARY => s"${key}:${value.asBinary()}"
case BsonType.TIMESTAMP => s"${key}:${value.asTimestamp().getValue}"
case BsonType.DATE_TIME => s"${key}:${value.asDateTime().getValue}"
case BsonType.REGULAR_EXPRESSION => s"${key}:${value.asRegularExpression().toString}"
case _ => ""
}
strBuf.append(output).append(", ")
}
accumulator += strBuf.toString()
}
)
}
else {
logger.warn(s"Mongo collection $collectionName is empty")
Seq.empty[String]
}
}
The method getTextDocuments iterates through the BSON formatted documents retrieved from the Mongo collection [1]. Each BSON document is a list of fields with the format (key:value).
The method then retrieves the iterator, fieldIterator, from each BSON document [2]. The key of each field is used to extract, match the type of field and convert its value to a string [3] (i.e., a string representation Boolean:true for a boolean value with key BsonType.BOOLEAN).
Thank you for reading this article. For more information ...
References
[1] Learn MongoDB in 1 hour - Bro Code
[2] Install MongoDB
[3] MongoDB Java driver
[4] MongoDB Scala driver
[5] MongoDB Connector for Spark
[6] Reactive Mongo documentation
[7] MongoDB Reactive Streams Java Driver
[2] Install MongoDB
[3] MongoDB Java driver
[4] MongoDB Scala driver
[5] MongoDB Connector for Spark
[6] Reactive Mongo documentation
[7] MongoDB Reactive Streams Java Driver
---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning.
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3
Labels:
Akka future,
MongDB,
NoSQL,
Query,
Scala
Subscribe to:
Posts (Atom)