Full-Duplex Scalable Client-Server Communication with WebSockets and Spring Boot (Part I)

Introduction

In this article, we will be covering the steps to create a Java WebSocket server, powered by Spring Boot, that communicates with Java clients and supports load balancing across multiple instances. We’ll start by describing a use case scenario, analyzing the requirements, and then we’ll proceed to choose the technology stack that suits the case. We’ll implement the code, and finally test the application to get some performance and durability benchmarks.

Use Case: A Smart Home

Our first thought was to go with the classic WebSocket demonstration, the chat application. But if you search the internet for WebSocket examples, the vast majority of them are chat apps. So we needed another idea, one that would be more relevant to today’s technology and would cover both point-to-point communication and one-to-many broadcasting. That idea is a smart home device network.

In our scenario, all the smart devices have a persistent connection to a server. The server is responsible for sending commands to specific devices, such as turning on the living room lights or enabling the alarm. It can also receive information from devices. For example, there can be a temperature sensor that takes readings every minute or an oven that sends alerts if the temperature is too high. Finally, the server may also issue commands to all devices at once, such as turning them all on or off.

The server also exposes a REST API, to be used by the user to collect information and control the devices.

The server maintains a persistent bi-directional communication to the devices

From now on, we’ll refer to the smart devices as clients.  So let’s write down the requirements:

Technology Stack

Now that the requirements are established, we’ll describe the tech stack we used to implement our solution. For the server, we’ll be using the microservices pattern. Each microservice (MS) is written in Java 11, using the Spring Boot framework and more specifically the Web on Servlet stack. The communication with the clients is handled by the Device Management MS. The Control MS exposes the REST API, and communicates with the Device Mgmt MS using an ActiveMQ Artemis message broker. For incoming traffic routing, service discovery, and load balancing we’ll be using Spring Cloud Gateway and Eureka.

Tech stack and microservices

As mentioned, the server and clients communicate via WebSocket. This protocol allows for persistent, full-duplex communication as well as detection of dropped connections by both the server and the clients.

Because WebSocket is a low-level protocol that doesn’t specify the structure of the transmitted messages, we also need a higher-level protocol that acts as the “contract” between the sender and the receiver. For that, we will be using STOMP (Simple Text Oriented Messaging Protocol). We will also configure Spring to work with a dedicated STOMP broker for the actual broadcasting of messages. The simplest approach would be Spring's in-memory broker, but it falls short when you scale out and add more server instances: clients connected to different instances would have no way of communicating or getting updates pushed to them for something that happened on another instance. Therefore, we will use an external broker (ActiveMQ Artemis). For more details, see the Spring Framework Reference - External Broker.
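To give a feel for what STOMP adds on top of a raw WebSocket message, here is a minimal plain-Java sketch of how a STOMP frame is laid out as text: a command line, header lines, a blank line, the body, and a NUL terminator. The class name, the JSON body, and its status field are hypothetical, and header escaping and content-length are left out, so treat it as an illustration rather than a conforming encoder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: renders the text layout of a STOMP frame. */
final class StompFrameSketch {

    /** Command line, one "name:value" line per header, a blank line, the body, then NUL. */
    static String encode(String command, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            // Real encoders must also escape ':' and newlines inside header values.
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        return frame.append('\n').append(body).append('\0').toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/cdm/device");
        headers.put("content-type", "application/json");
        // Hypothetical payload; the NUL terminator is shown as ^@ for readability.
        String frame = encode("SEND", headers, "{\"status\":\"ON\"}");
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

In the rest of the article Spring builds and parses these frames for us, so we never have to write them by hand.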

Let’s Write Some Code: Java WebSocket Server

The first part consists of writing the code for the server-side part. In this article, we are focusing mostly on the Device Management MS and Control MS, as there are many tutorials on how to set up a Eureka/gateway server.

Configuring Gateway/Eureka

The Eureka server runs on port 8761. The gateway service's routing is configured in its application.yml:

YAML

spring:
  application:
    name: gateway-server
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        # WebSocket and HTTP traffic for the Device Management MS
        - id: device-management-ms
          uri: lb://device-management-service
          predicates:
            - Path=/device-management-service/**
          filters:
            - StripPrefix=1
        # REST API exposed by the Control MS
        - id: control-ms
          uri: lb://control-service
          predicates:
            - Path=/control-service/**
          filters:
            - StripPrefix=1
      globalcors:
        cors-configurations:
          '[/**]':
            allowedOrigins: ["*"]
            allowedMethods: ["*"]
            allowedHeaders: "*"
            allowCredentials: true
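To make the effect of the StripPrefix=1 filter concrete, here is a small plain-Java sketch of the path rewrite that happens before a request is forwarded to a microservice. It is not Spring Cloud Gateway's implementation; the class and method names are ours, and edge cases such as trailing slashes are ignored.

```java
import java.util.Arrays;

/** Illustrative only: mimics what StripPrefix=N does to the request path. */
final class StripPrefixSketch {

    /** Drops the first `parts` path segments and returns the remaining path. */
    static String stripPrefix(String path, int parts) {
        String[] segments = Arrays.stream(path.split("/"))
                .filter(segment -> !segment.isEmpty())
                .toArray(String[]::new);
        if (parts >= segments.length) {
            return "/";
        }
        return "/" + String.join("/", Arrays.copyOfRange(segments, parts, segments.length));
    }

    public static void main(String[] args) {
        // The gateway path on the left is what clients call; the right side reaches the service.
        System.out.println(stripPrefix("/device-management-service/websocket", 1));
        System.out.println(stripPrefix("/control-service/device", 1));
    }
}
```

This is why the Device Management MS can register its endpoint simply as "/websocket" while clients connect through the gateway at "/device-management-service/websocket".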


Creating the Device Management MS

This microservice plays the role of the WebSocket server. First, we have to add the following dependencies to the pom.xml:

XML

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.security</groupId>
   <artifactId>spring-security-messaging</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>


WebSocket Configuration

Then we need to create a configuration class for WebSockets. This class is annotated with @EnableWebSocketMessageBroker and implements the interface org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer.

Let's see what we need to set within the configureMessageBroker method. We start by enabling a STOMP broker relay and setting the host and credentials for ActiveMQ Artemis:

WebSocketConfig.java

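Java

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${broker.relay.host}")
    private String brokerRelayHost;

    @Value("${broker.relay.user}")
    private String brokerRelayUser;

    @Value("${broker.relay.password}")
    private String brokerRelayPass;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Relay "/queue" and "/topic" destinations to the external STOMP broker (ActiveMQ Artemis)
        config.enableStompBrokerRelay("/queue", "/topic")
                .setRelayHost(brokerRelayHost)
                .setClientLogin(brokerRelayUser)
                .setClientPasscode(brokerRelayPass)
                .setSystemLogin(brokerRelayUser)
                .setSystemPasscode(brokerRelayPass)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        // Messages sent to "/cdm/..." are routed to @MessageMapping methods
        config.setApplicationDestinationPrefixes("/cdm");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Plain WebSocket endpoint, plus the same path with the SockJS fallback enabled
        registry.addEndpoint("/websocket").setAllowedOrigins("*");
        registry.addEndpoint("/websocket").withSockJS();
    }
}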


We continue by configuring the destination prefixes of the queues/topics that are used for WebSocket communication. When a client initiates a connection, it subscribes to a queue (or topic) to receive messages from the server. In our case, for point-to-point messaging the client would subscribe to a destination like “/user/queue/device”. Spring automatically recognizes the prefix “/user/” and creates a unique queue belonging to this specific WebSocket session. An example would be “/queue/device-user1234-asdf-5678-asdf”, where the suffix 1234-asdf-5678-asdf is the server-generated session-id belonging to this client. Later on, we’ll describe how this session-id is associated with the username that the client provides during the authentication phase.
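The translation from the “/user/...” destination to the session-specific queue can be pictured with a few lines of plain Java. This is a simplified sketch of the naming scheme described above, not Spring's resolver; the class and method names are hypothetical.

```java
/** Illustrative only: derives a session-specific queue name from a "/user/..." destination. */
final class UserDestinationSketch {

    static final String USER_PREFIX = "/user";

    /** "/user/queue/device" + session id -> "/queue/device-user" + session id. */
    static String toSessionDestination(String subscribed, String sessionId) {
        if (!subscribed.startsWith(USER_PREFIX + "/")) {
            throw new IllegalArgumentException("Not a user destination: " + subscribed);
        }
        String actual = subscribed.substring(USER_PREFIX.length()); // e.g. "/queue/device"
        return actual + "-user" + sessionId;
    }

    public static void main(String[] args) {
        System.out.println(toSessionDestination("/user/queue/device", "1234-asdf-5678-asdf"));
    }
}
```

Because the session id is part of the queue name, two devices that both subscribe to “/user/queue/device” never receive each other's messages.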

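Behind the scenes, Spring keeps these associations in a "User Registry". There are 2 options:

  1. A local user registry, kept in the memory of each server instance, which only knows about the sessions connected to that instance.

  2. A multi-server user registry, where each instance shares its local registry with the others through the message broker, so that users can be resolved across instances.

We go with the second option by setting the following properties on the broker relay registration returned by enableStompBrokerRelay: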

Java

.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/log-user-registry");


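These create two "special" topics on the message broker and must be set if you intend to have multiple instances of the server running. Let’s explain why:

  1. The user registry broadcast topic (“/topic/log-user-registry”) is where each instance publishes the contents of its local user registry and listens for the registries of the other instances, so that every instance learns which users are connected elsewhere.

  2. The user destination broadcast topic (“/topic/unresolved-user”) receives messages addressed to a user that the current instance cannot resolve locally, which gives the instance that actually holds that user's session a chance to deliver them.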

You can read more in the Spring documentation: websocket-stomp-user-destination
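To see why the unresolved-user topic matters with several instances, consider the following in-memory simulation. It is a deliberately simplified sketch: the Instance class, its peers list (which stands in for the shared broker topic), and the device name are all hypothetical, and no real broker is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: two server instances that hand over messages they cannot resolve. */
final class MultiInstanceSketch {

    static final class Instance {
        final Map<String, String> localSessions = new HashMap<>(); // username -> session id
        final List<String> delivered = new ArrayList<>();
        final List<Instance> peers = new ArrayList<>();             // stands in for the shared topic

        /** Delivers locally when possible; otherwise lets the other instances try. */
        boolean sendToUser(String user, String payload) {
            if (deliverLocally(user, payload)) {
                return true;
            }
            boolean handled = false;
            for (Instance peer : peers) {
                handled |= peer.deliverLocally(user, payload);
            }
            return handled;
        }

        boolean deliverLocally(String user, String payload) {
            String sessionId = localSessions.get(user);
            if (sessionId == null) {
                return false; // the user is not connected to this instance
            }
            delivered.add(user + "@" + sessionId + ": " + payload);
            return true;
        }
    }

    public static void main(String[] args) {
        Instance a = new Instance();
        Instance b = new Instance();
        a.peers.add(b);
        b.peers.add(a);
        b.localSessions.put("oven", "s-42");                 // the oven is connected to instance b
        System.out.println(a.sendToUser("oven", "ACTIVATE")); // a cannot resolve it, b delivers
        System.out.println(b.delivered);
    }
}
```

Without the hand-over step, a command that happens to be consumed by instance a would silently be lost whenever the target device is connected to instance b.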

WebSocket Controller

This is a typical controller class that handles incoming messages from the clients. We can add a java.security.Principal argument to the method and Spring will inject it for use by our application. The principal information comes from the authentication performed during the HTTP handshake.

WebsocketController.java

Java

@Controller
@RequiredArgsConstructor // Lombok: generates the constructor that injects deviceMgmtService
@Slf4j                   // Lombok: provides the log field used below
public class WebsocketController {

   private final DeviceMgmtService deviceMgmtService;

   // Handles messages that clients send to "/cdm/device"
   @MessageMapping("/device")
   public void handleMessageFromDevice(@Payload ResponseMessage message, Principal principal) {
       log.info("Received message from device: {}", principal.getName());
       deviceMgmtService.handleMessageFromDevice(message);
   }
}


Spring Security Configuration

We are now going to secure the WebSocket endpoints and enable username-password authentication. A simplistic configuration is shown below:

SecurityConfig.java

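Java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

 @Autowired
 private CustomAuthenticationProvider authProvider;

 @Override
 protected void configure(AuthenticationManagerBuilder auth) {
   auth.authenticationProvider(authProvider);
 }

 @Override
 protected void configure(HttpSecurity http) throws Exception {
   // The listed paths are open; everything else requires HTTP Basic authentication
   http.csrf().disable()
       .authorizeRequests()
       .antMatchers("/", "/index", "/server/**", "/websocket", "/websocket/**").permitAll()
       .anyRequest().authenticated()
       .and().httpBasic();
 }
}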


In our case, we are using Basic Authentication with a custom authentication provider. A custom authentication provider is useful when you have to authenticate against third-party systems or perform other non-trivial tasks during authentication. Ours looks like this:

CustomAuthenticationProvider.java

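Java

import java.util.ArrayList;

import lombok.RequiredArgsConstructor;
import org.springframework.security.authentication.AuthenticationProvider;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.AuthenticationException;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class CustomAuthenticationProvider implements AuthenticationProvider {

 @Override
 public Authentication authenticate(Authentication authentication) throws AuthenticationException {
   String username = authentication.getName();
   String password = authentication.getCredentials().toString();
   /*
    * Insert custom authentication logic here
    */
   // An empty authority list still marks the token as authenticated
   return new UsernamePasswordAuthenticationToken(username, password, new ArrayList<>());
 }

 @Override
 public boolean supports(Class<?> authentication) {
   return authentication.equals(UsernamePasswordAuthenticationToken.class);
 }
}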


You can refer to Authentication and Token Authentication for a full list of options and capabilities.

Remember the auto-generated session-id we mentioned earlier? Behind the scenes, Spring associates the username provided during the authentication, with this id. Therefore, you can send a message to any client by simply referring to its username. 

Message-Driven Communication

So far, we’ve configured the WebSocket server to work with ActiveMQ Artemis, accept messages at the “/cdm/device” destination, and support authentication with Spring Security. The final step is to configure the communication between the Device Management MS and Control MS.

The Device Management MS must consume the events that the Control MS sends to a specific queue. Note that only one instance of the service must consume each event, to avoid processing it more than once. This is known as the competing consumers pattern, and it’s what ActiveMQ does by default for a queue.
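The competing consumers idea can be demonstrated without a broker. In the sketch below, a java.util.concurrent queue stands in for the Artemis queue and a few threads stand in for the Device Management MS instances; the class name, the instance names, and the event strings are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative only: several consumers compete for events on one shared queue. */
final class CompetingConsumersSketch {

    /** Returns, for each (unique) event, the name of the consumer(s) that handled it. */
    static Map<String, String> consume(List<String> events, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            String name = "instance-" + i;
            pool.execute(() -> {
                String event;
                // poll() removes the head atomically, so no two consumers get the same event
                while ((event = queue.poll()) != null) {
                    handledBy.merge(event, name, (first, second) -> first + "," + second);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        // Which instance handles which event varies between runs; each event appears once.
        System.out.println(consume(List.of("e1", "e2", "e3", "e4", "e5"), 3));
    }
}
```

Which instance picks up a given event is not predictable, but every event ends up with exactly one owner, which is the property we rely on.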

Device Management MS

The ActionEvent represents the messages that are sent to the queue. We add the following to the  ArtemisConfig class for listening to the queue and converting the messages using Jackson:

ArtemisConfig.java

Java

@Bean
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
  connectionFactory.setUser(username);
  connectionFactory.setPassword(password);
  connectionFactory.setConnectionTTL(120000L);
  return connectionFactory;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  factory.setConnectionFactory(receiverActiveMQConnectionFactory());
  factory.setMessageConverter(jacksonJmsMessageConverter());
  // Between 3 and 10 concurrent consumers per service instance
  factory.setConcurrency("3-10");
  return factory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
  MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
  converter.setTargetType(MessageType.TEXT);
  // Map the "_type" message property to the ActionEvent class on both sides
  HashMap<String, Class<?>> typeIdMappings = new HashMap<>();
  typeIdMappings.put(ActionEvent.class.getSimpleName(), ActionEvent.class);
  converter.setTypeIdMappings(typeIdMappings);
  converter.setTypeIdPropertyName("_type");
  return converter;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
  return new CachingConnectionFactory(receiverActiveMQConnectionFactory());
}


Finally, we attach a @JmsListener to the queue; the annotated method is invoked for every message that is consumed.

AMQMessagingService.java

Java

@JmsListener(destination = "${broker.queue}")
public void receiveMessage(ActionEvent message) {
   log.info("Received event from Control MS");
   deviceMgmtService.sendMessageToDevice(message);
}


The method calls the corresponding service that creates a STOMP message and delivers it over the WebSocket to the client. You can send a STOMP message to a client from any point inside the application by calling convertAndSendToUser.

DeviceMgmtServiceImpl.java

Java

@Override
public void sendMessageToDevice(ActionEvent event) {
   SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
   headerAccessor.setLeaveMutable(true);

   // The event destination is the username of the target device
   String client = event.getDestination();

   /*
    * Create a dummy message to send to client
    */
   CommandMessage message = CommandMessage.builder()
           .time(LocalDateTime.now())
           .command(CommandEnum.ACTIVATE.toString())
           .build();

   log.info("Sending message to device: {}", client);
   simpMessagingTemplate.convertAndSendToUser(client, "/queue/device", message, headerAccessor.getMessageHeaders());
}


Notice that client is the username (provided at the authentication step) of the client we want to send a message to, and the “/queue/device” is the destination that the client has subscribed to. The message is the object that contains the content of the STOMP message.

Spring has some out-of-the-box annotations like  @SendToUser, or  @SendTo  that are meant to be used on a  @Controller  method, when invoked by a client. For example the  @SendToUser  annotation can be used to reply to the same client that hit the WebSocket endpoint. But in this case, the server sends a message at will (rather than replying to a client), so the annotations cannot be used.

This covers all of the key highlights of the Device Management MS development.

We will now make a short reference to the development of Control MS, but you can skip this part and jump right into the Java WebSocket Client, if you prefer.

Control MS

Creating the control microservice is pretty straightforward. The only parts worth noting are the Artemis configuration class and the definition of a jmsTemplate Bean, which uses Jackson for message conversion.

ArtemisConfig.java

Java

@Bean
public ActiveMQConnectionFactory senderActiveMQConnectionFactory() {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
  connectionFactory.setUser(username);
  connectionFactory.setPassword(password);
  connectionFactory.setConnectionTTL(120000L);
  return connectionFactory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
  MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
  converter.setTargetType(MessageType.TEXT);
  // Must match the type id mapping used by the Device Management MS
  HashMap<String, Class<?>> typeIdMappings = new HashMap<>();
  typeIdMappings.put(ActionEvent.class.getSimpleName(), ActionEvent.class);
  converter.setTypeIdMappings(typeIdMappings);
  converter.setTypeIdPropertyName("_type");
  return converter;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
  return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}

@Bean("jmsTemplate")
public JmsTemplate jmsTemplate() {
  JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
  jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
  return jmsTemplate;
}


We also create the REST endpoints that the users "hit" to talk to the smart devices. Of course, in reality, there will be a browser and/or mobile-based application for the users to interact with. For the sake of simplicity, we will use a single dummy endpoint for the moment which will be invoked via curl or a tool like Postman:

DeviceController.java

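Java

import java.util.UUID;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
@Slf4j
@RequestMapping("/device")
public class DeviceController {

 private final AMQMessagingService messagingService;

 @PostMapping
 public ResponseEntity<Void> sendCommandToDevice(@RequestBody ActionDTO actionDTO) {
  log.info("Received POST request with body: {}", actionDTO);

  /* Create Action event */
  ActionEvent event = ActionEvent.builder()
    .destination(actionDTO.getDestination())
    .command(actionDTO.getCommand())
    .args(actionDTO.getArgs())
    .id(UUID.randomUUID())
    .build();

  // Publish the event to the queue consumed by the Device Management MS
  messagingService.send(event);
  return ResponseEntity.ok().build();
 }
}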


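The ActionDTO contains the following fields, as used by the controller above: destination (the username of the target device), command, and args.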

Let’s Write Some Code: Java WebSocket Client

The server is ready to accept WebSocket connections at: ws://gateway.server:8000/device-management-service/websocket

First, our client should do the following:

  1. Connect to the URL above using its authentication credentials

  2. Subscribe to “/user/queue/device” to receive personal messages

  3. Subscribe to “/topic/messages/outgoing” to receive broadcasts
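For step 1, the credentials travel in an HTTP Basic Authorization header on the handshake request. The following plain-Java sketch shows how such a header value is built and what the server recovers from it; the class and method names are ours, and the user/pass credentials are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Illustrative only: builds and decodes an HTTP Basic Authorization header value. */
final class BasicAuthSketch {

    /** "username:password", Base64-encoded and prefixed with "Basic ". */
    static String basicAuthHeader(String username, String password) {
        String plain = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(plain.getBytes(StandardCharsets.UTF_8));
    }

    /** What the server side recovers from the header: the "username:password" pair. */
    static String decode(String headerValue) {
        String encoded = headerValue.substring("Basic ".length());
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String header = basicAuthHeader("user", "pass");
        System.out.println(header);         // Basic dXNlcjpwYXNz
        System.out.println(decode(header)); // user:pass
    }
}
```

Keep in mind that Base64 is an encoding, not encryption, so outside of a local test the connection should go over TLS (wss://).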

Adding Dependencies

Ensure that the following dependencies are added to the pom.xml:

XML

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


Configuring WebSocket Client

First, we need to create an  org.springframework.web.socket.messaging.WebSocketStompClient Bean. The key points here are to set the message converter to Jackson (because the content of the exchanged messages is in JSON) and set a TaskScheduler. This scheduler is used to send heartbeat messages to the server (in times of inactivity), in order to keep the connection alive.

 WebSocketConfig.java

Java

@Bean
public WebSocketStompClient stompClient() {
   WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
   List<Transport> transports =
     List.of(new WebSocketTransport(simpleWebSocketClient));
   SockJsClient sockJsClient = new SockJsClient(transports);
   WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
   // Messages are exchanged as JSON
   stompClient.setMessageConverter(new MappingJackson2MessageConverter());
   // The scheduler is what sends heartbeats during periods of inactivity
   ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
   scheduler.initialize();
   stompClient.setTaskScheduler(scheduler);
   return stompClient;
}


Note about heartbeats: As of now (April 2020) there is an open issue in Spring related to heartbeats: https://github.com/spring-projects/spring-framework/issues/22822. As we mentioned, the client periodically sends heartbeat messages to the server when it detects channel inactivity. Therefore, while a client is sending actual messages, the channel is considered active and no heartbeat is sent during this time. This poses a problem when the messages are handled by the server like this:

Java

@MessageMapping("/device")
public void handleMessageFromDevice(@Payload ResponseMessage message, Principal principal) {
   log.info("Received message from device: {}", principal.getName());
}


As you can see, this method doesn’t return anything to the client. Therefore, the message broker is not involved, so it won’t know that a message has been received. This may cause a timeout and lead the broker to disconnect the client. There are two possible ways to fix this:

  1. Always send ACK messages back to the client, which utilizes the queue and keeps the connection alive.

  2. Client-side, don’t use the default  taskScheduler. Instead, create a special server endpoint just for ping-pong messages and a client task scheduler that talks to this endpoint.
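The client-side rule behind this issue, heartbeats only after a period of silence, can be captured in a few lines. This is a simplified model with hypothetical names and an assumed 10-second interval, not the logic of Spring's STOMP client; timestamps are passed in explicitly to keep the example deterministic.

```java
/** Illustrative only: decides when a client heartbeat is due, based on outbound inactivity. */
final class HeartbeatSketch {

    private final long intervalMillis;
    private long lastWriteMillis;

    HeartbeatSketch(long intervalMillis, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.lastWriteMillis = nowMillis;
    }

    /** Any outbound frame, heartbeat or real message, counts as channel activity. */
    void recordWrite(long nowMillis) {
        lastWriteMillis = nowMillis;
    }

    /** A heartbeat is due only after a full interval without outbound traffic. */
    boolean heartbeatDue(long nowMillis) {
        return nowMillis - lastWriteMillis >= intervalMillis;
    }

    public static void main(String[] args) {
        HeartbeatSketch tracker = new HeartbeatSketch(10_000, 0);
        tracker.recordWrite(9_000);                       // the device sends a real message
        System.out.println(tracker.heartbeatDue(10_000)); // false: the channel was just active
        System.out.println(tracker.heartbeatDue(19_000)); // true: 10 seconds of silence
    }
}
```

A device that reports a reading every few seconds therefore never sends a heartbeat at all, and if those readings don't pass through the broker, the broker sees no sign of life from that client.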

Re-connection Handling

The initSession method shown below returns a StompSession object and is used to initiate a STOMP-over-WebSocket connection with the server.

ConnectionHandler.java

Java

@Value("${ws.server.host}")
private String serverURL;

private final StompSessionHandler sessionHandler;

@Retryable(value = {Exception.class},
       maxAttempts = 10,
       backoff = @Backoff(delay = 1000, multiplier = 2, random = true))
public StompSession initSession(String username, String password) throws Exception {
   String plainCredentials = username + ":" + password;
   // Explicit charset (java.nio.charset.StandardCharsets) instead of the platform default
   String base64Credentials = Base64.getEncoder()
           .encodeToString(plainCredentials.getBytes(StandardCharsets.UTF_8));

   final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
   headers.add("Authorization", "Basic " + base64Credentials);

   // Block until the STOMP session is established (or the attempt fails)
   return stompClient().connect(serverURL, headers, sessionHandler).get();
}


The method takes the username and password of the device as arguments, adds the Base64-encoded credentials to the Authorization header, and initializes the connection to the server. Notice the @Retryable annotation. It is part of the Spring Retry utility and is used to rerun the method should an exception be thrown. The annotation is not mandatory but adds to the stability of our solution. This particular policy makes up to ten connection attempts in total, and the delay between attempts grows exponentially, starting at one second and doubling each time, with some randomness added. To include Spring Retry, add these dependencies and add @EnableRetry above any configuration class:

pom.xml

XML

<dependency>
   <groupId>org.springframework.retry</groupId>
   <artifactId>spring-retry</artifactId>
   <version>1.1.5.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-aspects</artifactId>
</dependency>
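To get a feel for the retry schedule that delay = 1000 and multiplier = 2 produce, the following plain-Java sketch lists the waits between attempts. It is an approximation under stated assumptions: the jitter introduced by random = true is left out, the 30-second cap is an assumed value passed in as a parameter, and the class and method names are ours rather than part of Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: deterministic exponential backoff without jitter. */
final class BackoffSketch {

    /** Delays (ms) slept between attempts: maxAttempts attempts means maxAttempts - 1 waits. */
    static List<Long> delays(long initialDelay, double multiplier, long maxDelay, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialDelay;
        for (int wait = 1; wait < maxAttempts; wait++) {
            result.add((long) Math.min(next, maxDelay)); // never wait longer than the cap
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed cap of 30 seconds; ten attempts give nine waits.
        System.out.println(delays(1000, 2.0, 30_000, 10));
    }
}
```

With these numbers the waits are 1, 2, 4, 8, and 16 seconds, followed by four waits at the assumed 30-second cap, so a device keeps trying for a little over two and a half minutes before giving up.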


Furthermore, we need to configure a Bean of type org.springframework.messaging.simp.stomp.StompSessionHandler. In this Bean we can implement the following methods:

StompSessionHandlerImpl.java

Java

@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
   this.isConnected.set(true);
   session.subscribe("/topic/messages/outgoing", this);
   session.subscribe("/user/queue/device", this);
}

@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
   log.error("Error while handling a STOMP frame", exception);
}

@Override
public void handleTransportError(StompSession session, Throwable exception) {
   // compareAndSet makes the check-and-reset atomic, so only one caller reconnects
   if (this.isConnected.compareAndSet(true, false)) {
       try {
           webSocketService.initSession("some_device", "some_pass");
       } catch (Exception e) {
           // initSession declares a checked exception that this callback cannot rethrow
           log.error("Re-connection failed", e);
       }
   }
}

@Override
public Type getPayloadType(StompHeaders headers) {
   return CommandMessage.class;
}

@Override
public void handleFrame(StompHeaders headers, Object payload) {
   if (payload == null) return;
   CommandMessage msg = (CommandMessage) payload;
   log.info("Received: {}, time: {}", msg.getCommand(), msg.getTime());
}


The callback method handleTransportError allows us to implement custom re-connection logic. The flow is simple: when a transport error occurs, call the initSession method to re-establish the connection. Because handleTransportError can be invoked multiple times, we need an extra variable that acts as a lock and ensures that initSession is called only once:

Java

// Initialized up front so the callbacks never see a null reference
private final AtomicBoolean isConnected = new AtomicBoolean(false);
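The guard can be exercised in isolation. The sketch below is a stripped-down stand-in for the session handler, with hypothetical names and a counter in place of the real initSession call; it shows that repeated error callbacks trigger only one re-connection per established session.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: an AtomicBoolean guard that lets a single error callback reconnect. */
final class ReconnectGuardSketch {

    private final AtomicBoolean connected = new AtomicBoolean(false);
    final AtomicInteger reconnectAttempts = new AtomicInteger();

    void onConnected() {
        connected.set(true);
    }

    /** Only the caller that flips the flag from true to false triggers the reconnect. */
    void onTransportError() {
        if (connected.compareAndSet(true, false)) {
            reconnectAttempts.incrementAndGet(); // stands in for initSession(...)
        }
    }

    public static void main(String[] args) {
        ReconnectGuardSketch guard = new ReconnectGuardSketch();
        guard.onConnected();
        guard.onTransportError();
        guard.onTransportError(); // ignored: the flag is already false
        System.out.println(guard.reconnectAttempts.get());
    }
}
```

Using compareAndSet rather than a separate get followed by set matters when the callback fires from more than one thread, because the check and the reset then happen as a single atomic step.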


Time to "Git" It a Try!

The source code can be found in the smart-home-websockets repository, along with a README file describing how to start the whole thing using Docker and perform basic testing on your laptop. For more extensive testing, stay tuned for Part II, where we deploy the microservices on GCE (Google Compute Engine) and use Locust to simulate various traffic scenarios.

Stay Tuned!

Stay tuned for the second part where we will cover the following topics:

 

 

 

 
