Build Fault Tolerant Applications With Cassandra API for Azure Cosmos DB

Azure Cosmos DB is a resource-governed system that allows you to execute a certain number of operations per second based on the provisioned throughput you have configured. If clients exceed that limit and consume more request units than provisioned, subsequent requests are rate limited and exceptions are thrown; these are also referred to as 429 errors.

With the help of a practical example, I’ll demonstrate how to incorporate fault tolerance into your Go applications by handling and retrying operations affected by these rate limiting errors. To help you follow along, the sample application code for this blog post is available on GitHub; it uses the gocql driver for Apache Cassandra.

In this post, we’ll go through how rate limiting manifests itself, how to handle these errors in application code with a retry policy, and how to verify the behavior with load tests.

One way of tackling rate limiting is to adjust provisioned throughput to meet your application’s requirements. There are multiple ways to do this, including the Azure portal, the Azure CLI, and CQL (Cassandra Query Language) commands.
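The CQL route can even be driven from application code. Here is a sketch using gocql, assuming an existing session and the cosmosdb_provisioned_throughput CQL extension supported by the Cassandra API; the keyspace/table placeholders and the 800 RU/s value are illustrative:

```go
// adjust provisioned throughput (RU/s) on a table using a Cosmos DB-specific
// CQL extension; replace the placeholders with your own keyspace and table
err := session.Query("ALTER TABLE <keyspace>.<table> WITH cosmosdb_provisioned_throughput=800").Exec()
if err != nil {
    log.Fatalf("failed to update provisioned throughput: %v", err)
}
```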

But What If You Wanted to Handle These Errors in the Application Itself?

The good thing is that the Cassandra API for Azure Cosmos DB translates rate limiting exceptions into overloaded errors on the Cassandra native protocol. Since the gocql driver allows you to plug in your own RetryPolicy, you can write a custom implementation to intercept these errors and retry them after a certain (cool down) time period. This policy can then be applied to each Query or at a global level using a ClusterConfig.
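For reference, a minimal sketch of both options, where myPolicy stands in for any gocql.RetryPolicy implementation (such as the one discussed later in this post) and the contact point is a placeholder:

```go
// global: the policy applies to every query on sessions created from this config
cluster := gocql.NewCluster("<contact point>")
cluster.RetryPolicy = myPolicy
session, err := cluster.CreateSession()
if err != nil {
    log.Fatal(err)
}

// per query: override the policy for a single query instead
err = session.Query("SELECT * FROM ordersapp.orders").RetryPolicy(myPolicy).Exec()
```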

The Azure Cosmos DB extension library makes it quite easy to use Retry Policies in your Java applications. An equivalent Go version is available on GitHub and has been used in the sample application for this blog post.


Retry Policy in Action

As promised, we’ll walk through the entire process using a simple yet practical example. The sample application used to demonstrate the concepts is a service that exposes a REST endpoint to POST orders data, which is persisted to a Cassandra table in Azure Cosmos DB.
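To set the stage, here is a minimal sketch of the kind of handler such a service exposes; the session variable, the "NY" state value, and the string-based 429 check are illustrative assumptions, and the actual code in the sample repo may differ (assumes the net/http, fmt, math/rand, strings, time, gocql, and hashicorp/go-uuid imports):

```go
// ordersHandler inserts a randomly generated order and returns its ID;
// rate limit errors from Cosmos DB surface to the client as HTTP 429
func ordersHandler(session *gocql.Session) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        rid, _ := uuid.GenerateUUID()
        err := session.Query(
            "INSERT INTO ordersapp.orders (id, amount, state, time) VALUES (?, ?, ?, ?)",
            rid, rand.Intn(200)+50, "NY", time.Now(),
        ).Exec()
        if err != nil {
            // illustrative shortcut: detect rate limiting via the error message
            if strings.Contains(err.Error(), "TooManyRequests (429)") {
                http.Error(w, err.Error(), http.StatusTooManyRequests)
                return
            }
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "Added order ID %s\n", rid)
    }
}
```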

You will run a few load tests on this API service to see how rate limiting manifests itself and how it’s handled.

Prerequisites

Start by installing hey, a load testing program. You can download OS-specific binaries (64-bit) for Linux, Mac, and Windows (refer to the GitHub repo for the latest information in case you face issues downloading the utility).

You can use any other tool that allows you to generate load on an HTTP endpoint.

Clone this GitHub repo and change into the right directory:

```shell
git clone https://github.com/abhirockzz/cosmos-go-rate-limiting
cd cosmos-go-rate-limiting
```



Set Up Azure Cosmos DB

Create an Azure Cosmos DB account with the Cassandra API option selected.

To create a Keyspace and Table, use the following CQL:

```sql
CREATE KEYSPACE ordersapp WITH REPLICATION = {'class' : 'SimpleStrategy'};

CREATE TABLE ordersapp.orders (
    id uuid PRIMARY KEY,
    amount int,
    state text,
    time timestamp
);
```



Start the Application

Open a terminal and set the environment variables for the application:

```shell
export COSMOSDB_CASSANDRA_CONTACT_POINT=<account name>.cassandra.cosmos.azure.com
export COSMOSDB_CASSANDRA_PORT=10350
export COSMOSDB_CASSANDRA_USER=<user name>
export COSMOSDB_CASSANDRA_PASSWORD=<password>

# optional (default: 5)
# export MAX_RETRIES=<number of retries>
```



To start the application:

```shell
go run main.go

# wait for this output:
# Connected to Azure Cosmos DB
```



To test whether the application is working as expected, insert a few orders by invoking the REST endpoint (once for each order) from a different terminal:

```shell
curl http://localhost:8080/orders
```



The application generates random data, so you don’t have to supply any while invoking the endpoint.

Confirm that the order was successfully stored. You can use the hosted CQL shell in the Azure portal and execute the query below:

```sql
select count(*) from ordersapp.orders;

// you should see this output
 system.count(*)
-----------------
               1

(1 rows)
```



You’re all set.


Let the Load Tests Begin!

Invoke the REST endpoint with 300 requests. This is enough to overload the system, since only 400 RU/s are allocated by default.

To start the load test:

```shell
hey -t 0 -n 300 http://localhost:8080/orders
```



Notice the logs in the application terminal. In the beginning, you will see that the orders are being successfully created. For example:

```
Added order ID 25a8cec1-e67a-11ea-9c17-7f242c2eeac0
Added order ID 25a8f5ef-e67a-11ea-9c17-7f242c2eeac0
Added order ID 25a8f5ea-e67a-11ea-9c17-7f242c2eeac0
...
```



After a while, as the consumed throughput climbs and eventually exceeds the provisioned limit, Azure Cosmos DB will rate limit the application’s requests. This manifests itself in the form of an error that looks similar to this:

```
Request rate is large: ActivityID=ac78fac3-5c36-4a20-8ad7-4b2d0768ffe4, RetryAfterMs=112, Additional details='Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: ac78fac3-5c36-4a20-8ad7-4b2d0768ffe4; Reason: ({
  "Errors": [
    "Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: http://aka.ms/cosmosdb-error-429"
  ]
});
```


In the error message above, notice two things: TooManyRequests (429) and RetryAfterMs=112.

Observing Query Errors

To keep things simple, we will use the log output for testing/diagnostic purposes. Any error (related to rate limiting in this case) encountered during query execution is intercepted by a gocql.QueryObserver. The randomly generated order ID is also logged with each error message so that you can check the logs to confirm that the failed order was retried and (eventually) stored in Azure Cosmos DB.

Here is the code snippet:

```go
....
type OrderInsertErrorLogger struct {
    orderID string
}

// implements gocql.QueryObserver
func (l OrderInsertErrorLogger) ObserveQuery(ctx context.Context, oq gocql.ObservedQuery) {
    err := oq.Err
    if err != nil {
        log.Printf("Query error for order ID %s\n%v", l.orderID, err)
    }
}

....

// the Observer is associated with each query
rid, _ := uuid.GenerateUUID()
err := cs.Query(insertQuery).Bind(rid, rand.Intn(200)+50, fixedLocation, time.Now()).Observer(OrderInsertErrorLogger{orderID: rid}).Exec()
....
```



How Many Orders Made It Through?

Switch back to the load testing terminal and check some of the statistics (the output has been redacted for brevity):

```
Summary:
  Total:        2.8507 secs
  Slowest:      1.3437 secs
  Fastest:      0.2428 secs
  Average:      0.5389 secs
  Requests/sec: 70.1592
....

Status code distribution:
  [200] 300 responses
```


The numbers will differ in your specific case depending on multiple factors.

This is not a raw benchmark, nor do we have a production-grade application, so you can ignore the Requests/sec and related figures. But do pay attention to the Status code distribution attribute, which shows that our application responded with HTTP 200 for all the requests.

Let’s confirm the final numbers. Open the Cassandra Shell in the Azure Cosmos DB portal and execute the same query:

```sql
select count(*) from ordersapp.orders;

// output
 system.count(*)
-----------------
             301
```



You should see that 300 additional rows (orders) have been inserted. The key takeaway is that all the orders were successfully stored in Azure Cosmos DB despite the rate limiting errors, because our application code transparently retried them based on the Retry Policy we configured, with a single line of code:

```go
clusterConfig.RetryPolicy = retry.NewCosmosRetryPolicy(numRetries)
```
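In context, that line sits alongside the rest of the cluster configuration. Here is a sketch of what the wiring might look like; the port, TLS, and auth settings are the usual ones for the Cassandra API endpoint, the retry identifier refers to the Go retry policy package mentioned earlier, and the exact code in the repo may differ:

```go
clusterConfig := gocql.NewCluster(os.Getenv("COSMOSDB_CASSANDRA_CONTACT_POINT"))
clusterConfig.Port = 10350
clusterConfig.Authenticator = gocql.PasswordAuthenticator{
    Username: os.Getenv("COSMOSDB_CASSANDRA_USER"),
    Password: os.Getenv("COSMOSDB_CASSANDRA_PASSWORD"),
}
clusterConfig.SslOpts = &gocql.SslOptions{Config: &tls.Config{MinVersion: tls.VersionTLS12}}

// the single line that plugs in the retry policy
clusterConfig.RetryPolicy = retry.NewCosmosRetryPolicy(numRetries)

session, err := clusterConfig.CreateSession()
if err != nil {
    log.Fatal(err)
}
defer session.Close()
```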



A Note on Dynamic Throughput Management

If your application spends most of its time operating at about 60-70% of its provisioned throughput, Autoscale provisioned throughput can help optimize your RU/s and cost by scaling down when not in use; you only pay for the resources that your workloads need on a per-hour basis.

So, what happens without the Retry Policy?


Deactivate the Policy to See the Difference

Stop the application (press Ctrl+C in the terminal), set an environment variable, and restart the application:

```shell
export USE_RETRY_POLICY=false
go run main.go
```



Before running the load test again, make a note of the number of rows in the orders table (select count(*) from ordersapp.orders;). Then, run the load test:

```shell
hey -t 0 -n 300 http://localhost:8080/orders
```



In the application logs, you will notice the same rate limiting errors. In the terminal where you ran the load test, at the end of the output summary, you will see that some of the requests failed to complete successfully, i.e., they returned a response other than HTTP 200:

```
...
Status code distribution:
  [200] 240 responses
  [429] 60 responses
```



Because the Retry Policy was not enforced, the application no longer retried the requests that failed due to rate limiting.
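For reference, the toggle driven by USE_RETRY_POLICY can be as simple as this sketch (the repo’s exact logic may differ):

```go
// attach the retry policy only when USE_RETRY_POLICY is not set to "false"
if os.Getenv("USE_RETRY_POLICY") != "false" {
    clusterConfig.RetryPolicy = retry.NewCosmosRetryPolicy(numRetries)
}
```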

Increase Provisioned Throughput

You can increase the Request Units using the Azure portal (for example, double them to 800 RU/s) and run the same load test:

```shell
hey -t 0 -n 300 http://localhost:8080/orders
```



You will not see the rate limiting (HTTP 429) errors now, and the latency and related numbers should be relatively low.

Try increasing the number of requests (use the -n flag) to see when the throughput threshold is breached and the application gets rate limited again. Until that point, all the orders will be persisted successfully (without any errors or retries).

The next section briefly covers how the custom Retry Policy works.

This is an experimental implementation; you should write custom policies to suit the fault-tolerance and performance requirements of your applications.


Behind the Scenes

CosmosRetryPolicy adheres to the gocql.RetryPolicy interface by implementing the Attempt and GetRetryType functions.

```go
type CosmosRetryPolicy struct {
    MaxRetryCount        int
    FixedBackOffTimeMs   int
    GrowingBackOffTimeMs int
    numAttempts          int
}
```



A retry is initiated only if the number of retry attempts for that query is less than or equal to the max retry config, or if the max retry config is set to -1 (infinite retries):

```go
func (crp *CosmosRetryPolicy) Attempt(rq gocql.RetryableQuery) bool {
    crp.numAttempts = rq.Attempts()
    return rq.Attempts() <= crp.MaxRetryCount || crp.MaxRetryCount == -1
}
```



The GetRetryType function detects the type of error, and in the case of a rate-limited error (HTTP 429), it tries to extract the value of the RetryAfterMs field from the error message and sleeps for that duration before retrying the query:

```go
func (crp *CosmosRetryPolicy) GetRetryType(err error) gocql.RetryType {
    switch err.(type) {
    default:
        retryAfterMs := crp.getRetryAfterMs(err.Error())
        if retryAfterMs == -1 {
            return gocql.Rethrow
        }
        time.Sleep(retryAfterMs)
        return gocql.Retry

    // other case statements have been omitted for brevity
    }
}
```
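The getRetryAfterMs helper isn’t shown above. Here is a minimal sketch of how such parsing could work, based on the RetryAfterMs=112 format seen in the error message earlier; the actual library implementation may differ (assumes the strings, strconv, and time imports):

```go
// hypothetical sketch: extract the RetryAfterMs value from the error string
// and convert it into a time.Duration; returns -1 if the field is absent
func getRetryAfterMs(errMsg string) time.Duration {
    const marker = "RetryAfterMs="
    idx := strings.Index(errMsg, marker)
    if idx == -1 {
        return -1
    }
    rest := errMsg[idx+len(marker):]
    // take the leading run of digits after the marker
    end := strings.IndexFunc(rest, func(r rune) bool { return r < '0' || r > '9' })
    if end == -1 {
        end = len(rest)
    }
    ms, err := strconv.Atoi(rest[:end])
    if err != nil {
        return -1
    }
    return time.Duration(ms) * time.Millisecond
}
```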



Azure Cosmos DB not only gives you the flexibility to configure and adjust your throughput requirements in a variety of ways, but also provides the basic primitives that allow applications to handle rate limiting errors, making them robust and fault tolerant. This blog post demonstrated how to do this for Go applications, but the concepts apply to any language with a CQL-compatible driver that you choose for working with the Cassandra API for Azure Cosmos DB.


To Learn More

Check out the Cassandra API for Azure Cosmos DB resources in the official documentation.