Scala, MongoDB, and Cats-Effect

MongoDB is an open-source database that uses a document-oriented data model and a non-SQL query language. It is one of the most popular NoSQL databases in use today. Unlike traditional SQL databases, MongoDB does not model its data as rows and columns; instead, it stores data (documents) in collections using the BSON (Binary JSON) format, where the basic unit of data is a set of key-value pairs.

For Scala, there are several MongoDB clients available, the most popular of which are the official MongoDB Scala Driver and ReactiveMongo.

However, in this blog post, I am going to introduce a more recent MongoDB client, mongo4cats: a wrapper around the MongoDB Java driver that is compatible with the Cats-Effect (3.x) and FS2 (3.x) libraries. Thanks to this integration with Cats-Effect, MongoDB operations can be executed as fully non-blocking, asynchronous I/O in a purely functional way.

About Cats-Effect

Cats-Effect is a high-performance, asynchronous, composable framework for building real-world applications in a purely functional style. Among other things, it provides an IO monad for capturing, controlling, and composing effects (such as opening a connection to a database or executing a query) and for running them within a resource-safe, typed context.
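
To make the idea of capturing and composing effects concrete, here is a minimal sketch that uses only Cats Effect 3 and no database. The connection resource and the log buffer are hypothetical stand-ins for a real connection, there only to show when things actually happen: building program runs nothing, and executing it acquires the resource, uses it, and releases it, in that order. The first line is a scala-cli directive; with sbt, add the equivalent libraryDependencies entry instead.

```scala
//> using dep org.typelevel::cats-effect:3.5.4
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.collection.mutable.ListBuffer

// Hypothetical log, used only to observe when each effect actually runs
val log = ListBuffer.empty[String]

// Stand-in for a database connection: acquire and release are both IO descriptions
val connection: Resource[IO, String] =
  Resource.make(IO { log += "open"; "conn-1" })(_ => IO { log += "close"; () })

// Composing the effects does not execute them; program is just a value
val program: IO[Int] = connection.use(conn => IO { log += s"query on $conn"; 42 })
val logBeforeRun = log.toList // still empty at this point

// Only now are the effects executed: acquire, use, then release
val result = program.unsafeRunSync()
```

In a real application you would normally extend IOApp (or IOApp.Simple) instead of calling unsafeRunSync yourself; it is used here only so the result can be inspected.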

Dependencies

To start, we will need to add the required dependencies to our build.sbt file:

Scala
 
libraryDependencies ++= Seq(
  "io.github.kirill5k" %% "mongo4cats-core"  % "0.3.0",
  "io.github.kirill5k" %% "mongo4cats-circe" % "0.3.0"
)


mongo4cats-core provides the core functionality for connecting to the database and executing queries, whereas mongo4cats-circe adds support for encoding and decoding our entities with Circe codecs (more on this later).

Connecting to The Database

To connect to our database, we first need to create an instance of MongoClient[F]. A MongoClient[F] represents a pool of connections to a given MongoDB server deployment, and typically only one instance of it is required per application (even when multiple operations are executed concurrently). The easiest way to create a client is to call the fromConnectionString method:

Scala
 
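import cats.effect.{IO, Resource}
import mongo4cats.client._

// Describes how to open (and later close) a client for a local MongoDB instance
val mongoClient: Resource[IO, MongoClient[IO]] =
  MongoClient.fromConnectionString[IO]("mongodb://localhost:27017")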


Whichever MongoClient constructor method is used to create a client, we get a Resource[IO, MongoClient[IO]], which ensures that the connection is closed once we are done using it.
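
The release guarantee holds even when the code inside use fails. The sketch below, again plain Cats Effect 3, uses a hypothetical fakeClient resource in place of MongoClient: the body raises an error, yet the release action still runs, and attempt turns the failure into a Left value instead of throwing.

```scala
//> using dep org.typelevel::cats-effect:3.5.4
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global

val program: IO[(Either[Throwable, Unit], Boolean)] =
  for {
    // Records whether the hypothetical client has been released
    closed <- IO.ref(false)
    // Stand-in for a MongoClient resource: releasing it flips the flag
    fakeClient = Resource.make(IO.pure("client"))(_ => closed.set(true))
    // The body fails (think of a failing query); attempt captures the error as a Left
    outcome <- fakeClient.use(_ => IO.raiseError[Unit](new RuntimeException("query failed"))).attempt
    wasClosed <- closed.get
  } yield (outcome, wasClosed)

val result = program.unsafeRunSync()
```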

Once we have our client, we can start using it to connect to our database:

Scala
 
mongoClient.use { client =>
  // The client is closed automatically once this block completes
  for {
    db <- client.getDatabase("testdb")
  } yield ()
}


Data Modeling

Internally, MongoDB stores all of its data in BSON format, a close cousin of the familiar JSON. The similarity between the two formats allows us to derive MongoDB codecs with the tools normally used in Scala to convert case classes to and from JSON. One such tool is Circe.

To begin with, let’s create a simple data model for our collection:

Scala
 
import java.time.Instant
import mongo4cats.bson.ObjectId

sealed trait PaymentMethod

final case class Paypal(email: String) extends PaymentMethod

final case class CreditCard(
  name: String, 
  number: String, 
  expiry: String, 
  cvv: Int
) extends PaymentMethod

final case class Payment(
  id: ObjectId,
  amount: BigDecimal,
  method: PaymentMethod,
  date: Instant
)


Once our data model is defined, we can get our collection from the database:

Scala
 
import io.circe.generic.auto._
import mongo4cats.circe._

for {
  ...
  db <- client.getDatabase("testdb")
  coll <- db.getCollectionWithCodec[Payment]("payments")
} yield ()


Calling the getCollectionWithCodec[T] method of MongoDatabase[F] requires an instance of MongoCodecProvider[T] in implicit scope; the collection uses it internally to obtain codecs for encoding our entities into BSON documents and decoding them back. Luckily, since we have this import included:

import mongo4cats.circe._


The codec provider will be derived automatically with the help of Encoder[T] and Decoder[T] instances brought in by including Circe’s automatic derivation import:

import io.circe.generic.auto._
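
To see what shape the derived codecs produce, here is a small Circe-only sketch (no MongoDB involved) for the PaymentMethod hierarchy above. With automatic derivation, encoding a value through its sealed trait type wraps the fields in an object keyed by the subtype name, and the decoder uses that key to pick the right case class. That mongo4cats-circe turns this JSON into a BSON document with the same nesting is an assumption worth checking against the stored documents.

```scala
//> using dep io.circe::circe-generic:0.14.6
import io.circe.Json
import io.circe.generic.auto._
import io.circe.syntax._

// The same ADT as above (Payment is left out to keep the example focused)
sealed trait PaymentMethod
final case class Paypal(email: String) extends PaymentMethod
final case class CreditCard(name: String, number: String, expiry: String, cvv: Int) extends PaymentMethod

// Encoding through the trait type adds a wrapper object keyed by the subtype's name
val method: PaymentMethod = Paypal("john.bloggs@test.com")
val json: Json = method.asJson
val rendered: String = json.noSpaces

// Decoding reads that key to choose the right case class again
val decoded = json.as[PaymentMethod]
```

Note that encoding a value statically typed as Paypal (rather than PaymentMethod) would use the case class encoder and produce no wrapper, so the static type matters.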


Now we have everything ready to start working with the data!

Inserting Documents

To insert an object into the database, simply call insertOne to insert a single document or insertMany to insert a sequence of documents:

Scala
 
val creditCard = CreditCard("John Bloggs", "1111222233334444", "1021", 919)
val paypal = Paypal("john.bloggs@test.com")
val payment1 = Payment(ObjectId(), BigDecimal(2.5), paypal, Instant.parse("2021-04-05T12:00:00Z"))
val payment2 = Payment(ObjectId(), BigDecimal(9.99), creditCard, Instant.parse("2021-04-12T12:00:00Z"))

for {
  ...
  _ <- coll.insertOne(payment1)
  _ <- coll.insertMany(List(payment2))
} yield ()


Querying Documents

The most straightforward way to query a collection is to use its find method:

Scala
 
import mongo4cats.collection.operations.Filter

val afterDateFilter = Filter.gte("date", Instant.parse("2021-04-01T00:00:00Z"))
val beforeDateFilter = Filter.lt("date", Instant.parse("2021-05-01T00:00:00Z"))
  
for {
  ...
  payments <- coll.find
    .filter(afterDateFilter && beforeDateFilter)
    .sortByDesc("amount")
    .limit(5)
    .all
} yield ()


As the example shows, calling find returns a query builder to which you can add filters, sorts, projections, and limits. The query is then executed by a final call that returns a single element, a sequence of elements (all, as above), or an FS2 Stream (stream, covered in the Streaming section below).
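
To pin down what the query above computes, here is an in-memory equivalent written with plain Scala collections. The Doc case class and the sample data are hypothetical; the point is the semantics: gte is inclusive and lt is exclusive, so the two filters combined with && select the half-open range [from, until), after which the results are sorted by amount in descending order and truncated to five.

```scala
import java.time.Instant

// Hypothetical in-memory stand-in for the stored documents
final case class Doc(id: Int, amount: BigDecimal, date: Instant)

val docs = List(
  Doc(1, BigDecimal("2.5"), Instant.parse("2021-04-05T12:00:00Z")),
  Doc(2, BigDecimal("9.99"), Instant.parse("2021-04-12T12:00:00Z")),
  Doc(3, BigDecimal("50"), Instant.parse("2021-05-02T09:00:00Z")), // after the range
  Doc(4, BigDecimal("7"), Instant.parse("2021-04-01T00:00:00Z"))   // exactly on the lower bound
)

val from  = Instant.parse("2021-04-01T00:00:00Z")
val until = Instant.parse("2021-05-01T00:00:00Z")

// gte(from) && lt(until): keep documents in the half-open range [from, until)
val matches = docs.filter(d => !d.date.isBefore(from) && d.date.isBefore(until))

// sortByDesc("amount") followed by limit(5)
val top5 = matches.sortBy(_.amount)(Ordering[BigDecimal].reverse).take(5)
```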

The Filter class from the mongo4cats.collection.operations package offers a wide range of query predicates for excluding documents that are not relevant to a query. Moreover, these filters can be combined using the logical operators && (and) and || (or).
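
The way && and || compose can be illustrated with a hypothetical, heavily simplified filter type that renders itself in MongoDB's query syntax ($and, $or, $gte, $lt). This is not how mongo4cats implements Filter (it builds on the MongoDB Java driver, whose rendered output may differ); it only shows the idea. A useful consequence of defining these as Scala operator methods is that && binds more tightly than ||, just like the Boolean operators.

```scala
// Hypothetical, simplified filter ADT; not the mongo4cats implementation
sealed trait Cond {
  def &&(that: Cond): Cond = And(this, that)
  def ||(that: Cond): Cond = Or(this, that)

  // Renders the condition in MongoDB query syntax (values are pre-rendered strings)
  def render: String = this match {
    case Gte(f, v) => s"""{"$f": {"$$gte": $v}}"""
    case Lt(f, v)  => s"""{"$f": {"$$lt": $v}}"""
    case And(a, b) => s"""{"$$and": [${a.render}, ${b.render}]}"""
    case Or(a, b)  => s"""{"$$or": [${a.render}, ${b.render}]}"""
  }
}
final case class Gte(field: String, value: String) extends Cond
final case class Lt(field: String, value: String) extends Cond
final case class And(left: Cond, right: Cond) extends Cond
final case class Or(left: Cond, right: Cond) extends Cond

// Both conditions must hold: 5 <= amount < 10
val byRange: Cond = Gte("amount", "5") && Lt("amount", "10")

// && binds tighter than ||, so this reads as: amount >= 100 || (5 <= amount < 10)
val combined: Cond = Gte("amount", "100") || Gte("amount", "5") && Lt("amount", "10")
```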

Updating Documents

Now, let’s see how we can find a document and update some of its fields.

Scala
 
import mongo4cats.collection.operations.Update

val payment1Filter = Filter.idEq(payment1.id)

val amountUpdate = Update
  .set("amount", BigDecimal(5.0))
  .currentTimestamp("updatedAt")

for {
  ...
  found <- coll.findOneAndUpdate(payment1Filter, amountUpdate)
} yield ()


This example introduces a new operation type, Update, which lets us chain multiple update operations that are then applied to the documents matching the provided filter. Here we find the document whose id matches the id of payment1, set its amount to 5.0, set an updatedAt field to the current timestamp, and get back the original (pre-update) document. Alternatively, there are methods that just update one or multiple documents: updateOne and updateMany, respectively. Both of these methods have an API similar to findOneAndUpdate.
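
The difference between the document that is returned and the document that ends up stored is easy to mix up, so here is an in-memory model of it built on an immutable Vector. The Doc type and the findOneAndUpdate helper are hypothetical; they mimic the default behaviour of the MongoDB Java driver, which returns the document as it was before the update (the driver's ReturnDocument.BEFORE default).

```scala
import java.time.Instant

// Hypothetical in-memory document and collection
final case class Doc(id: Int, amount: BigDecimal, updatedAt: Option[Instant])

// Applies the update to the first matching document and returns
// the new store together with the document as it was *before* the update
def findOneAndUpdate(
    store: Vector[Doc],
    filter: Doc => Boolean,
    update: Doc => Doc
): (Vector[Doc], Option[Doc]) =
  store.indexWhere(filter) match {
    case -1 => (store, None)
    case i  => (store.updated(i, update(store(i))), Some(store(i)))
  }

val now   = Instant.parse("2021-04-20T10:00:00Z")
val store = Vector(Doc(1, BigDecimal("2.5"), None), Doc(2, BigDecimal("9.99"), None))

// Equivalent of set("amount", 5.0) plus a timestamp on updatedAt, filtered by id
val outcome =
  findOneAndUpdate(store, _.id == 1, _.copy(amount = BigDecimal("5.0"), updatedAt = Some(now)))
```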

Similarly, there are also methods for replacing or deleting documents: findOneAndReplace and findOneAndDelete, respectively.

Streaming

As mentioned above, a find query can return its result as a single element, a sequence of elements, or an FS2 Stream. FS2 is a library for purely functional, effectful, and polymorphic stream processing. It is built on Cats and Cats Effect, and its core types (streams and pulls) are polymorphic in the effect type (as long as it is compatible with the Cats Effect type classes), so it can be used with other effect libraries as well.

Scala
 
for {
  ...
  // fold emits a single running total, so compile.last returns Some(total)
  total <- coll.find
    .stream
    .map(_.amount)
    .fold(BigDecimal(0))(_ + _)
    .compile
    .last
} yield ()


In this example, we stream all documents from our collection and calculate the total amount paid.
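
The same fold can be tried without a database by replacing coll.find.stream with an in-memory FS2 stream. The two amounts below are taken from payment1 and payment2; the rest is assumed for illustration. Because fold emits exactly one element, compile.last always returns Some, even for an empty stream.

```scala
//> using dep co.fs2::fs2-core:3.10.2
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// In-memory stand-in for coll.find.stream.map(_.amount)
val amounts: Stream[IO, BigDecimal] =
  Stream.emits(List(BigDecimal("2.5"), BigDecimal("9.99"))).covary[IO]

// fold emits one element (the total), so compile.last yields Some(total)
val total: Option[BigDecimal] =
  amounts.fold(BigDecimal(0))(_ + _).compile.last.unsafeRunSync()

// An empty stream still folds to Some(0), never None
val emptyTotal: Option[BigDecimal] =
  Stream.emits(List.empty[BigDecimal]).covary[IO].fold(BigDecimal(0))(_ + _).compile.last.unsafeRunSync()
```

If the Option is unwanted, compile.fold(BigDecimal(0))(_ + _) produces the total directly as an IO[BigDecimal].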

Note that calling stream creates an unbounded stream, which keeps pulling documents until either the collection is exhausted or memory runs out. If the collection contains many documents, a stream like this can lead to out-of-memory failures. To avoid this, there is an alternative method, boundedStream(capacity), which buffers at most capacity elements at a time.
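
The difference between the two methods comes down to buffering. The sketch below shows the general mechanism with a Cats Effect bounded Queue: a fast producer (standing in for the database cursor) is suspended on offer whenever the queue is full, so a slow consumer never has more than capacity elements waiting. Whether boundedStream uses exactly this mechanism internally is an assumption; the run helper and its parameters are hypothetical.

```scala
//> using dep org.typelevel::cats-effect:3.5.4
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import scala.concurrent.duration._

// Hypothetical producer/consumer pair; returns the consumed items and the peak queue size
def run(capacity: Int, n: Int): IO[(List[Int], Int)] =
  for {
    queue   <- Queue.bounded[IO, Int](capacity)
    maxSize <- IO.ref(0)
    // offer suspends the producer while the queue is full (backpressure)
    producer = (1 to n).toList.traverse_ { i =>
                 queue.offer(i) >> queue.size.flatMap(s => maxSize.update(math.max(_, s)))
               }
    // A deliberately slow consumer
    consumer = (1 to n).toList.traverse(_ => IO.sleep(2.millis) >> queue.take)
    // Run both concurrently and keep the consumer's results
    taken <- IO.both(producer, consumer).map(_._2)
    peak  <- maxSize.get
  } yield (taken, peak)

val outcome = run(capacity = 3, n = 20).unsafeRunSync()
```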

Conclusion

In this blog post, I tried to highlight the most common cases of working with MongoDB and to show how easy it is to use it with an effect system like Cats Effect through mongo4cats. The examples presented here cover only the most basic and common operations; for more advanced usage patterns, you can refer to the mongo4cats GitHub repository or the official MongoDB documentation.
