Building Microservices With Akka HTTP: A CDC Approach

Building microservices isn’t easy, especially as they multiply: many teams working on different microservices that interact with each other and change fast.

Documentation, team interaction, and testing are ways to succeed, but done the wrong way they create more complexity than benefit.

We can use tools like Swagger (for documentation), Docker (for the testing environment), Selenium (for end-to-end testing), and so on, but we still end up losing a lot of time changing the APIs because they don’t suit the people who have to use them, or setting up the right environment to execute the integration tests, maybe with production data (hopefully anonymized), which may take ages to complete.

There isn’t a single right answer to all these problems, but I think there is one thing that helps a lot: look at everything from the user's perspective first!

What does that mean? Well, in web application development it means starting from the mockups and flow definitions. Going deeper, in software development it means using a TDD (test-driven development) approach: write the test first, think about what we really want and how we want to use it. And what about microservices? In that case, start from the consumer! What does the consumer want from the other service, and how does it want to interact with it?

I’m talking about Consumer Driven Contracts (CDC) testing. With this methodology, the consumer defines a formal contract describing its expectations of the provider, and the provider must respect that contract if it wants to provide the service.

The Business Case

We want to implement a new feature in “My Library”: we need to introduce Categories, and we want to know how many of them we have. The idea is to split the logic into two services, one (the Producer) which provides the list of all categories, and another (the Consumer) which counts them.


Extremely simple, but good enough to build a solid base structure and an understanding of CDC.

The Technology Stack

For this article, I have chosen Scala as the language and Akka HTTP as the framework. I think it is a really good technology that meets all the base requirements for building microservices.

On the data side, I have chosen Slick as the library to abstract the database interaction and Flyway as the DB migration framework. They are both robust and stable; I have used them many times with no issues.

Last, but definitely not least, the testing support! I love ScalaTest, and it is always part of my Scala projects, but what about our CDC?

For CDC, there is a really nice framework, available for many platforms: Pact.

With Pact, we can define our consumer contracts and verify them against both the Provider and the Consumer. I suggest spending a couple of minutes reading the homepage of the official Pact website, which explains the ideas behind it quite well.

As I said, Pact is available for many platforms; in our case, with both Consumer and Producer written in Scala, there is one implementation we can use: Scala-Pact.

The Implementation

For the sake of simplicity, I have created a single sbt project with both consumer and producer, but they can easily be split and used as a template. You can find the source code for this at https://github.com/mariniss/mylibrary-contracts.

Let’s start our microservices implementation in CDC style! First, we have to define our project. We can easily do that by creating a new Scala project with sbt, defining build.sbt as follows:

build.sbt

name := "myLibrary-contracts"

version := "0.1"

scalaVersion := "2.12.4"

enablePlugins(ScalaPactPlugin)


libraryDependencies ++= Seq(

 //Common dependencies
 "com.typesafe.akka"  %% "akka-stream"             % "2.4.20",
 "com.typesafe.akka"  %% "akka-http"               % "10.0.11",
 "com.typesafe.akka"  %% "akka-http-spray-json"    % "10.0.11",
 "org.slf4j"           % "slf4j-simple"            % "1.7.25",
 "org.scalatest"      %% "scalatest"               % "3.0.1"   % "test",
 "org.scalamock"      %% "scalamock"               % "4.0.0"   % "test",
 "com.typesafe.akka"  %% "akka-stream-testkit"     % "2.4.20"  % "test",
 "com.typesafe.akka"  %% "akka-testkit"            % "2.4.20"  % "test",
 "com.typesafe.akka"  %% "akka-http-testkit"       % "10.0.11" % "test",
 "com.itv"            %% "scalapact-argonaut-6-2"  % "2.2.0"   % "test",
 "com.itv"            %% "scalapact-scalatest"     % "2.2.0"   % "test",
 "com.itv"            %% "scalapact-http4s-0-16-2" % "2.2.0"   % "test",

 //Producer dependencies
 "com.typesafe.slick" %% "slick"                   % "3.2.1",
 "com.typesafe.slick" %% "slick-hikaricp"          % "3.2.1",
 "com.h2database"      % "h2"                      % "1.4.196",
 "org.flywaydb"        % "flyway-core"             % "5.0.7"
)

testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest,
 "-y", "org.scalatest.WordSpec",
 "-y", "org.scalatest.FunSpec")

parallelExecution in Test := false


As you can see, there are the standard dependencies for an Akka HTTP project (common to both provider and consumer), spray-json for JSON serialization and deserialization, SLF4J for logging, scalatest and scalamock as the testing and mocking frameworks, and Scala-Pact for the CDC testing.

The producer-specific dependencies are only for the database support and, as you can see, I’m using H2 (an in-memory DB), but you can easily replace it with support for other databases.

There is also a specific configuration for the testing environment. Parallel execution is disabled simply because we have both producer and consumer in the same project, so running the Pact file generation and its usage in parallel could cause issues (we’ll see this later). Also, I have implemented the tests in two different formats, WordSpec and FunSpec (hence the two -y arguments), the first for all unit tests and the second for the Pact tests, but feel free to use whatever you like.

Consumer Implementation

Now that we have the base project structure, we can start creating the Pact test on the consumer side, defining what we expect from the provider given a specific scenario/state.

MyLibraryClientPactSpec.scala

package com.fm.mylibrary.consumer.pact

import com.fm.mylibrary.consumer.MyLibraryClient
import com.fm.mylibrary.consumer.app.MyLibraryAppClient._
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
import com.itv.scalapact.ScalaPactForger._
import org.scalatest.{FunSpec, Matchers}
import spray.json._


class MyLibraryClientPactSpec extends FunSpec with Matchers {

 describe("Connecting to the MyLibrary server") {

   it("should be able to fetch the categories"){

     val categories = List(Category("Java"), Category("DevOps"))

     forgePact
       .between("ScalaConsumer")
       .and("myLibraryServer")
       .addInteraction(
         interaction
           .description("Fetching categories")
           .given("Categories: [Java, DevOps]")
           .uponReceiving(
             method = GET,
             path = "/search/category",
             query = None)
           .willRespondWith(
             status = 200,
             headers = Map("Content-Type" -> "application/json"),
             body = categories.toJson.toString())
       )
        .runConsumerTest { mockConfig =>
          // bind the client to the stub server that scala-pact starts for this test
          implicit val myLibraryServerUrl: String = mockConfig.baseUrl

          val results = new MyLibraryClient().fetchCategories()

         results.isDefined shouldEqual true
         results.get.size shouldEqual 2
         results.get.forall(c => categories.contains(c)) shouldEqual true
       }

   }
 }
}



Scala-Pact is quite easy to use: thanks to the ScalaPactForger object, we can build the contract definition and our expectations with a few lines of code. In more detail:

- forgePact.between("ScalaConsumer").and("myLibraryServer") names the two actors bound by the contract.
- addInteraction adds an expected interaction: a description, the provider state it is given, the request we will send (uponReceiving), and the response we expect back (willRespondWith).
- runConsumerTest starts a stub HTTP server which behaves as described by the interactions and runs our test code against it, passing in the stub's configuration (mockConfig).

Of course, we can add more scenarios and interactions, and we can also define more pacts for multiple producers. I suggest identifying the base scenarios and interactions necessary to describe normal usage, with the “happy paths” and the standard error scenarios, and leaving the detailed corner cases of the implementation to the unit tests.
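
For example, a second interaction for an empty-categories state could be chained onto the same forgePact builder (the description and state strings here are illustrative, not taken from the repo):

       .addInteraction(
         interaction
           .description("Fetching categories when none exist")
           .given("Categories: []")
           .uponReceiving(
             method = GET,
             path = "/search/category",
             query = None)
           .willRespondWith(
             status = 200,
             headers = Map("Content-Type" -> "application/json"),
             body = "[]")
       )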

Now, you can try to compile and execute the test, but you'll get an error because we have neither the client nor the model yet, so let’s add the base logic to make the test pass.

I think we can proceed in two ways: go straight ahead and build the client (since we already have a test), or refine the definition of our client by creating a unit test and working on it in pure TDD style. Let’s follow the second option:


MyLibraryClientSpec.scala

package com.fm.mylibrary.consumer

import akka.http.scaladsl.model._
import com.fm.mylibrary.model.Category

import scala.concurrent.Future


class MyLibraryClientSpec extends BaseTestAppClient {

 implicit val myLibraryServerUrl:String = "//test"

 "Fetch categories" must {
   "execute the HTTP request to get all categories and returns them" in {
     val request = HttpRequest(HttpMethods.GET, "//test/search/category")
     val responseEntity = HttpEntity(bytes = """[{"name": "Java"}, {"name": "DevOps"}]""".getBytes,
                                     contentType = ContentTypes.`application/json`)
     val response = HttpResponse(status = StatusCodes.OK, entity = responseEntity)
     requestExecutor.expects(request).returning(Future.successful(response))

     val results = new MyLibraryClient().fetchCategories()

     results.isDefined shouldEqual true
     results.get.size shouldEqual 2
     results.get.contains(Category("Java")) shouldEqual true
     results.get.contains(Category("DevOps")) shouldEqual true
   }
 }
}

Quite a standard test: we expect MyLibraryClient to expose a function that returns a list of “Category” objects, using an external function that takes an HttpRequest and returns an HttpResponse.

As you can see, this external dependency is not provided explicitly; that’s because I want to provide it as an “implicit” value. It’s an approach that helps in creating testable code, but I strongly suggest not overusing it, because it makes the code difficult to read, especially for those new to Scala.

I also like to define a trait with all necessary dependencies to easily build test cases:

BaseTestAppClient.scala

package com.fm.mylibrary.consumer

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestKit}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.{ExecutionContextExecutor, Future}


class BaseTestAppClient extends TestKit(ActorSystem("BaseTestAppClient"))
           with WordSpecLike
           with ImplicitSender
           with Matchers
           with BeforeAndAfterAll
           with MockFactory {

 implicit val actorSystem: ActorSystem = system
 implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher

 implicit val requestExecutor = mockFunction[HttpRequest, Future[HttpResponse]]


 override def afterAll {
   TestKit.shutdownActorSystem(system)
 }
}

It defines the actor system to use in our test and the function which executes the HTTP request.

Now that we have the tests, let’s implement some logic:

MyLibraryClient.scala

package com.fm.mylibrary.consumer

import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._

import scala.concurrent.{ExecutionContextExecutor, Future}


class MyLibraryClient(implicit val myLibraryServerUrl: String,
                      implicit val actorSystem: ActorSystem,
                      implicit val materializer: ActorMaterializer,
                      implicit val executionContext: ExecutionContextExecutor,
                      implicit val requestExecutor: HttpRequest => Future[HttpResponse]) extends BaseHttpClient {

 def fetchCategories(): Option[List[Category]] = executeSyncRequest(
   RequestBuilding.Get(s"$myLibraryServerUrl/search/category"),
   response =>
     if(response.status == StatusCodes.OK)
       Unmarshal(response.entity).to[Option[List[Category]]]
     else
       Future.successful(None)
 )
}


Category.scala

package com.fm.mylibrary.model

case class Category (name: String)


A relatively easy implementation. I have used implicit dependency declarations, but feel free to make them explicit to improve readability.
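
For example, a hypothetical variant that takes the server URL as an explicit constructor parameter (not the version used in the repo) could look like this:

import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.fm.mylibrary.consumer.BaseHttpClient
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._

import scala.concurrent.{ExecutionContextExecutor, Future}

// Same behavior as MyLibraryClient, but only the runtime plumbing
// stays implicit; the URL is plain old data.
class MyLibraryClientExplicit(serverUrl: String)
                             (implicit val actorSystem: ActorSystem,
                              val materializer: ActorMaterializer,
                              val executionContext: ExecutionContextExecutor,
                              val requestExecutor: HttpRequest => Future[HttpResponse])
    extends BaseHttpClient {

  def fetchCategories(): Option[List[Category]] = executeSyncRequest(
    RequestBuilding.Get(s"$serverUrl/search/category"),
    response =>
      if (response.status == StatusCodes.OK)
        Unmarshal(response.entity).to[Option[List[Category]]]
      else
        Future.successful(None)
  )
}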

I have created a trait which defines the base components for every HTTP client (there is only one at the moment) and a function to perform the HTTP request synchronously:

BaseHttpClient.scala

package com.fm.mylibrary.consumer

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.language.postfixOps

trait BaseHttpClient {

 implicit def actorSystem: ActorSystem
 implicit def materializer: ActorMaterializer
 implicit def executionContext: ExecutionContextExecutor

 implicit def requestExecutor: HttpRequest => Future[HttpResponse]

 val awaitTime: FiniteDuration = 5000 millis


 def executeSyncRequest[T](request: HttpRequest, responseHandler: HttpResponse => Future[T]): T = {
   val response: Future[T] = requestExecutor(request).flatMap({ response =>
     responseHandler(response)
   })

   Await.result(response, awaitTime)
 }
}

Now we are ready to execute the unit test, and if we have made no mistakes we should get a successful execution. Feel free to add more tests and refactor the client to adjust the structure as you please (you can find a few more tests here).
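
For instance, here is a sketch of one more case for MyLibraryClientSpec, not in the snippets above, checking that a non-OK response yields None (this follows directly from the status check in fetchCategories):

    "return None if the server responds with an error status" in {
      val request = HttpRequest(HttpMethods.GET, "//test/search/category")
      val response = HttpResponse(status = StatusCodes.InternalServerError)
      requestExecutor.expects(request).returning(Future.successful(response))

      new MyLibraryClient().fetchCategories() shouldEqual None
    }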

We can also try to execute the Pact test (MyLibraryClientPactSpec), but it is going to fail, because unlike the unit test it performs a real HTTP call: the scala-pact framework starts a real HTTP server which accepts and responds to requests as described in the pact.

To make it work, we have to complete our implementation with the element which defines the actor system and the function that executes the HTTP calls:

MyLibraryAppClient.scala

package com.fm.mylibrary.consumer.app

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer

import scala.concurrent.{ExecutionContextExecutor, Future}

object MyLibraryAppClient {

 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

 implicit val requestExecutor: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)
}

It is an object, so we can import it wherever we have to use our client, as you can see in the Pact test: import com.fm.mylibrary.consumer.app.MyLibraryAppClient._

Of course, you can use another approach; just be consistent in your choices and avoid using different approaches/structures in the same or similar projects.

We should finally be able to execute the Pact test! If all goes well, you should get an output like this:

> Adding interactions:
> - Interaction(None,Some(Categories: [Java, DevOps]),Fetching categories,InteractionRequest(Some(GET),Some(/search/category),None,None,None,None),InteractionResponse(Some(200),Some(Map(Content-Type -> application/json)),Some([{"name":"Java"},{"name":"DevOps"}]),None))
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Service bound to address /127.0.0.1:55653
> ScalaPact stub running at: http://localhost:55653
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.ServerChannelGroup - Connection to /127.0.0.1:55666 accepted at Tue Feb 13 11:43:08 GMT 2018.
[http4s-blaze-client-1] INFO org.http4s.client.PoolManager - Shutting down connection pool: allocated=1 idleQueue.size=1 waitQueue.size=0
[DEBUG] [02/13/2018 11:43:09.376] [ScalaTest-run-running-MyLibraryClientPactSpec] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [02/13/2018 11:43:09.377] [ScalaTest-run-running-MyLibraryClientPactSpec] [EventStream(akka://default)] Default Loggers started
[DEBUG] [02/13/2018 11:43:09.595] [ScalaTest-run-running-MyLibraryClientPactSpec] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [02/13/2018 11:43:09.598] [ScalaTest-run-running-MyLibraryClientPactSpec] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@db2cd5
[DEBUG] [02/13/2018 11:43:09.834] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] (Re-)starting host connection pool to localhost:55653
[DEBUG] [02/13/2018 11:43:10.123] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] InputBuffer (max-open-requests = 32) now filled with 1 request after enqueuing GET /search/category Empty
[DEBUG] [02/13/2018 11:43:10.127] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] Unconnected -> Loaded(1)
[DEBUG] [02/13/2018 11:43:10.137] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] <unconnected> Establishing connection...
[DEBUG] [02/13/2018 11:43:10.167] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] <unconnected> pushing request to connection: GET /search/category Empty
[DEBUG] [02/13/2018 11:43:10.179] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] Resolving localhost before connecting
[DEBUG] [02/13/2018 11:43:10.200] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-DNS] Resolution request for localhost from Actor[akka://default/system/IO-TCP/selectors/$a/0#871918912]
[DEBUG] [02/13/2018 11:43:10.209] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:55653]
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.ServerChannelGroup - Connection to /127.0.0.1:55669 accepted at Tue Feb 13 11:43:10 GMT 2018.
[DEBUG] [02/13/2018 11:43:10.212] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] Connection established to [localhost:55653]
[DEBUG] [02/13/2018 11:43:10.291] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> Received response: GET /search/category Empty -> 200 OK Strict(35 bytes)
[DEBUG] [02/13/2018 11:43:10.296] [default-akka.actor.default-dispatcher-8] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> Finished reading response entity for GET /search/category Empty -> 200 OK Strict(35 bytes)
[DEBUG] [02/13/2018 11:43:10.298] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] [0] Loaded(1) -> Idle
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.ServerChannel - Closing NIO1 channel /127.0.0.1:55653 at Tue Feb 13 11:43:10 GMT 2018
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Closing NIO1SocketServerGroup
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-0
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-1
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-2
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-3
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-4
[DEBUG] [02/13/2018 11:43:10.355] [default-akka.actor.default-dispatcher-3] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> connection was closed by peer while no requests were in flight
[DEBUG] [02/13/2018 11:43:10.360] [default-akka.actor.default-dispatcher-3] [default/Pool(shared->http://localhost:55653)] [0] Idle -> Unconnected

Process finished with exit code 0


I have used IntelliJ IDEA CE to execute the tests, but you can also run them directly from sbt, with sbt test or the tasks provided by the Scala-Pact plugin.

This test verifies the pact on the consumer side and generates the contract/pact files which the provider must respect. You can find them in target/pacts; they are JSON files which follow the Pact specification. The generated one should look like this:

ScalaConsumer_myLibraryServer.json

{
 "provider" : {
   "name" : "myLibraryServer"
 },
 "consumer" : {
   "name" : "ScalaConsumer"
 },
 "interactions" : [
   {
     "request" : {
       "method" : "GET",
       "path" : "/search/category"
     },
     "description" : "Fetching categories",
     "response" : {
       "status" : 200,
       "headers" : {
         "Content-Type" : "application/json"
       },
       "body" : [
         {
           "name" : "Java"
         },
         {
           "name" : "DevOps"
         }
       ]
     },
     "providerState" : "Categories: [Java, DevOps]"
   }
 ]
}

As you can see, it is quite straightforward: the definition of the two actors (provider and consumer) and their possible interactions.

Hopefully, so far so good. You can add more logic, more clients, more contracts, more services, etc. The project in the Git repo also contains a small service with the business logic, the challenging task of counting the categories. Here is the code:

CategoriesServiceSpec.scala

package com.fm.mylibrary.consumer.service

import com.fm.mylibrary.consumer.MyLibraryClient
import com.fm.mylibrary.model.Category
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, WordSpec}

class CategoriesServiceSpec extends WordSpec with Matchers with MockFactory {

 private val mockMyLibraryClient = mock[MyLibraryClient]
 private val service = new CategoriesService(mockMyLibraryClient)

 "Count Categories" must {
   "return the number of all categories fetched form MyLibrary" in {
     val javaCategory = Category("Java")
     val devopsCategory = Category("DevOps")
     (mockMyLibraryClient.fetchCategories _).expects().returning(Some(List(javaCategory, devopsCategory)))

     val result = service.countCategories()

     result shouldBe 2
   }

   "return 0 in case of the fetch form MyLibrary fails" in {
     (mockMyLibraryClient.fetchCategories _).expects().returning(None)

     val result = service.countCategories()

     result shouldBe 0
   }
 }
}


CategoriesService.scala

package com.fm.mylibrary.consumer.service

import com.fm.mylibrary.consumer.MyLibraryClient

class CategoriesService(val myLibraryClient: MyLibraryClient) {

 def countCategories(): Int = myLibraryClient.fetchCategories() match {
   case None => 0
   case Some(categories) =>
     categories.size
 }
}

I haven’t used any dependency injection framework because I believe that if a microservice needs a DI framework, it is becoming really big and complex; but if you don’t agree, feel free to use one. I have used Google Guice in the past and it worked quite well.

Producer Implementation

Once we have defined our consumer with its contract, we can move to the producer and implement it using the contract generated by the consumer.

As always, we start from the tests. For the producer, we’ll have two types of tests: one to verify the pact, and the others to verify the business logic in detail (unit tests). The server implementation is usually much bigger than the client, so I think it is better to start from the unit tests, and once we have a fully working application, we can create the test that verifies the pact (or contract).

Also, I always suggest an incremental approach (even for small projects), so in this case we can first build a server which exposes a single API and returns a static list of two categories (as defined in the Pact file), and then add configuration support, DB support, migration support, and so on.

Here we go, the unit test for our API:

CategoriesRoutesSpec.scala

package com.fm.mylibrary.producer

import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._


class CategoriesRoutesSpec extends BaseTestAppServer {

 "The service" should {

   "return an empty JSon array if there are no categories" in {
     Get("/search/category") ~> routes ~> check {
       responseAs[List[Category]] shouldBe List(Category("DevOps"), Category("Java"))
     }
   }
 }
}

And the base test class BaseTestAppServer with all test dependencies:

BaseTestAppServer.scala

package com.fm.mylibrary.producer

import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}

import scala.concurrent.ExecutionContextExecutor


class BaseTestAppServer extends WordSpec
           with ScalatestRouteTest
           with Matchers
           with MockFactory
           with Routes
           with BeforeAndAfterAll {

 implicit val executionContext: ExecutionContextExecutor = system.dispatcher

}

The test is implemented using the Akka HTTP route TestKit (you can find the official documentation here); it allows you to build tests on routes in this format:

REQUEST ~> ROUTE ~> check {
    ASSERTIONS 
}

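The check block can assert on more than the deserialized body. For instance, a small sketch of an additional test for the same spec (StatusCodes and ContentTypes come from akka.http.scaladsl.model):

Get("/search/category") ~> routes ~> check {
  status shouldBe StatusCodes.OK
  contentType shouldBe ContentTypes.`application/json`
}
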
The BaseTestAppServer class mixes in the base dependencies, WordSpec, ScalatestRouteTest, Matchers, MockFactory, and BeforeAndAfterAll, plus the trait which defines the application routes: Routes.

Of course, it won't compile, let alone pass, because there is no implementation yet, so let’s define our route:

Routes.scala

package com.fm.mylibrary.producer

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import com.fm.mylibrary.model.Category

import scala.concurrent.ExecutionContext
import spray.json._
import com.fm.mylibrary.model.JsonProtocol._


trait Routes {

 implicit val materializer: Materializer
 implicit val executionContext: ExecutionContext

 val searchRoutes: Route = {
   pathPrefix("search" / "category") {
     get {
       complete(
         List(Category("DevOps"), Category("Java")).toJson
       )
     }
   }
 }

 val routes: Route = searchRoutes
}

I’m using spray-json for the JSON marshalling/unmarshalling, and it requires the definition of the protocol (or format) to use for the conversion; you can see the import of this object in the code: import com.fm.mylibrary.model.JsonProtocol._. It is also necessary to import spray.json._, which provides all the conversion functions; in this case I’m using toJson, which looks for an implicit definition of the protocol (or format) for the specific object it is going to convert.

JsonProtocol.scala

package com.fm.mylibrary.model

import spray.json._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport

object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
 implicit val categoryFormat = jsonFormat1(Category)
}

There is no need to define converters for types like List, Array, Option, and so on, because they are provided by spray-json in DefaultJsonProtocol.
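
For instance, with JsonProtocol._ in scope, a round trip over a list works out of the box (a small sketch):

import spray.json._
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._

// the List format is derived automatically from categoryFormat
val json = List(Category("Java"), Category("DevOps")).toJson
// json.compactPrint == """[{"name":"Java"},{"name":"DevOps"}]"""

// ...and back again
val parsed: List[Category] = json.convertTo[List[Category]]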

There are other libraries for this purpose, like Argonaut and JSON4S; feel free to evaluate them all and pick the one which best fits your needs.

If we execute the test again, we should get a green light now. Again, add more tests to cover every case. Next, to check whether our service respects the consumer contract, we have to complete the base server implementation by defining the Akka HTTP app:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.Routes

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}


object MyLibraryAppServer extends App
         with Routes
         with DebuggingDirectives {

 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

 val log = actorSystem.log


 def startApplication(): Unit = {
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = "localhost", port = 9000).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }

 def stopApplication(): Unit = {
   actorSystem.terminate()
 }

 startApplication()
}

This class defines two methods: one to start our server and one to stop it. It also defines the actor system and the execution context used in the route handling.

It extends the trait scala.App, which provides the main method, so you can execute this class directly and it will start an HTTP server providing the defined routes.

But first, let’s check that the pact is fulfilled. We can easily verify it with a test class like this:

MyLibraryServerPactSpec.scala

package com.fm.mylibrary.producer.pact

import com.fm.mylibrary.producer.app.MyLibraryAppServer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import com.itv.scalapact.ScalaPactVerify._


class MyLibraryServerPactSpec extends FunSpec with Matchers with BeforeAndAfterAll {

 override def beforeAll() {
   MyLibraryAppServer.main(Array())
 }

 override def afterAll() {
   MyLibraryAppServer.stopApplication()
 }


 describe("Verifying MyLibrary server") {
   it("should be able to respect the contract"){
     verifyPact
       .withPactSource(loadFromLocal("target/pacts"))
       .noSetupRequired
       .runVerificationAgainst("localhost", 9999)
   }
 }
}

It uses the verifyPact object, which works much like forgePact: we define the source of the Pact files, target/pacts in our case (but it could be a shared location or a Pact Broker), any setup code needed to prepare the data or environment for the interactions (none in our case, hence noSetupRequired), and then the host and port where the server is listening for requests.

So, as in the consumer test, scala-pact performs a real HTTP call, and we need to set the application up to handle it. We can do that in many ways; I have chosen the solution that felt safest and easiest, which is to start the server exactly as it will be started in production, by calling the main method of MyLibraryAppServer before executing the test, and of course shutting it down afterward. If the application is simple, this approach works well; if it is not, we may implement a specific test runner for this kind of test, but I suggest staying as close as possible to the production setup.

Executing the test, we should get a pass and an output like this:

[DEBUG] [02/13/2018 16:45:09.053] [ScalaTest-run] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [02/13/2018 16:45:09.054] [ScalaTest-run] [EventStream(akka://default)] Default Loggers started
[DEBUG] [02/13/2018 16:45:09.110] [ScalaTest-run] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [02/13/2018 16:45:09.112] [ScalaTest-run] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@1bb571c
[DEBUG] [02/13/2018 16:45:10.244] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:9000
[INFO] [02/13/2018 16:45:10.256] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] application is up and running at 127.0.0.1:9000
Attempting to use local pact files at: 'target/pacts'
Looking for pact files in: target/pacts
Found directory: C:\Dev\git-1.0.6\home\src-rnd\myLibrary-contracts\target\pacts
Loading pact file: ScalaConsumer_myLibraryServer.json
Verifying against 'localhost' on port '9000' with a timeout of 2 second(s).
--------------------
Attempting to run provider state: Categories: [Java, DevOps]
Provider state ran successfully
--------------------
[DEBUG] [02/13/2018 16:45:10.883] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [02/13/2018 16:45:11.146] [default-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(default)] log: Response for
  Request : HttpRequest(HttpMethod(GET),http://localhost:9000/search/category,List(Host: localhost:9000, User-Agent: scala-pact/0.16.2, Timeout-Access: <function1>),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1))
  Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,[{"name":"DevOps"},{"name":"Java"}]),HttpProtocol(HTTP/1.1)))
[http4s-blaze-client-1] INFO org.http4s.client.PoolManager - Shutting down connection pool: allocated=1 idleQueue.size=1 waitQueue.size=0
[DEBUG] [02/13/2018 16:45:11.262] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/1] Closing connection due to IO error java.io.IOException: An existing connection was forcibly closed by the remote host
Results for pact between ScalaConsumer and myLibraryServer
 - [  OK  ] Fetching categories
[DEBUG] [02/13/2018 16:45:11.391] [default-akka.actor.default-dispatcher-9] [EventStream] shutting down: StandardOutLogger started
[DEBUG] [02/13/2018 16:45:11.391] [default-akka.actor.default-dispatcher-7] [akka://default/system/IO-TCP/selectors/$a/0] Monitored actor [Actor[akka://default/user/StreamSupervisor-0/$$a#-487633161]] terminated

Process finished with exit code 0

Make sure the pact files are in target/pacts; if they aren't, just execute MyLibraryClientPactSpec first.

The consumer pact seems to be respected, so we can continue our implementation by adding an external configuration file, DB support, and DB migration support.

Adding an external configuration is really easy: just create the file application.conf under src/main/resources and put all the configuration values in it, for example:

application.conf

akka {
 loglevel = DEBUG
}

http {
 interface = "0.0.0.0"
 port = 9000
}

database = {
 url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"
 driver = org.h2.Driver
 connectionPool = disabled
 keepAliveConnection = true
 user = "sa"
 password = ""
}

Then you can create a trait which handles it, loading the configuration and exposing the values through well-named constants:

Config.scala

package com.fm.mylibrary.producer

import com.typesafe.config.ConfigFactory

trait Config {

 private val config = ConfigFactory.load()

 private val httpConfig = config.getConfig("http")
 private val databaseConfig = config.getConfig("database")

 val httpInterface: String = httpConfig.getString("interface")
 val httpPort: Int = httpConfig.getInt("port")

 val databaseUrl: String = databaseConfig.getString("url")
 val databaseUser: String = databaseConfig.getString("user")
 val databasePassword: String = databaseConfig.getString("password")
}

By default, ConfigFactory.load() loads the configuration from application.conf on the classpath (i.e. src/main/resources/application.conf); you can override this with Typesafe Config's standard system properties, such as -Dconfig.file.

We can also put a configuration version for the tests in src/test/resources:

application.conf

akka {
 loglevel = DEBUG
}

http {
 interface = "localhost"
 port = 9999
}

database = {
 url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"
 driver = org.h2.Driver
 connectionPool = disabled
 keepAliveConnection = true
 user = "sa"
 password = ""
}

Not much differs from the real one in this case, because I’m using an in-memory DB for both.

Using it in the main class is quite easy: just add it as a trait of the class and replace the static values with the corresponding constants:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.{Config, Routes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}


object MyLibraryAppServer extends App
         with Routes
         with Config
         with DebuggingDirectives {

 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

 val log = actorSystem.log


 def startApplication(): Unit = {
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }

 def stopApplication(): Unit = {
   actorSystem.terminate()
 }

 startApplication()
}

You can use the configuration in the Pact test as well, so that it uses the correct server address:

MyLibraryServerPactSpec.scala

package com.fm.mylibrary.producer.pact

import com.fm.mylibrary.producer.Config
import com.fm.mylibrary.producer.app.MyLibraryAppServer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import com.itv.scalapact.ScalaPactVerify._


class MyLibraryServerPactSpec extends FunSpec with Matchers with BeforeAndAfterAll with Config {

 override def beforeAll() {
   MyLibraryAppServer.main(Array())
 }

 override def afterAll() {
   MyLibraryAppServer.stopApplication()
 }


 describe("Verifying MyLibrary server") {
   it("should be able to respect the contract"){
     verifyPact
       .withPactSource(loadFromLocal("target/pacts"))
       .noSetupRequired
       .runVerificationAgainst(httpInterface, httpPort)
   }
 }
}

And now we can finally add the DB support, together with the migrations.

First of all, we have to define our entities (or tables); in our example, we need only one: Category.

CategoryEntity.scala

package com.fm.mylibrary.producer.entity

import com.fm.mylibrary.model.Category
import slick.jdbc.H2Profile.api._


trait CategoryEntity {

 class Categories(tag: Tag) extends Table[Category](tag, "CATEGORY") {
   def name = column[String]("NAME", O.PrimaryKey)

   def * = name  <> (Category.apply, Category.unapply)
 }

 protected val categories = TableQuery[Categories]
}

It's a standard Slick table definition; you can see that this table has only one column, which is also the primary key, and that it is mapped to the model class Category (Table[Category]).

It can be instantiated from the Category class, as defined here: def * = name <> (Category.apply, Category.unapply). Make sure the model class implements both apply and unapply; the easiest way is to define the model as a case class.
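
For tables with more than one column, the default projection maps a tuple to the case class instead. Here is a hypothetical two-column example (Book is not part of this project, just an illustration):

import slick.jdbc.H2Profile.api._

case class Book(title: String, categoryName: String)

class Books(tag: Tag) extends Table[Book](tag, "BOOK") {
  def title        = column[String]("TITLE", O.PrimaryKey)
  def categoryName = column[String]("CATEGORY_NAME")

  // tupled/unapply lift the (String, String) tuple into the case class
  def * = (title, categoryName) <> (Book.tupled, Book.unapply)
}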

The last instruction defines the TableQuery object, which is necessary to execute any kind of query on that table. Let’s also define the main entry point for any DB interaction; I have implemented it as a trait which can be mixed into any class that needs DB access:

DatabaseSupport.scala

package com.fm.mylibrary.producer.db

import slick.jdbc.H2Profile
import slick.jdbc.H2Profile.api._


trait DatabaseSupport {

 val db: H2Profile.backend.Database = Database.forConfig("database")

 def closeDB(): Unit = db.close
}

We can now define the layer necessary to operate on the Category table: the DAO. I have created it in the same file as CategoryEntity, but you can move it to a different file if you prefer a different packaging:

CategoryEntity.scala

package com.fm.mylibrary.producer.entity

import com.fm.mylibrary.model.Category
import com.fm.mylibrary.producer.db.DatabaseSupport
import slick.jdbc.H2Profile.api._

import scala.concurrent.Future

trait CategoryEntity {

 class Categories(tag: Tag) extends Table[Category](tag, "CATEGORY") {
   def name = column[String]("NAME", O.PrimaryKey)

   def * = name  <> (Category.apply, Category.unapply)
 }

 protected val categories = TableQuery[Categories]
}


class CategoryDAO extends CategoryEntity with DatabaseSupport {

 def insertOrUpdate(category: Category): Future[Int] =
       db.run(categories.insertOrUpdate(category))

 def findAll(): Future[Seq[Category]] =
       db.run(categories.result)
}


CategoryDAO extends both CategoryEntity and DatabaseSupport: the first provides the Category table query object, and the second provides the database instance used to execute the queries.

I have implemented only two methods, more than enough for our test. As you can see, I’m using the base methods provided by Slick and, thanks to the fact that the entity Categories and the model Category are linked to each other, the DAO can return the model directly, with no explicit conversion. You can find more examples and information on how to implement entities and DAOs with Slick in the official documentation.
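
For example, a filtered lookup would be a one-liner on top of the same query object; a hypothetical addition to CategoryDAO (not in the repo):

// === is Slick's column equality operator, from slick.jdbc.H2Profile.api._
def findByName(name: String): Future[Option[Category]] =
  db.run(categories.filter(_.name === name).result.headOption)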

I usually do not implement tests for DAOs that only use the standard queries provided by the library; I don’t see any point in testing external library methods, and they are already covered by the route tests. However, if a DAO implements complex queries involving multiple tables, I strongly suggest unit testing it with all possible corner cases.

In order to start our application now, we need a DB with the Category table. We can create it manually or, better, let the machine do the work for us. So we can implement a DB migration procedure which applies, at startup time, any DB changes necessary for the application to run.

As we have done for the DB support, we can implement a trait which provides the functionality to perform the migration:

DatabaseMigrationSupport.scala

package com.fm.mylibrary.producer.db

import com.fm.mylibrary.producer.Config
import org.flywaydb.core.Flyway


trait DatabaseMigrationSupport extends Config {

 private val flyway = new Flyway()
 flyway.setDataSource(databaseUrl, databaseUser, databasePassword)

 def migrateDB(): Unit = {
   flyway.migrate()
 }

 def reloadSchema(): Unit = {
   flyway.clean()
   flyway.migrate()
 }
}

This exposes two methods, one for the incremental migration and one to re-execute the whole migration from scratch. It uses the Config trait to get the database connection information.
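
For instance, a test suite that needs a pristine schema on every run (the in-memory H2 instance survives across suites because of DB_CLOSE_DELAY=-1) could call reloadSchema in its setup; a sketch:

override def beforeAll(): Unit = {
  reloadSchema()  // drop all objects, then re-apply every migration
}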

By default, Flyway looks for the SQL migration scripts in src/main/resources/db/migration, and it expects file names in a specific format: V<version>__<description>.sql (note the double underscore).

Get more information from the official migration documentation.

So, our first migration script is the creation of the Category table:

V1__Create_Category.sql

CREATE TABLE category (
 name VARCHAR(255) NOT NULL PRIMARY KEY
);

And we can execute it at server startup:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.db.DatabaseMigrationSupport
import com.fm.mylibrary.producer.{Config, Routes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}


object MyLibraryAppServer extends App
         with Routes
         with Config
         with DatabaseMigrationSupport
         with DebuggingDirectives {

 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

 val log = actorSystem.log


 def startApplication(): Unit = {
   migrateDB()

   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }

 def stopApplication(): Unit = {
   actorSystem.terminate()
 }

 startApplication()
}

We have added the DatabaseMigrationSupport trait and the invocation of the migrateDB() method before the HTTP binding.

The last thing, to link our new data source to the business logic, is to change the route so that it retrieves the categories from the DB:

Routes.scala

package com.fm.mylibrary.producer

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import com.fm.mylibrary.producer.entity.CategoryDAO

import scala.concurrent.ExecutionContext

import spray.json._
import com.fm.mylibrary.model.JsonProtocol._


trait Routes {

 implicit val materializer: Materializer
 implicit val executionContext: ExecutionContext

 private val categoryEntityDAO = new CategoryDAO()


 val searchRoutes: Route = {
   pathPrefix("search" / "category") {
     get {
       complete(
         categoryEntityDAO.findAll()
             .map(_.toJson)
       )
     }
   }
 }

 val routes: Route = searchRoutes
}

We just replaced the static list with the invocation of the DAO's findAll method.

You can see that the DAO is instantiated inside the trait; if the logic becomes more complex, I suggest turning it into a required member (implicit or class attribute) so that it can be injected from outside, as sketched below. In our case there is no need: the logic is really simple, and on the test side we are using an in-memory DB, so there is no need to mock it.
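
A hypothetical injectable variant of the trait (not the repo's version) would simply declare the DAO as an abstract member:

trait Routes {

  implicit val materializer: Materializer
  implicit val executionContext: ExecutionContext

  // supplied by whoever mixes the trait in: the real DAO in the app,
  // a stub or a mock in the tests
  def categoryEntityDAO: CategoryDAO

  // ...searchRoutes and routes unchanged...
}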

Back to the route test: it will now fail because there is no data, so let’s add some. We can easily do that with a trait which implements a method that adds a couple of categories:

MockData.scala

package com.fm.mylibrary.producer.db

import com.fm.mylibrary.model.Category
import com.fm.mylibrary.producer.entity.CategoryDAO

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.Duration

trait MockData {

 implicit val executionContext: ExecutionContext


 def addMockCategories(): Unit = {
   val categoryEntityDAO = new CategoryDAO()

   val setupFuture = for {
     c1 <- categoryEntityDAO.insertOrUpdate(Category("Java"))
     c2 <- categoryEntityDAO.insertOrUpdate(Category("DevOps"))
   } yield c1 + c2

   Await.result(setupFuture, Duration.Inf)
 }

}

Add it to BaseTestAppServer and MyLibraryAppServer so that we can easily verify the application with the route tests and the Pact test:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.db.{DatabaseMigrationSupport, MockData}
import com.fm.mylibrary.producer.{Config, Routes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}


object MyLibraryAppServer extends App
         with Routes
         with Config
         with DatabaseMigrationSupport
         with MockData
         with DebuggingDirectives {

 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

 val log = actorSystem.log


 def startApplication(): Unit = {
   migrateDB()
   addMockCategories()

   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }

 def stopApplication(): Unit = {
   actorSystem.terminate()
 }

 startApplication()
}


BaseTestAppServer.scala

package com.fm.mylibrary.producer

import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.fm.mylibrary.producer.db.{DatabaseMigrationSupport, MockData}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}

import scala.concurrent.ExecutionContextExecutor


class BaseTestAppServer extends WordSpec
           with ScalatestRouteTest
           with Matchers
           with MockFactory
           with DatabaseMigrationSupport
           with MockData
           with Routes
           with BeforeAndAfterAll {

 implicit val executionContext: ExecutionContextExecutor = system.dispatcher

 override def beforeAll(): Unit = {
   migrateDB()
   addMockCategories()
 }

}

If we execute all the tests, we should see no failures; you can run them with the command sbt test.

If we start the server with sbt run and call GET /search/category, we should get our two categories.
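
For example, calling the endpoint with curl should print something like:

$ curl http://localhost:9000/search/category
[{"name":"DevOps"},{"name":"Java"}]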

Bottom Line

Consumer Driven Contract testing is a great technique that can save a lot of time and prevent many of the issues that come with integration tests.

The implementation is “contract-centric”: it means we are forced to think first about how consumers want to use a specific service before we provide it, and we don’t need to set up any infrastructure to execute integration tests between services.

On the other hand, Scala-Pact is not well documented, so setting up complex tests can be challenging; the only way I have found is to read through its examples and source code.

We have seen a really simple example, far from what you would find in a real environment, but hopefully you can use it as a starting point for your next microservices.

More About CDC and Pact

I have shown you the most basic usage of Pact, and it may not be enough for a real environment, where many teams work “concurrently” on many producers and consumers, and where communication is as important as the automation and the tools used to support it.

In the case of CDC and Pact, you have to automate the contract handling (publish/verify) and link it to your CI/CD (Continuous Integration/Continuous Delivery) process, so that no consumer can go into production if there is no producer which respects its contract, and no producer can go into production if it breaks a contract.

So, I strongly suggest reading the official Pact documentation and maybe introducing the Pact Broker into your CI/CD process: it is an application which lets you share, version, and verify the pacts exchanged between your projects.

Feel free to ask any questions, and if you need advice, I’ll be more than happy to help.
