Optimizing Data Repositories Usage in Java Multi-Threaded Applications

Optimize your data repositories for Java multi-threaded applications.

Data repositories are often the bottleneck of highly demanding systems where the number of queries being executed is extremely large. DelayedBatchExecutor is a component used to decrease the number of required queries by batching them in Java multi-threaded applications.

n Queries of 1 Parameter Vs. 1 Query of n Parameters

Let's assume we have a Java application that executes a query to a relational database to retrieve a Product entity (row) given its unique identifier (id).

The query would look something like this:

SELECT * FROM PRODUCT WHERE ID  = <productId>


Now, to retrieve n Products, there are two ways to proceed: execute n queries of 1 parameter each, or execute 1 query of n parameters.

-- Option 1: n queries of 1 parameter each
SELECT * FROM PRODUCT WHERE ID = <productId1>
SELECT * FROM PRODUCT WHERE ID = <productId2>
...
SELECT * FROM PRODUCT WHERE ID = <productIdn>

-- Option 2: 1 query of n parameters, using the IN operator
SELECT * FROM PRODUCT WHERE ID IN (<productId1>, <productId2>, ..., <productIdn>)


The latter is more efficient in terms of network traffic and database server resources (CPU and disk), mainly because it needs a single round trip to the database instead of n.

This is true not only for SELECT operations but also for INSERTs, UPDATEs, and DELETEs. In fact, the JDBC API includes batch operations for these.

The same applies to NoSQL repositories: most of them provide bulk operations explicitly.
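As a minimal sketch of the "1 query of n parameters" option, the helper below builds the IN query with one placeholder per id. The class and method names (InQueryBuilder, buildSelectByIds) are hypothetical and only serve as illustration; they are not part of JDBC or DelayedBatchExecutor.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of JDBC or DelayedBatchExecutor.
final class InQueryBuilder {

    // Builds "SELECT * FROM PRODUCT WHERE ID IN (?, ?, ..., ?)" with one placeholder per id.
    static String buildSelectByIds(List<Integer> ids) {
        if (ids.isEmpty()) {
            // "IN ()" is not valid SQL, so an empty list is rejected up front.
            throw new IllegalArgumentException("ids must not be empty");
        }
        String placeholders = String.join(", ", Collections.nCopies(ids.size(), "?"));
        return "SELECT * FROM PRODUCT WHERE ID IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildSelectByIds(List.of(10, 20, 30)));
        // prints: SELECT * FROM PRODUCT WHERE ID IN (?, ?, ?)
    }
}
```

With plain JDBC, the resulting string would typically be passed to Connection.prepareStatement, and each id bound with PreparedStatement.setInt(position, id), where positions start at 1.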

DelayedBatchExecutor

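Java applications that require data retrieval from the database, such as REST microservices or asynchronous message processors, are usually implemented as multi-threaded applications (*1), where each incoming request or message is processed by its own thread and each thread executes its queries independently of the others.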

In this scenario, it is highly likely that the database executes the same query many times within a short interval of time.

If these n queries of 1 parameter were replaced with a single equivalent one with n parameters, as pointed out before, the application would use less database server and network resources.

The good news is that it can be achieved by a mechanism involving time windows as follows:

A time window is opened by the first thread that tries to execute the query, so its parameter is stored in a list and that thread is paused. The rest of the threads that execute the same query within the time window get their parameters added to the list and they are paused as well. At this point, no query has been executed on the database.

As soon as the time window is finished, or the list is full (a maximum capacity limit is previously defined), then a single query is executed with all the parameters stored in the list. Finally, once the result of this query is provided by the database, each thread receives its corresponding result and all the threads are resumed automatically.
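The two steps above can be sketched in plain Java. This is only an illustration of the time-window idea, not the DelayedBatchExecutor implementation: the class name WindowBatcher and all its members are assumptions made for this example, and it relies on a scheduled timer and CompletableFuture rather than on Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Illustrative sketch only: NOT the DelayedBatchExecutor implementation.
final class WindowBatcher<P, R> {
    private final long windowMillis;
    private final int maxSize;
    // Must return one result per parameter, in the same order as the parameters.
    private final Function<List<P>, List<R>> batchCallback;
    // Daemon timer thread that closes a window when its time is up.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "window-batcher-timer");
                t.setDaemon(true);
                return t;
            });

    // State of the currently open window, guarded by "this".
    private List<P> params = new ArrayList<>();
    private List<CompletableFuture<R>> waiters = new ArrayList<>();

    WindowBatcher(long windowMillis, int maxSize, Function<List<P>, List<R>> batchCallback) {
        this.windowMillis = windowMillis;
        this.maxSize = maxSize;
        this.batchCallback = batchCallback;
    }

    // Pauses the calling thread until the batch containing its parameter has run.
    R execute(P param) {
        CompletableFuture<R> result = new CompletableFuture<>();
        Runnable flush = null;
        synchronized (this) {
            params.add(param);
            waiters.add(result);
            if (params.size() == 1) {
                // The first parameter opens the time window.
                List<P> opened = params;
                timer.schedule(() -> flushIfStillOpen(opened), windowMillis, TimeUnit.MILLISECONDS);
            }
            if (params.size() >= maxSize) {
                flush = detachWindow(); // the list is full: close the window early
            }
        }
        if (flush != null) {
            flush.run();
        }
        return result.join();
    }

    // Called by the timer; does nothing if the window was already closed because it was full.
    private void flushIfStillOpen(List<P> opened) {
        Runnable flush;
        synchronized (this) {
            if (params != opened) {
                return;
            }
            flush = detachWindow();
        }
        flush.run();
    }

    // Must be called while holding the lock: swaps in a fresh window and returns the batch job.
    private Runnable detachWindow() {
        List<P> batchParams = params;
        List<CompletableFuture<R>> batchWaiters = waiters;
        params = new ArrayList<>();
        waiters = new ArrayList<>();
        return () -> runBatch(batchParams, batchWaiters);
    }

    // Executes the single batched call and hands each paused thread its own result.
    private void runBatch(List<P> batchParams, List<CompletableFuture<R>> batchWaiters) {
        try {
            List<R> results = batchCallback.apply(batchParams);
            for (int i = 0; i < batchWaiters.size(); i++) {
                batchWaiters.get(i).complete(results.get(i));
            }
        } catch (RuntimeException e) {
            batchWaiters.forEach(w -> w.completeExceptionally(e));
        }
    }

    public static void main(String[] args) {
        WindowBatcher<Integer, String> batcher = new WindowBatcher<Integer, String>(50, 10, ids -> {
            List<String> names = new ArrayList<>();
            for (Integer id : ids) {
                names.add("product-" + id);
            }
            return names;
        });
        // A lone caller is released when the 50 ms window closes.
        System.out.println(batcher.execute(42)); // prints: product-42
    }
}
```

In this sketch, a batch closed by the timer runs on the single timer thread, so a slow callback delays the closing of later windows. A production-grade component would need to handle that, which is one reason to rely on an existing library instead.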

I've built a simple and light implementation of this mechanism for myself (DelayedBatchExecutor), which is easy to use in new or existing applications. It is based on the Reactor library and it uses a Flux buffered publisher with timeout for the list of parameters.

Throughput and Latency Analysis Using DelayedBatchExecutor

Let's assume a REST microservice for Products that exposes an endpoint for retrieving Product data from a database given its productId. Without DelayedBatchExecutor, if there are, say, 200 hits per second to the endpoint, the database executes 200 queries per second. If the endpoint were using a DelayedBatchExecutor configured with a time window of 50 ms and a maximum capacity of 10 parameters, the database would execute only 20 queries of 10 parameters each per second, at the cost of increasing the latency of each thread execution by at most 50 ms (*2).

In other words, for the price of increasing the latency by 50 ms (*2), the database receives 10 times fewer queries per second while the overall throughput of the system stays the same. Not bad!
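The arithmetic behind these figures can be written down as a back-of-the-envelope estimate. The sketch below assumes that hits arrive evenly spaced in time, which real traffic rarely does, so treat the numbers as rough guidance. The class and method names are made up for this example.

```java
// Rough estimate only; assumes evenly spaced arrivals. Names are hypothetical.
final class BatchingEstimate {

    // Approximate number of parameters that end up in each batched query.
    static double batchSize(double hitsPerSecond, double windowMillis, int maxCapacity) {
        double arrivalsPerWindow = hitsPerSecond * windowMillis / 1000.0;
        // A batch holds at least the parameter that opened the window
        // and at most maxCapacity parameters.
        return Math.max(1.0, Math.min(maxCapacity, arrivalsPerWindow));
    }

    // Approximate number of queries per second that reach the database.
    static double queriesPerSecond(double hitsPerSecond, double windowMillis, int maxCapacity) {
        return hitsPerSecond / batchSize(hitsPerSecond, windowMillis, maxCapacity);
    }

    public static void main(String[] args) {
        // 200 hits/s, 50 ms window, capacity 10: 10 hits per window, so 200 / 10 queries per second.
        System.out.println(queriesPerSecond(200, 50, 10)); // prints: 20.0
    }
}
```

The same function shows when batching stops paying off: if fewer than one extra hit arrives per window, each batch contains a single parameter and the time window only adds latency.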

Other interesting configurations:

DelayedBatchExecutor in Action

Digging deeper into the Product microservice example, let's assume that for each incoming HTTP request, the controller of the microservice needs to retrieve a Product (Java bean) given its id, so it invokes the following method of a DAO component (ProductDAO):

public Product getProductById(Integer productId)

Let's have a look at the implementation of this DAO without and with DelayedBatchExecutor.

Without DelayedBatchExecutor

public class ProductDAO {

 public Product getProductById(Integer id) {
   Product product= ...// execute the query SELECT * FROM PRODUCT WHERE ID=<id>
                       // using your favourite API: JDBC, JPA, Hibernate...
   return product;
 }
 ...
}


With DelayedBatchExecutor

import java.time.Duration;
import java.util.List;
// plus the import of DelayedBatchExecutor2 from the library

// Singleton
public class ProductDAO {

  DelayedBatchExecutor2<Product, Integer> delayedBatchExecutorProductById =
      DelayedBatchExecutor.define(Duration.ofMillis(50), 10, this::retrieveProductsByIds);

  public Product getProductById(Integer id) {
    Product product = delayedBatchExecutorProductById.execute(id);
    return product;
  }

  private List<Product> retrieveProductsByIds(List<Integer> idList) {

    List<Product> productList = ...// execute query: SELECT * FROM PRODUCT WHERE ID IN (idList.get(0), ..., idList.get(n));
                                   // using your favourite API: JDBC, JPA, Hibernate...

    // The positions of the elements of the list to return must match the ones in the parameters list.
    // For instance, the first Product of the list to be returned must be the one with
    // the id in the first position of idList, and so on...
    // NOTE: null can be used as a value, meaning that no Product exists for the given id

    return productList;
  }
  ...
}


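First of all, an instance of DelayedBatchExecutor has to be created in the DAO, which, in this case, is delayedBatchExecutorProductById. It requires the following three parameters: the time window (50 ms in this example), the maximum capacity of the parameter list (10 in this example), and the method to be invoked with the accumulated parameters (retrieveProductsByIds in this example).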

NOTE: We'll see later why delayedBatchExecutorProductById is an instance of the class DelayedBatchExecutor2<Product, Integer>

Secondly, the DAO method public Product getProductById(Integer productId) has been refactored to simply invoke the execute method of the delayedBatchExecutorProductById instance and that's it. All the "magic" is done by the DelayedBatchExecutor.

delayedBatchExecutorProductById is an instance of DelayedBatchExecutor2<Product, Integer> because its execute method returns a Product instance and receives an Integer instance as its argument.

If the execute method had to receive two arguments (say, an Integer and a String) and return an instance of Product, then the definition would be DelayedBatchExecutor3<Product, Integer, String>, and so on.

Finally, the retrieveProductsByIds method must return a List<Product> and receive a List<Integer> as a parameter.

If we were using DelayedBatchExecutor3<Product, Integer, String>, the signature of retrieveProductsByIds would have to be List<Product> retrieveProductsByIds(List<Integer> productIdsList, List<String> stringList).
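Since the returned list must match the parameter list position by position, and a query with an IN clause does not guarantee any row order, the rows usually have to be rearranged before returning them. The helper below is one possible way to do it, offered as a sketch: ResultAligner and alignToKeys are hypothetical names, not part of DelayedBatchExecutor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of DelayedBatchExecutor.
final class ResultAligner {

    // Returns a list in which position i holds the row whose key equals keys.get(i),
    // or null when no row exists for that key.
    static <K, V> List<V> alignToKeys(List<K> keys, List<V> rows, Function<V, K> keyOf) {
        Map<K, V> byKey = new HashMap<>();
        for (V row : rows) {
            byKey.put(keyOf.apply(row), row);
        }
        List<V> aligned = new ArrayList<>(keys.size());
        for (K key : keys) {
            // Repeated keys receive the same row; missing keys receive null.
            aligned.add(byKey.get(key));
        }
        return aligned;
    }

    public static void main(String[] args) {
        // Strings stand in for Product beans here; the key is the number after "p".
        List<Integer> ids = Arrays.asList(3, 1, 2, 3);
        List<String> rows = Arrays.asList("p1", "p3"); // rows in arbitrary order, no row for id 2
        List<String> aligned =
                ResultAligner.<Integer, String>alignToKeys(ids, rows, s -> Integer.parseInt(s.substring(1)));
        System.out.println(aligned); // prints: [p3, p1, null, p3]
    }
}
```

In the ProductDAO example, the call would look something like alignToKeys(idList, productList, Product::getId), assuming the Product bean exposes a getId() accessor.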

And that's it.

Once running, the concurrent threads executing the controller logic invoke the method getProductById(Integer id) at some point, and this method returns the corresponding Product. The threads won't know that they may actually have been paused and resumed by the DelayedBatchExecutor.

See full example in com.vp.sample.ProductDAO.

Beyond Data Repositories

Although this article focuses on data repositories, DelayedBatchExecutor could be used in other contexts, for instance, in requests to REST microservices. Again, it is much more expensive to launch n GET requests with one parameter than 1 GET request with n parameters.

DelayedBatchExecutor Improvements

I created the DelayedBatchExecutor and used it, for a while, as a way of efficiently handling the execution of multiple queries launched by concurrent threads in personal projects. I believe it could also be useful for someone else, so I've decided to make it public.

With that said, there is a lot of room to improve and extend the functionality offered by DelayedBatchExecutor. The most interesting improvement would be the ability to dynamically change the parameters of the DelayedBatchExecutor (time window and maximum capacity) according to the particular conditions of the execution, in order to minimize latency while still taking advantage of queries with n parameters.

Feel free to share your thoughts and experiences in the comments section. Thanks! 

-------

(*1) Regardless of whether the application is deployed in a cluster of nodes or not, each (JVM) node runs the application as a multi-threaded Java application. For REST microservices, the threads are usually managed by the underlying servlet engine; for asynchronous message processors, they are managed by the messaging protocol implementation.

(*2) Strictly speaking, the latency increases by up to the 50 ms of the time window plus the extra time the database needs to execute the query with n parameters compared with a query with just one parameter. However, this extra time is very small in most cases (a few milliseconds) as long as n is not extremely large. You can easily check it in your database.

 

 

 

 
