Friday, April 8, 2022

Discounted Cumulative Gain in Scala

Target audience: Advanced
Estimated reading time: 5'

Numerous real-life applications of machine learning require predicting the most relevant ranking of items to optimize an outcome. This article illustrates the Normalized Discounted Cumulative Gain (NDCG) and its implementation in Scala.




What you will learn:  How to implement the Discounted Cumulative Gain to assess the effectiveness of a search algorithm or recommendation system


Notes:
  • Environments: Scala 2.12.11
  • To enhance the readability of the algorithm implementations, we have omitted non-essential code elements like error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers, and import statements.

Discounted Cumulative Gain

Discounted Cumulative Gain (DCG) and its normalized counterpart, Normalized Discounted Cumulative Gain (NDCG), are measures used in information retrieval and ranking system evaluation to assess the effectiveness of a search algorithm or recommendation system. DCG is particularly useful when items are ranked in order of relevance or importance and the value of each item diminishes further down the list of search results.

Here are examples of real-life applications that benefit from NDCG:
  • Evaluate and prioritize countermeasures to cyberattacks.
  • Rank symptoms in a clinical trial.
  • Extract documents relevant to a given topic from a corpus.
Let's dive into the mathematical formalism for the Discounted Cumulative Gain. 


For an indexed relevance \(t_{j}\) as illustrated in the diagram above, the discounted gain of the item at position j and the discounted cumulative gain over the first n positions are computed as \[DCG_{j}=\frac{2^{t_{j}}-1}{\log_{2}(j+1)} \qquad (1)\] \[DCG=\sum_{j=1}^{n} DCG_{j} \qquad (2)\]
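Formulas (1) and (2) translate almost literally into Scala. The sketch below is a standalone illustration rather than the article's NDCG class: relevances are plain integers listed in ranked order, positions are 1-based, and the helpers dcgTerm and dcg are hypothetical names.

```scala
// Formula (1): gain of relevance t at 1-based position j, discounted by log2(j + 1)
def dcgTerm(j: Int, t: Int): Double =
  (math.pow(2, t) - 1.0) / (math.log(j + 1) / math.log(2))

// Formula (2): sum of the discounted gains of the items, in ranked order
def dcg(relevances: Seq[Int]): Double =
  relevances.zipWithIndex.map { case (t, idx) => dcgTerm(idx + 1, t) }.sum

// Position 1 is not discounted (log2(2) = 1); position 2 is divided by log2(3)
println(dcg(Seq(1, 1)))  // ≈ 1.631
```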
The objective is to compare any given list of ranked items with a benchmark that represents the optimal ranking, that is, the ranking with the highest DCG value, known as the Ideal DCG (IDCG): \[NDCG=\frac{DCG}{IDCG} \qquad (3)\] Each item in the ranked list is assigned a relevance score, typically a non-negative number, with higher scores indicating more relevant items. The gain of each item is discounted logarithmically based on its position in the list. This discounting reflects the notion that users are less likely to consider items further down the list, so items appearing earlier have more impact. Since the DCG of any ordering cannot exceed the IDCG, NDCG ranges from 0 to 1, with 1 for the ideal ranking.
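To see how quickly the logarithmic discount shrinks the weight of lower positions, the hypothetical helper below prints the factor 1/log2(j+1) applied to the gain at each of the first five positions.

```scala
// Discount factor applied to the gain at 1-based position j: 1 / log2(j + 1)
def discount(j: Int): Double = math.log(2) / math.log(j + 1)

val weights = (1 to 5).map(discount)

// Weights decrease with position: earlier items contribute more to the DCG
weights.zipWithIndex.foreach { case (w, idx) =>
  println(f"position ${idx + 1}: weight $w%.3f")
}
```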


Scala implementation

Computation

The computation of NDCG in Scala is quite simple. Given a ranked list of items, the three steps are:

  1. Compute the IDCG (or normalization factor) from the list sorted by decreasing relevance
  2. Compute the DCG for the list
  3. Compute NDCG = DCG/IDCG
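The three steps can be sketched as a small standalone object working directly on integer relevances; the names NdcgSteps, dcg, idcg and ndcgOf are hypothetical. With the relevances 4, 3, 5, 2, 1 used in the evaluation below, it should report an IDCG of about 45.64 and an NDCG of about 0.80.

```scala
object NdcgSteps {
  private def log2(x: Double): Double = math.log(x) / math.log(2)

  // Formulas (1) and (2): DCG of relevances listed in ranked order (idx is 0-based)
  def dcg(relevances: Seq[Int]): Double =
    relevances.zipWithIndex.foldLeft(0.0) { case (acc, (t, idx)) =>
      acc + (math.pow(2, t) - 1.0) / log2(idx + 2)
    }

  // Step 1: IDCG is the DCG of the relevances sorted in decreasing order
  def idcg(relevances: Seq[Int]): Double = dcg(relevances.sortBy(-_))

  // Steps 2 and 3: DCG of the given order, normalized by IDCG
  def ndcgOf(relevances: Seq[Int]): Double = {
    val ideal = idcg(relevances)
    if (ideal == 0.0) 0.0 else dcg(relevances) / ideal
  }
}

val relevances = Seq(4, 3, 5, 2, 1)
println(f"IDCG = ${NdcgSteps.idcg(relevances)}%.2f, NDCG = ${NdcgSteps.ndcgOf(relevances)}%.3f")
```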
First, let's consider a list of items of type T to rank. The function ranking, which extracts the relevance of an item, is provided as an implicit parameter. The constructor of NDCG has a single explicit argument, the initial sample of ranked items:

class NDCG[T](initialSample: Seq[T])(implicit ranking: T => Int) {
   import NDCG._

   // Normalization factor (IDCG), computed once per instance
   val iDCG: Double = normalize

   def score: Double = score(initialSample)

    // Implement Formula (3)
   def score(samples: Seq[T]): Double =
       if( samples.size != initialSample.size || iDCG == 0.0) 0.0 
       else dcg(samples)/iDCG

   private def normalize: Double = {                                // 1
      val sorted = initialSample.sortBy(sample => -ranking(sample))  // 2
      dcg(sorted)
   }

    // Implement Formula (2)
   private def dcg(samples: Seq[T]): Double =                // 3
      samples.zipWithIndex.aggregate(0.0)(                     // 4
        (_dcg, samplej) => _dcg + dgci(samplej._2 + 1, ranking(samplej._1))  // 5
          , _ + _)
}


The Ideal Discounted Cumulative Gain, iDCG, is computed by the normalize method (# 1): the items of type T are first sorted by decreasing relevance (# 2), then the re-sorted list is scored with the dcg method (# 3).
The discounted gain of each item, computed by the method dgci (# 5), is a direct application of formula (1) described in the previous section. The cumulative gain of formula (2) is computed using the Scala aggregate method (# 4).
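Because zipWithIndex yields 0-based indices, the position passed to dgci is index + 1. The hypothetical sketch below replaces aggregate with scanLeft to expose the running cumulative gain after each position; its last value is the DCG of the list.

```scala
// Discounted gain of relevance t at 1-based position j, as in formula (1)
def gain(j: Int, t: Int): Double =
  (math.pow(2, t) - 1.0) / (math.log(j + 1) / math.log(2))

// Running DCG: element k holds the cumulative gain of the first k + 1 items
def runningDcg(relevances: Seq[Int]): Seq[Double] =
  relevances.zipWithIndex
    .scanLeft(0.0) { case (acc, (t, idx)) => acc + gain(idx + 1, t) }
    .tail // drop the initial 0.0 seed

runningDcg(Seq(3, 2, 1)).foreach(v => println(f"$v%.3f"))
```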

As mentioned earlier, the method dgci implements formula (1). The logarithm uses base 2 and is computed as ln(x)/ln(2), since the Java and Scala math libraries do not provide a base-2 logarithm.


object NDCG {  // Implement Formula (1); log2(x) is computed as ln(x)/ln(2)
  def dgci(position: Int, relevance: Int): Double =
    (Math.pow(2, relevance) - 1.0) / (Math.log(position + 1) / Math.log(2))
}
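Changing the base of the logarithm multiplies every term of both DCG and IDCG by the same constant, so the ratio of formula (3) does not depend on the base. The hypothetical check below computes NDCG once with log2 and once with the natural logarithm.

```scala
// DCG with an arbitrary logarithm used for the position discount (idx is 0-based)
def dcgWith(log: Double => Double)(relevances: Seq[Int]): Double =
  relevances.zipWithIndex.map { case (t, idx) => (math.pow(2, t) - 1.0) / log(idx + 2.0) }.sum

val log2: Double => Double = x => math.log(x) / math.log(2)
val ln: Double => Double = x => math.log(x)

val ranked = Seq(2, 3, 0, 1)
val ideal = ranked.sortBy(-_)

// Same ratio in both bases: the constant 1/ln(2) cancels out
val ndcgBase2 = dcgWith(log2)(ranked) / dcgWith(log2)(ideal)
val ndcgNatural = dcgWith(ln)(ranked) / dcgWith(ln)(ideal)
println(f"base 2: $ndcgBase2%.6f, natural: $ndcgNatural%.6f")
```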


Note: The NDCG of subsequent samples can be computed by reusing the iDCG value of the same NDCG instance.
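Reusing the normalization factor amounts to computing IDCG once and dividing the DCG of each candidate ordering by it. The sketch below is a simplified, hypothetical stand-in for the NDCG class that works on relevance values directly; like the NDCG class, it returns 0.0 when a candidate does not have the same size as the reference sample.

```scala
// Hypothetical scorer: IDCG is computed once from the reference sample
final class RelevanceScorer(reference: Seq[Int]) {
  private def dcg(relevances: Seq[Int]): Double =
    relevances.zipWithIndex.map { case (t, idx) =>
      (math.pow(2, t) - 1.0) / (math.log(idx + 2.0) / math.log(2))
    }.sum

  // Normalization factor, evaluated once per instance
  val idcg: Double = dcg(reference.sortBy(-_))

  // Candidates are assumed to be permutations of the reference (not checked)
  def score(candidate: Seq[Int]): Double =
    if (candidate.size != reference.size || idcg == 0.0) 0.0
    else dcg(candidate) / idcg
}

val scorer = new RelevanceScorer(Seq(1, 3, 2))
Seq(Seq(3, 2, 1), Seq(1, 3, 2), Seq(1, 2, 3)).foreach { order =>
  println(f"$order%s -> ${scorer.score(order)}%.3f")
}
```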

Evaluation

Let's now consider a list of items of type Item defined as follows: 

case class Item(id: String, x: Double, rank: Int)


The list of items, itemsList, is ranked through its attribute rank, supplied to NDCG as a Scala implicit function.

val itemsList = Seq[Item](
  Item("1", 3.4, 4), 
  Item("2", 1.4, 3),
  Item("3", -0.7, 5), 
  Item("4", 5.2, 2), 
  Item("5", 1.4, 1)
)

implicit val ranking = (item: Item) => item.rank
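The implicit parameter is resolved by type at the call site, so any item type can be scored as long as an implicit function T => Int is in scope. The sketch below, with a hypothetical Doc type and relevancesOf helper, shows the same resolution mechanism in isolation.

```scala
// Hypothetical item type whose relevance is stored as a grade
case class Doc(title: String, grade: Int)

// The relevance extractor is resolved implicitly, as in the NDCG constructor
def relevancesOf[T](items: Seq[T])(implicit ranking: T => Int): Seq[Int] =
  items.map(ranking)

implicit val byGrade: Doc => Int = (doc: Doc) => doc.grade

val docs = Seq(Doc("a", 2), Doc("b", 0), Doc("c", 3))
println(relevancesOf(docs))  // List(2, 0, 3)
```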

Finally, let's compute the NDCG coefficient for the list of items by invoking the score method.

val nDCG = new NDCG[Item](itemsList)

println(s"IDCG = ${nDCG.iDCG}")    //45.64
println(s"Score = ${nDCG.score}")  // 0.801

The ideal discounted cumulative gain, iDCG, is 45.64: it is the DCG of the optimal ranking of this list of items. The initial ordering of the sample scores an NDCG of 0.80, where 1.0 would correspond to the ideal ranking.


Thank you for reading this article. For more information ...



---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3

Implement Non-blocking MongoDB Client in Scala

Target audience: Intermediate
Estimated reading time: 4'

MongoDB, a renowned NoSQL (non-relational) database, is an open-source, document-oriented storage system. Its flexible schema and its ability to scale horizontally make it well suited for storing large data science datasets that can be cumbersome to manage in a traditional RDBMS.
This piece outlines the configuration and implementation of a non-blocking Scala client for MongoDB.


      MongoDB libraries     


Important notes
  • This post assumes that the reader is familiar with the MongoDB architecture and its key components. The official documentation for the MongoDB database, client libraries and connectors is available at MongoDB docs. YouTube.com offers a few introductory videos on MongoDB [ref 1].
  • Implementation relies on Scala 2.12.15

Setup 

Installation

The installation procedure for the community version of MongoDB depends on the environment and operating system [ref 2]. Here is an overview:

MacOS
  • brew tap mongodb/brew
  • brew install mongodb-community@7.0
with the default paths for the MongoDB system files:
  • Configuration file $BIN_DIRECTORY/etc/mongod.conf
  • Log directory  $BIN_DIRECTORY/var/log/mongodb
  • Data directory  $BIN_DIRECTORY/var/mongodb
where BIN_DIRECTORY is /usr/local on Intel and /opt/homebrew on Apple M1/M2.

Linux
  • sudo yum install -y mongodb-org
  • sudo yum install -y mongodb-org-7.0 mongodb-org-database-7.0 mongodb-org-server-7.0 mongodb-mongosh-7.0 mongodb-org-mongos-7.0 mongodb-org-tools-7.0
with the default paths for the MongoDB system files:
  • Configuration file /etc/mongod.conf
  • Log directory  /var/log/mongodb
  • Data directory /var/lib/mongo

Docker
  • docker pull mongodb/mongodb-community-server
  • docker run --name mongo -p 27017:27017 -d mongodb/mongodb-community-server:latest

Binary components
  • The server mongod
  • The sharded cluster query router mongos
  • The MongoDB Shell, mongosh

Configuration
The default paths for the MongoDB system files on macOS:
  • Configuration file $BIN_DIRECTORY/etc/mongod.conf
  • Log directory  $BIN_DIRECTORY/var/log/mongodb
  • Data directory  $BIN_DIRECTORY/var/mongodb

Note: MongoDB Atlas is a hosted MongoDB service option in the cloud which requires no installation overhead and offers a free tier to get started.


MongoDB libraries

  • MongoDB Java driver (mongodb-driver-sync) for synchronous access to MongoDB collections from a Java client [ref 3].
  • MongoDB Scala driver for Scala 2.12 (mongo-scala-driver_2.12), whose asynchronous Observables can be converted into Scala futures for non-blocking access to collections [ref 4].
  • Mongo Scala BSON library (mongo-scala-bson_2.12), a Scala wrapper / extension of the BSON library.
  • Spark connector for MongoDB (mongo-spark-connector_2.12), which provides integration between MongoDB and Apache Spark, including Spark Structured Streaming [ref 5].
  • ReactiveMongo for Scala (reactivemongo): with a classic synchronous database driver, each operation blocks the current thread until a response is received. ReactiveMongo is a Scala driver that provides fully non-blocking and asynchronous I/O operations [ref 6].
  • Reactive Streams driver for MongoDB (mongodb-driver-reactivestreams), which provides asynchronous stream processing with non-blocking back pressure [ref 7].

<!-- MongoDB Java driver for synchronous operations -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongodb-driver-sync</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- MongoDB driver/client API for Scala 2.12 -->
<dependency>
   <groupId>org.mongodb.scala</groupId>
   <artifactId>mongo-scala-driver_2.12</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- Scala BSON formatter (read/write) for MongoDB -->
<dependency>
   <groupId>org.mongodb.scala</groupId>
   <artifactId>mongo-scala-bson_2.12</artifactId>
   <version>4.10.0</version>
</dependency>

<!-- MongoDB connector for Apache Spark running Scala 2.12 -->
<dependency>
   <groupId>org.mongodb.spark</groupId>
   <artifactId>mongo-spark-connector_2.12</artifactId>
   <version>10.1.1</version>
</dependency>

<!-- Non-blocking query/insert ... to MongoDB collections -->
<dependency>
   <groupId>org.reactivemongo</groupId>
   <artifactId>reactivemongo_2.12</artifactId>
   <version>1.1.0-noshaded-RC10</version>
</dependency>

<!-- Asynchronous stream processing with non-blocking back pressure -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongodb-driver-reactivestreams</artifactId>
   <version>4.10.0</version>
</dependency>

Note: Not all of these libraries are required to run the following use case.


MongoDB client

Mongo file descriptor

MongoDB stores documents in the BSON format, and a single BSON document, including any binary data stored in it as a byte array, is limited to 16 MB. MongoDB provides a file system, GridFS, to get around this limitation.
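GridFS works around the 16 MB cap by splitting a file into chunks, each stored as a separate document (255 kB per chunk by default), alongside a metadata document. The hypothetical helper below only illustrates the chunking arithmetic in plain Scala; it does not call the GridFS API.

```scala
// Default GridFS chunk size: 255 kB
val DefaultChunkSize: Int = 255 * 1024

// Split a byte array into consecutive chunks of at most chunkSize bytes
def toChunks(content: Array[Byte], chunkSize: Int = DefaultChunkSize): Seq[Array[Byte]] = {
  require(chunkSize > 0, "chunk size must be positive")
  content.grouped(chunkSize).toVector
}

val file = Array.fill[Byte](600000)(1)
val chunks = toChunks(file)
println(s"${chunks.size} chunks, last chunk: ${chunks.last.length} bytes")
```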
To keep track of the stored files, we define a descriptor, MongoFile, as a Scala trait that specifies the minimum set of attributes required to identify a file and its type.

import java.time.ZonedDateTime
import java.util.UUID

trait MongoFile {
   val fileId: UUID
   val source: String
   val path: String
   val fileType: String
   val createdAt: ZonedDateTime
   val contentLength: Long
}
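A concrete descriptor can be a case class that implements the trait and adds contextual attributes. The PdfFile class, its extra pageCount field and the sample values below are hypothetical; the trait is repeated so that the snippet compiles on its own.

```scala
import java.time.ZonedDateTime
import java.util.UUID

trait MongoFile {
  val fileId: UUID
  val source: String
  val path: String
  val fileType: String
  val createdAt: ZonedDateTime
  val contentLength: Long
}

// Hypothetical descriptor extending the trait with a contextual attribute
final case class PdfFile(
  fileId: UUID,
  source: String,
  path: String,
  fileType: String,
  createdAt: ZonedDateTime,
  contentLength: Long,
  pageCount: Int
) extends MongoFile

val pdf = PdfFile(UUID.randomUUID(), "scanner-01", "/archive/report.pdf", "application/pdf",
  ZonedDateTime.now(), 48213L, 12)
println(s"${pdf.fileType} ${pdf.path} (${pdf.contentLength} bytes, ${pdf.pageCount} pages)")
```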

Parameterized instantiation

The MongoDB client, MongoDBClient, is parameterized with a type that implements the MongoFile descriptor trait. This design allows future client implementations to extend the descriptor with contextual information, if needed.

The client is fully defined by the arguments of its constructor:
  • host: Local or remote host for MongoDB server
  • port: Port the daemon is listening on
  • dbName: Name of the database
  • isFromFile: Boolean flag that specifies whether the documents reside in files (true) or in database collections (false)

final class MongoDBClient[T <: MongoFile] private (
   host: String, 
   port: Int, 
   dbName: String, 
   isFromFile: Boolean) {

  private lazy val mongoDBClient: Option[(MongoClient, MongoDatabase)] = try {
    val connectionString = s"mongodb://${host}:${port.toString}"

    // [1]: Full definition of Mongo client with connection and server API settings
    val serverApi = ServerApi.builder.version(ServerApiVersion.V1).build()

    val settings = MongoClientSettings
         .builder()
         .applyConnectionString(ConnectionString(connectionString))  
         .serverApi(serverApi)
         .build()
    val _mongoClient = MongoClient(settings)
   
    // [2]: Instantiation of mongo database using custom CODEC
    val _mongoDatabase =
      if(isFromFile) {
        val customCodecs: CodecRegistry = fromProviders(classOf[T])
        val codecRegistry = fromRegistries(
          customCodecs,
          DEFAULT_CODEC_REGISTRY  // [3]: Default CODEC
        )
        _mongoClient.getDatabase(dbName).withCodecRegistry(codecRegistry)
      }
      else
        _mongoClient.getDatabase(dbName)

    Some((_mongoClient, _mongoDatabase))
  }
  catch {
    case e: MongoClientException =>
      logger.error(e.getMessage)
      None
  }

The constructor first builds the configuration settings from the server API version and the connection string, then instantiates the MongoDB client, _mongoClient [1].
The instantiation of the database, _mongoDatabase, from encoded files relies on a codec for the file format T <: MongoFile [2]. This implementation also uses the default codec registry, which is appropriate for most use cases [3].

The tuple (Mongo client, Mongo database) is wrapped into an Option, which converts a MongoClientException into None.
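The same conversion of a failure into None can be expressed with scala.util.Try. The sketch below uses a hypothetical connect function in place of the MongoDB client construction so that it runs without a server; note that Try captures every non-fatal exception, not only MongoClientException.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for building the (client, database) pair; rejects invalid ports
def connect(host: String, port: Int): (String, String) = {
  require(port > 0 && port < 65536, s"invalid port $port")
  (s"mongodb://$host:$port", "client-db")
}

// Wrap the construction in an Option, logging the failure instead of throwing
def safeConnect(host: String, port: Int): Option[(String, String)] =
  Try(connect(host, port)) match {
    case Success(pair) => Some(pair)
    case Failure(e) =>
      println(s"Connection failed: ${e.getMessage}")
      None
  }

println(safeConnect("localhost", 27017))
println(safeConnect("localhost", -1))
```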

Non-blocking operations

MongoDB is often the go-to choice for storing and querying large data sets, especially when interactions occur infrequently. Under these conditions, client operations should not be stalled while data insertions or queries complete.

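In Scala, this flow relies on futures: the driver launches the query (for instance, collection.find) asynchronously and returns an Observable, which toFuture() converts into a Scala Future. The client then collects the result with scala.concurrent.Await.result, which blocks the calling thread until the response arrives (Duration.Inf disables the timeout).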

    Await.result(
       mongoCollectionFile.find.toFuture(),
       Duration.Inf
    )
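The pattern can be reproduced without a database. In the sketch below, the hypothetical FakeCollection.fetchDocuments stands in for mongoCollectionFile.find().toFuture() and runs on the global execution context, while Await.result blocks the calling thread until the result is available or the timeout expires. A finite timeout, assumed here to be 5 seconds, is usually safer than Duration.Inf, which waits forever if the server never answers.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a MongoDB collection queried asynchronously
object FakeCollection {
  // Plays the role of mongoCollectionFile.find().toFuture()
  def fetchDocuments(collectionName: String): Future[Seq[String]] = Future {
    Thread.sleep(100) // simulated network latency
    Seq(s"$collectionName-1", s"$collectionName-2")
  }
}

// The query runs on a pool thread; Await.result blocks this thread for at most 5 seconds
val documents: Seq[String] = Await.result(FakeCollection.fetchDocuments("files"), 5.seconds)
println(documents)
```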

The diagram below illustrates the interaction and control of execution during a query to a MongoDB collection using a Scala future.

Illustration of non-blocking query to MongoDB


The getCollectionFiles method launches the query asynchronously, then waits for its safe completion through Await.result.

def getCollectionFile(collectionName: String)(implicit ct: ClassTag[T]): Option[MongoCollection[T]] =
  mongoDBClient.map { case (_, db) => db.getCollection[T](collectionName) }


def getCollectionFiles(collectionName: String)(implicit ct: ClassTag[T]): Option[Seq[T]] = try {

  getCollectionFile(collectionName).map(
     mongoCollectionFile => 
       // Wait for the MongoDB response; this blocks the calling thread
       Await.result( mongoCollectionFile.find().toFuture(), Duration.Inf )
  )
}
catch {
  case e: Exception =>
    logger.error(e.getMessage)
    None
}
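A fully non-blocking variant returns the Future and lets the caller compose it with map and recover instead of waiting. The sketch below again replaces the MongoDB driver with a hypothetical FakeQueries object; the Await at the end exists only to keep the demo program alive until the result arrives.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FakeQueries {
  // Hypothetical asynchronous query; fails on an empty collection name
  def queryCollection(collectionName: String): Future[Seq[String]] = Future {
    if (collectionName.isEmpty) throw new IllegalArgumentException("empty collection name")
    Seq(s"$collectionName-a", s"$collectionName-b", s"$collectionName-c")
  }

  // Non-blocking: transform the result and recover from failures without waiting
  def countDocuments(collectionName: String): Future[Option[Int]] =
    queryCollection(collectionName)
      .map(docs => Option(docs.size))
      .recover { case e: Exception =>
        println(s"Query failed: ${e.getMessage}")
        None
      }
}

// Demo only: block at the edge of the program to print the outcome
println(Await.result(FakeQueries.countDocuments("files"), 5.seconds))
```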


Retrieving collections

Let's look at the implementation of two methods that retrieve all the documents associated with a MongoDB collection, collectionName.

The first method, getBSONDocuments, retrieves the documents from a collection, collectionName.
The second method, getTextDocuments, retrieves the documents and converts them into readable text by decoding the BSON type.

def getBSONDocuments(collectionName: String): Seq[Document] =

  mongoDBClient.map {
    case  (_, db) => 
       val collection = db.getCollection(collectionName)
       Await.result(collection.find.toFuture(),  Duration.Inf)
  }.getOrElse({
     logger.warn(s"Mongo collection ${collectionName} not found")
     Seq.empty[Document]
  })



def getTextDocuments(collectionName: String): Seq[String] = {
  val documents = getBSONDocuments(collectionName)
    
  if(documents.nonEmpty) {

    //  [1] Accumulate text representation of MongoDB collections
    documents.foldLeft(ListBuffer[String]())(
      (accumulator, document) => {
        val fieldIterator = document.iterator
        val strBuf = new StringBuilder()

         // [2] iterates across the fields in the document...
        while (fieldIterator.hasNext) {
          val (key, value): (String, bson.BsonValue) = fieldIterator.next()

          // [3] Uses the type-key to decode the value into appropriate type....
          val output = value.getBsonType match {
             case BsonType.STRING => s"${key}:${value.asString().getValue}"
             case BsonType.DOUBLE => s"${key}:${value.asDouble().getValue}"
             case BsonType.ARRAY => s"${key}:${value.asArray()}"
             case BsonType.DOCUMENT => s"${key}:${value.asDocument().toString}"
             case BsonType.INT32 =>  s"${key}:${value.asInt32().getValue}"
             case BsonType.INT64 =>  s"${key}:${value.asInt64().getValue}"
             case BsonType.BOOLEAN => s"${key}:${value.asBoolean().getValue}"
             case BsonType.BINARY => s"${key}:${value.asBinary()}"
             case BsonType.TIMESTAMP => s"${key}:${value.asTimestamp().getValue}"
             case BsonType.DATE_TIME => s"${key}:${value.asDateTime().getValue}"
             case BsonType.REGULAR_EXPRESSION => s"${key}:${value.asRegularExpression().toString}"
             case _ => ""
           }
           strBuf.append(output).append(", ")
         }
         accumulator += strBuf.toString()
       }
     )
  }
  else {
    logger.warn(s"Mongo collection $collectionName is empty")
    Seq.empty[String]
  }
}

The method getTextDocuments iterates through the BSON-formatted documents retrieved from the Mongo collection [1]. Each BSON document is a list of fields of the form key:value.

For each BSON document, the method retrieves a field iterator, fieldIterator [2]. The BSON type of each field value is matched to decode the value and convert it into a string [3]; for instance, a field with key isValid whose value has type BsonType.BOOLEAN is rendered as isValid:true.
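The type-driven decoding can be illustrated without the BSON library. The sketch below uses a small hypothetical ADT in place of BsonValue and BsonType; it matches on the value type, renders each field as key:value, and joins the fields with mkString, which avoids the trailing ", " left by the append loop.

```scala
// Hypothetical stand-in for a few BSON value types
sealed trait FieldValue
final case class StrValue(v: String) extends FieldValue
final case class DoubleValue(v: Double) extends FieldValue
final case class BoolValue(v: Boolean) extends FieldValue
final case class ArrayValue(vs: Seq[FieldValue]) extends FieldValue

// Decode a value according to its type, recursing into arrays
def render(value: FieldValue): String = value match {
  case StrValue(v)    => v
  case DoubleValue(v) => v.toString
  case BoolValue(v)   => v.toString
  case ArrayValue(vs) => vs.map(render).mkString("[", ", ", "]")
}

// A document is an ordered list of (key, value) fields
def toText(document: Seq[(String, FieldValue)]): String =
  document.map { case (key, value) => s"$key:${render(value)}" }.mkString(", ")

val doc = Seq("name" -> StrValue("report"), "score" -> DoubleValue(0.8),
  "valid" -> BoolValue(true), "tags" -> ArrayValue(Seq(StrValue("a"), StrValue("b"))))
println(toText(doc))  // name:report, score:0.8, valid:true, tags:[a, b]
```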



References


