How to Split Up Synchronous and Asynchronous Parts of Your System in Java
Many developers say it is hard to move their applications to asynchronous processing because a web app communicates synchronously by nature. In this post, I would like to show one way to do it with a few well-known libraries and tools. The example below is written in Java, but it is really about the basic principles, and the same app can be rewritten in any language.
Tools and libraries needed:
- Spring Boot
- RabbitMQ
Web Application
The web application is written in Spring MVC and runs on Tomcat. It sends a string to a queue (the start of the asynchronous communication) and waits for a message on a different queue, which it then sends back as the HTTP response.
First, we need to define several dependencies and then wait for Spring Boot to do all necessary auto-configuration magic.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.thedeanda</groupId>
        <artifactId>lorem</artifactId>
    </dependency>
</dependencies>
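import java.nio.charset.StandardCharsets;

import com.thedeanda.lorem.Lorem;
import com.thedeanda.lorem.LoremIpsum;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

@SpringBootApplication
public class BlockingApplication {
    public static void main(String[] args) {
        SpringApplication.run(BlockingApplication.class, args);
    }

    @RestController
    public static class MessageController {
        private final RabbitTemplate rabbitTemplate;

        public MessageController(CachingConnectionFactory connectionFactory) {
            this.rabbitTemplate = new RabbitTemplate(connectionFactory);
        }

        @GetMapping("invoke")
        public String sendMessage() {
            // Publishes to the "uppercase" exchange and blocks this thread until the reply arrives.
            Message response = rabbitTemplate.sendAndReceive("uppercase", null, request());
            if (response == null) {
                // sendAndReceive returns null when no reply arrives within the reply timeout.
                throw new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, "No reply from the consumer");
            }
            return new String(response.getBody(), StandardCharsets.UTF_8);
        }

        private static Message request() {
            Lorem lorem = LoremIpsum.getInstance();
            String name = lorem.getFirstName() + " " + lorem.getLastName();
            return new Message(name.getBytes(StandardCharsets.UTF_8), new MessageProperties());
        }
    }

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("localhost:5672");
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory;
    }
}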
Consumer Application
The second application is just the RabbitMQ consumer waiting for messages, calling an uppercase function on the consumed string, and then sending the result back to the output queue.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
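import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(ServiceApplication.class, args);
    }

    public static class MessageListener {
        // MessageListenerAdapter calls handleMessage by default and sends
        // the returned value to the reply-to address of the request.
        public String handleMessage(byte[] message) {
            // Simulate work: a random delay in the range [50, 99] ms.
            int delayMillis = ThreadLocalRandom.current().nextInt(50, 100);
            String content = new String(message, StandardCharsets.UTF_8);
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the container can shut down cleanly.
                Thread.currentThread().interrupt();
            }
            return content.toUpperCase();
        }
    }

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("localhost:5672");
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory;
    }

    @Bean
    public SimpleMessageListenerContainer serviceListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setConcurrentConsumers(20);
        container.setMaxConcurrentConsumers(40);
        container.setQueueNames("uppercase_messages");
        container.setMessageListener(new MessageListenerAdapter(new MessageListener()));
        return container;
    }
}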
How Does it Work Under the Hood?
After the application starts and our endpoint is invoked for the first time, the log shows that Spring AMQP automatically started a listener on RabbitMQ's direct reply-to pseudo-queue (amq.rabbitmq.reply-to) and waits there for a response from our service application.
2019-05-12 17:23:21.451 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2019-05-12 17:23:21.457 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started
If we look at a message in the consumer application, we can see that Spring automatically propagated the reply queue information along with a correlation ID. Both travel back with the reply, so the web application can pair each response with its request.
This is actually where the magic happens. Of course, if you want to make it more sophisticated, you can involve more services in the collaboration and then put the final response for the web application into a different queue than the automatically generated one, as long as it carries the proper correlation ID. Also, don't forget to set a reasonable timeout.
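To make the pairing of requests and responses more tangible, here is a minimal sketch in plain Java with no RabbitMQ involved. It is a simplified model of the idea, not Spring AMQP's actual implementation: `ReplyCorrelator`, `CorrelationDemo`, and the two stand-in transports are hypothetical names invented for this illustration. Returning `null` on timeout mirrors what `RabbitTemplate.sendAndReceive` does when no reply arrives within the value set through `setReplyTimeout`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Toy model of request/reply correlation (hypothetical, not Spring AMQP code). */
final class ReplyCorrelator {
    // Requests still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Sends a request and blocks until the matching reply arrives; returns null on timeout. */
    String sendAndReceive(String body, BiConsumer<String, String> transport, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply); // register before sending, so a fast reply is not lost
        try {
            transport.accept(correlationId, body);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // nobody answered in time
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a reply", e);
        } finally {
            pending.remove(correlationId); // replies that arrive later are dropped
        }
    }

    /** Called by the reply listener; completes the request that carries the same ID. */
    void onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.get(correlationId);
        if (reply != null) {
            reply.complete(body);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}

final class CorrelationDemo {
    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();

        // Stand-in for the consumer: uppercases on another thread and replies with the same ID.
        BiConsumer<String, String> uppercaseService = (id, body) ->
                CompletableFuture.runAsync(() -> correlator.onReply(id, body.toUpperCase()));
        // Stand-in for a consumer that is down: the request is never answered.
        BiConsumer<String, String> deadService = (id, body) -> { };

        System.out.println(correlator.sendAndReceive("john doe", uppercaseService, 1000)); // prints JOHN DOE
        System.out.println(correlator.sendAndReceive("john doe", deadService, 100));       // prints null
    }
}
```

The second call shows why the timeout matters: without it, a missing reply would park the calling thread forever.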
There is also one big disadvantage of this solution: application throughput. I made it this way intentionally so that I can follow up on this post with a further investigation using AsyncProfiler! But currently, we are using Tomcat as the primary HTTP server with a default of 200 threads, which means our application cannot handle more than 200 messages concurrently, because each server thread waits on the RabbitMQ reply queue until the message comes in or the timeout occurs.
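The thread limit can be illustrated with a small experiment in plain Java. This is only a sketch under stated assumptions: a fixed thread pool stands in for Tomcat's worker threads, `Thread.sleep` stands in for waiting on the reply queue, and `BlockingPoolDemo` and `peakInFlight` are hypothetical names made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a blocking request pool (hypothetical, not Tomcat code). */
final class BlockingPoolDemo {

    /**
     * Runs the given number of simulated requests on a pool of worker threads,
     * each one blocking for replyDelayMillis, and returns the peak number of
     * requests that were in flight at the same time.
     */
    static int peakInFlight(int workers, int requests, long replyDelayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(requests);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    // The worker is parked here, like a server thread waiting for a reply.
                    Thread.sleep(replyDelayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await(); // wait until every simulated request has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int peak = peakInFlight(4, 16, 100);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("peak in flight: " + peak + ", elapsed ms: " + elapsedMillis);
    }
}
```

With 4 workers and 16 requests, the peak never exceeds 4, and the remaining requests wait in the pool's queue until a worker is free. Replace 4 with 200 and the sleep with a wait on the reply queue, and you have the situation described above.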
Thank you for reading this post, and stay tuned for the follow-up! If you would like to try this out yourself, just check out my GitHub repository. If you would like to be notified about new posts, follow me on Twitter: @p_bouda.