Build Fault Tolerant Applications With Cassandra API for Azure Cosmos DB
Azure Cosmos DB is a resource governed system that allows you to execute a certain number of operations per second based on the provisioned throughput you have configured. If clients exceed that limit and consume more request units than what was provisioned, it leads to rate limiting of subsequent requests and exceptions being thrown — they are also referred to as 429 errors.
With the help of a practical example, I’ll demonstrate how to incorporate fault-tolerance in your Go applications by handling and retrying operations affected by these rate limiting errors. To help you follow along, the sample application code for this blog is available on GitHub — it uses the gocql driver for Apache Cassandra.
In this post, we’ll go through:
- Initial setup and configuration before running the sample application
- Execution of various load test scenarios and analysis of the results
- A quick overview of the Retry Policy implementation
One way of tackling rate limiting is by adjusting provisioned throughput to meet your application requirements. There are multiple ways to do this, including using Azure portal, Azure CLI, and CQL (Cassandra Query Language) commands.
But What If You Wanted to Handle These Errors in the Application Itself?
The good thing is that the Cassandra API for Azure Cosmos DB translates the rate limiting exceptions into overloaded errors on the Cassandra native protocol. Since the gocql driver allows you to plug in your own RetryPolicy, you can write a custom implementation to intercept these errors and retry them after a certain (cool down) time period. This policy can then be applied to each Query or at a global level using a ClusterConfig.
The Azure Cosmos DB extension library makes it quite easy to use Retry Policies in your Java applications. An equivalent Go version is available on GitHub and has been used in the sample application for this blog post.
Retry Policy in Action
As promised, you will walk through the entire process using a simple yet practical example. The sample application used to demonstrate the concepts is a service that exposes a REST endpoint to POST orders data, which is persisted to a Cassandra table in Azure Cosmos DB.
You will run a few load tests on this API service to see how rate limiting manifests itself and how it’s handled.
Prerequisites
Start by installing hey, a load testing program. You can download OS-specific binaries (64-bit) for Linux, Mac, and Windows (refer to the GitHub repo for the latest information if you have trouble downloading the utility).
You can use any other tool that allows you to generate load on an HTTP endpoint.
Clone this GitHub repo and change into the right directory:
git clone https://github.com/abhirockzz/cosmos-go-rate-limiting
cd cosmos-go-rate-limiting
Setup Azure Cosmos DB
Create an Azure Cosmos DB account with the Cassandra API option selected
To create a Keyspace and Table, use the following CQL:
CREATE KEYSPACE ordersapp WITH REPLICATION = {'class' : 'SimpleStrategy'};
CREATE TABLE ordersapp.orders (
    id uuid PRIMARY KEY,
    amount int,
    state text,
    time timestamp
);
Start the Application
Open a terminal and set the environment variables for the application:
export COSMOSDB_CASSANDRA_CONTACT_POINT=<account-name>.cassandra.cosmos.azure.com
export COSMOSDB_CASSANDRA_PORT=10350
export COSMOSDB_CASSANDRA_USER=<account-name>
export COSMOSDB_CASSANDRA_PASSWORD=<password>
#optional (default: 5)
#export MAX_RETRIES=
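The sample application's own configuration code is not shown in this post. As a rough sketch of how an optional setting such as MAX_RETRIES could be read with a fallback to the default of 5, consider the following. The function name maxRetries and the choice to fall back on invalid input are assumptions made for illustration, not the sample's actual behavior.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// defaultMaxRetries mirrors the default of 5 mentioned above.
const defaultMaxRetries = 5

// maxRetries (hypothetical helper) reads MAX_RETRIES through the supplied
// lookup function and falls back to the default when the variable is unset
// or is not a valid integer.
func maxRetries(getenv func(string) string) int {
	raw := getenv("MAX_RETRIES")
	if raw == "" {
		return defaultMaxRetries
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return defaultMaxRetries
	}
	return n
}

func main() {
	// os.Getenv returns "" for unset variables, which selects the default.
	fmt.Println("max retries:", maxRetries(os.Getenv))
}
```

Passing the lookup function as a parameter keeps the helper easy to exercise without touching the real environment.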
To start the application:
go run main.go
//wait for this output
Connected to Azure Cosmos DB
To test whether the application is working as expected, insert a few orders by invoking the REST endpoint (once for each order) from a different terminal:
curl http://localhost:8080/orders
The application generates random data, so you don’t have to enter it while invoking the endpoint.
Confirm that the order was successfully stored. You can use the hosted CQL shell in the Azure portal and execute the below query:
select count(*) from ordersapp.orders;
// you should see this output
system.count(*)
-----------------
1
(1 rows)
You’re all set.
Let the Load Tests Begin!
Invoke the REST endpoint with 300 requests. This is enough to overload the system since you only have 400 RU/s allocated by default.
To start the load test:
hey -t 0 -n 300 http://localhost:8080/orders
Notice the logs in the application terminal. In the beginning, you will see that the orders are being successfully created. For example:
Added order ID 25a8cec1-e67a-11ea-9c17-7f242c2eeac0
Added order ID 25a8f5ef-e67a-11ea-9c17-7f242c2eeac0
Added order ID 25a8f5ea-e67a-11ea-9c17-7f242c2eeac0
...
After a while, as the request rate exceeds the provisioned throughput, Azure Cosmos DB will rate limit the application requests. This will manifest itself in the form of an error which looks similar to this:
Request rate is large: ActivityID=ac78fac3-5c36-4a20-8ad7-4b2d0768ffe4, RetryAfterMs=112, Additional details='Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: ac78fac3-5c36-4a20-8ad7-4b2d0768ffe4; Reason: ({
"Errors": [
"Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: http://aka.ms/cosmosdb-error-429"
]
});
In the error message above, notice the following: TooManyRequests (429) and RetryAfterMs=112.
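To make the RetryAfterMs hint concrete, here is a minimal sketch of pulling that value out of an error string like the one above using only the Go standard library. The function name parseRetryAfter and the regular expression are illustrative assumptions; the retry policy used by the sample application may extract the value differently.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"time"
)

// retryAfterPattern matches the "RetryAfterMs=<digits>" fragment of the message.
var retryAfterPattern = regexp.MustCompile(`RetryAfterMs=(\d+)`)

// parseRetryAfter (hypothetical helper) returns the suggested wait and true
// when the message contains a RetryAfterMs value, or zero and false otherwise.
func parseRetryAfter(msg string) (time.Duration, bool) {
	m := retryAfterPattern.FindStringSubmatch(msg)
	if m == nil {
		return 0, false
	}
	ms, err := strconv.Atoi(m[1])
	if err != nil {
		return 0, false
	}
	return time.Duration(ms) * time.Millisecond, true
}

func main() {
	msg := "Request rate is large: ActivityID=ac78fac3-5c36-4a20-8ad7-4b2d0768ffe4, RetryAfterMs=112, Additional details='...'"
	wait, ok := parseRetryAfter(msg)
	fmt.Println(wait, ok) // prints: 112ms true
}
```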
Observing Query errors
To keep things simple, we will use the log output for testing/diagnostic purposes. Any error (related to rate limiting in this case) encountered during query execution is intercepted by a gocql.QueryObserver. The randomly generated order ID is also logged with each error message so that you can check the logs to confirm if the failed order has been retried and (eventually) stored in Azure Cosmos DB.
Here is the code snippet:
....
type OrderInsertErrorLogger struct {
    orderID string
}

// implements gocql.QueryObserver
func (l OrderInsertErrorLogger) ObserveQuery(ctx context.Context, oq gocql.ObservedQuery) {
    err := oq.Err
    if err != nil {
        log.Printf("Query error for order ID %s\n%v", l.orderID, err)
    }
}
....
// the Observer is associated with each query
rid, _ := uuid.GenerateUUID()
err := cs.Query(insertQuery).Bind(rid, rand.Intn(200)+50, fixedLocation, time.Now()).Observer(OrderInsertErrorLogger{orderID: rid}).Exec()
....
How many orders made it through?
Switch back to the load testing terminal and check some of the statistics (output has been truncated for brevity):
Summary:
Total: 2.8507 secs
Slowest: 1.3437 secs
Fastest: 0.2428 secs
Average: 0.5389 secs
Requests/sec: 70.1592
....
Status code distribution:
[200] 300 responses
The numbers will differ in your specific case depending on multiple factors.
This is not a raw benchmarking test, nor do we have a production-grade application, so you can ignore Requests/sec and similar figures. Instead, note the Status code distribution attribute, which shows that our application responded with an HTTP 200 for all the requests.
Let’s confirm the final numbers. Open the Cassandra Shell in the Azure Cosmos DB portal and execute the same query:
select count(*) from ordersapp.orders;
//output
system.count(*)
-----------------
301
You should see that 300 additional rows (orders) have been inserted. The key takeaway is that all the orders were successfully stored in Azure Cosmos DB despite the rate limiting errors, because our application code transparently retried them based on the Retry Policy that we configured (with a single line of code!):
clusterConfig.RetryPolicy = retry.NewCosmosRetryPolicy(numRetries)
A Note on Dynamic Throughput Management
If your application spends most of its time operating at about 60–70% of its throughput, using Autoscale provisioned throughput can help optimize your RU/s and cost usage by scaling down when not in use; you only pay for the resources that your workloads need on a per-hour basis.
So, what happens without the Retry Policy?
Deactivate the Policy to See the Difference
Stop the application (press control+c in the terminal), set an environment variable, and restart the application:
export USE_RETRY_POLICY=false
go run main.go
Before running the load test again, make a note of the number of rows in the orders table using select count(*) from ordersapp.orders; then start the load test:
hey -t 0 -n 300 http://localhost:8080/orders
In the application logs, you will notice the same rate limiting errors. In the terminal where you ran the load test, at the end of the output summary, you will see that some of the requests failed to complete successfully, i.e., they returned a response other than HTTP 200:
...
Status code distribution:
[200] 240 responses
[429] 60 responses
Because the Retry Policy was not enforced, the application no longer retried the requests that failed due to rate limiting.
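The HTTP handler of the sample application is not shown in this post, so the following is only a sketch of how a service might surface an unretried rate limiting error to its caller as an HTTP 429, which is what the load test output above reports. The names ordersHandler, isRateLimited, okInsert, and throttledInsert are hypothetical, and matching on the "TooManyRequests (429)" text is an assumption based on the error message shown earlier.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// isRateLimited reports whether err looks like the rate limiting error
// shown earlier (assumption: the message contains "TooManyRequests (429)").
func isRateLimited(err error) bool {
	return err != nil && strings.Contains(err.Error(), "TooManyRequests (429)")
}

// ordersHandler wraps a database insert (stubbed out here) and maps its
// result to an HTTP status code.
func ordersHandler(insert func() error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		err := insert()
		switch {
		case err == nil:
			w.WriteHeader(http.StatusOK)
		case isRateLimited(err):
			http.Error(w, err.Error(), http.StatusTooManyRequests)
		default:
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	}
}

// okInsert and throttledInsert are stand-ins for the real insert query.
func okInsert() error { return nil }

func throttledInsert() error {
	return errors.New("Response status code does not indicate success: TooManyRequests (429)")
}

// statusFor invokes the handler once in memory and returns the status code.
func statusFor(insert func() error) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/orders", nil)
	ordersHandler(insert)(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(okInsert))        // prints: 200
	fmt.Println(statusFor(throttledInsert)) // prints: 429
}
```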
Increase Provisioned Throughput
You can increase the Request Units using the Azure Portal (for example, double it to 800 RU/s) and run the same load test:
hey -t 0 -n 300 http://localhost:8080/orders
You will no longer see the rate limiting (HTTP 429) errors, and the latency figures should be relatively low.
Try increasing the number of requests (using the -n flag) to see when the throughput threshold is breached and the application gets rate limited. As expected, all the orders will be persisted successfully (without any errors or retries).
The next section briefly covers how the custom Retry Policy works.
This is an experimental implementation, and you should write custom policies to suit fault-tolerance and performance requirements of your applications.
Behind the Scenes
CosmosRetryPolicy adheres to the gocql.RetryPolicy interface by implementing the Attempt and GetRetryType functions.
type CosmosRetryPolicy struct {
    MaxRetryCount        int
    FixedBackOffTimeMs   int
    GrowingBackOffTimeMs int
    numAttempts          int
}
A retry is initiated only if the number of retry attempts for that query is less than or equal to the configured maximum retry count, or if the maximum retry count is set to -1 (infinite retries):
func (crp *CosmosRetryPolicy) Attempt(rq gocql.RetryableQuery) bool {
    crp.numAttempts = rq.Attempts()
    return rq.Attempts() <= crp.MaxRetryCount || crp.MaxRetryCount == -1
}
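To see how this condition behaves at the boundaries, the sketch below isolates it in a standalone function (the name shouldRetry is hypothetical) and evaluates it for a few attempt counts. It uses no gocql types, so it only illustrates the comparison itself, not the driver's behavior.

```go
package main

import "fmt"

// shouldRetry mirrors the boolean condition used in Attempt above:
// retry while the attempt count has not passed the maximum, or always
// when the maximum is -1.
func shouldRetry(attempts, maxRetryCount int) bool {
	return attempts <= maxRetryCount || maxRetryCount == -1
}

func main() {
	for _, attempts := range []int{1, 5, 6} {
		fmt.Printf("attempts=%d max=5 -> %t\n", attempts, shouldRetry(attempts, 5))
	}
	fmt.Printf("attempts=1000 max=-1 -> %t\n", shouldRetry(1000, -1))
}
```

With a maximum of 5, the condition holds for attempts 1 through 5 and fails at 6, while a maximum of -1 never stops retrying.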
The GetRetryType function detects the type of error and, in the case of a rate-limited error (HTTP 429), tries to extract the value of the RetryAfterMs field (from the error message) and uses that to sleep before retrying the query.
func (crp *CosmosRetryPolicy) GetRetryType(err error) gocql.RetryType {
    switch err.(type) {
    default:
        retryAfterMs := crp.getRetryAfterMs(err.Error())
        if retryAfterMs == -1 {
            return gocql.Rethrow
        }
        // sleep for the extracted interval before retrying the query
        time.Sleep(retryAfterMs)
        return gocql.Retry
    //other case statements have been omitted for brevity
    }
}
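Putting the two functions together, the overall behavior can be sketched as a plain retry loop that is independent of gocql. This is an illustration under stated assumptions rather than the policy's actual code: throttledError is a hypothetical typed error that carries the wait directly (instead of it being parsed from the message text), and withRetry, failNTimes, and noSleep are names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// throttledError is a hypothetical stand-in for a rate-limited (429) failure
// that carries the suggested wait.
type throttledError struct {
	retryAfter time.Duration
}

func (e throttledError) Error() string {
	return fmt.Sprintf("rate limited, retry after %v", e.retryAfter)
}

// withRetry calls op until it succeeds, fails with a non-throttling error,
// or has been retried maxRetries times. It returns the number of calls made.
func withRetry(op func() error, maxRetries int, sleep func(time.Duration)) (int, error) {
	for attempt := 1; ; attempt++ {
		err := op()
		if err == nil {
			return attempt, nil
		}
		var te throttledError
		if !errors.As(err, &te) || attempt > maxRetries {
			return attempt, err // not retryable, or retries exhausted
		}
		sleep(te.retryAfter) // cool down before the next attempt
	}
}

// failNTimes returns an operation that is throttled n times, then succeeds.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return throttledError{retryAfter: 10 * time.Millisecond}
		}
		return nil
	}
}

// noSleep lets the loop run without actually waiting.
func noSleep(time.Duration) {}

func main() {
	attempts, err := withRetry(failNTimes(2), 5, noSleep)
	fmt.Println(attempts, err) // prints: 3 <nil>
}
```

Injecting the sleep function keeps the loop testable; passing time.Sleep instead of noSleep gives the real waiting behavior.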
Azure Cosmos DB provides you the flexibility to not only configure and adjust your throughput requirements using a variety of ways but also provides the basic primitive that allows applications to handle rate limiting errors, thereby making them robust and fault-tolerant. This blog post demonstrated how you can do this for Go applications, but the concepts are applicable to any language and its respective CQL compatible driver that you choose for working with the Cassandra API for Azure Cosmos DB.
To Learn More:
Check out some of these resources from the official documentation:
- Use cases and benefits of Autoscale provisioned throughput
- Details of the Cassandra API support in Azure Cosmos DB
- Get up and running with a Go application and Cassandra API for Azure Cosmos DB
- Frequently asked questions about the Cassandra API in Azure Cosmos DB
- Request Units concepts