Tutorial: Reactive Spring Boot, Part 9: Java RSocket Client

Check out the latest installment on building Reactive applications.


In this lesson, we add an RSocket client that can talk to the RSocket server we created in the last lesson.

This is the ninth part of our tutorial showing how to build a Reactive application using Spring Boot, Kotlin, Java, and JavaFX. The original inspiration was a 70-minute live demo.

Here is everything you've missed so far:

Tutorial: Reactive Spring Boot, Part 2: A REST Client for Reactive Streams

Tutorial: Reactive Spring Boot, Part 3: A JavaFX Spring Boot Application

Tutorial: Reactive Spring Boot, Part 4: A JavaFX Line Chart

Tutorial: Reactive Spring Boot, Part 5: Auto-Configuration for Shared Beans

Tutorial: Reactive Spring Boot, Part 6: Displaying Reactive Data

Tutorial: Reactive Spring Boot, Part 7: Subscribing Multiple Subscribers

This blog post contains a video showing the process step-by-step and a textual walk-through (adapted from the transcript of the video) for those who prefer a written format.


This tutorial is a series of steps during which we will build a full Spring Boot application featuring a Kotlin back-end, a Java client and a JavaFX user interface.

By now, we have an application that works end to end via Spring's WebClient. In the last lesson, we introduced a new RSocket server, which is currently running, and in this lesson, we're going to see how to create a client to connect to it.

Creating an Integration Test

As with the  WebClientStockClient, we're going to drive the RSocket client with an integration test. The test looks almost identical to the WebClientStockClientIntegrationTest.


Java
 




x
16


1
class RSocketStockClientIntegrationTest {
2
    @Test
3
    void shouldRetrieveStockPricesFromTheService() {
4
        // given
5
        RSocketStockClient rSocketStockClient = new RSocketStockClient();
6
 
7
        // when
8
        Flux<StockPrice> prices = rSocketStockClient.pricesFor("SYMBOL");
9
 
10
        // then
11
        Assertions.assertNotNull(prices);
12
        Flux<StockPrice> fivePrices = prices.take(5);
13
        Assertions.assertEquals(5, fivePrices.count().block());
14
        Assertions.assertEquals("SYMBOL", fivePrices.blockFirst().getSymbol());
15
    }
16
}



  1. Create a copy of WebClientStockClientIntegrationTest in the same test directory and rename it to  RSocketStockClientIntegrationTest.
  2. Change the  WebClientStockClient variable to an RSocketStockClient variable and rename the variable to rSocketStockClient.
  3. (Tip: Using IntelliJ IDEA's rename refactoring will change the name of this variable everywhere it's used so there's no need to find/replace).
  4. We know this isn't going to need a web client because that was only needed for the WebClient stock client. Remove the constructor argument and the field declaration.

(Note: This code will not compile yet).

Creating the RSocket Stock Client


Java
 




xxxxxxxxxx
1


 
1
public class RSocketStockClient {
2
    public Flux<StockPrice> pricesFor(String symbol) {
3
        return null;
4
    }
5
}



  1.  RSocketStockClient doesn't exist yet, so create it as an empty class.
  2. (Tip: Pressing Alt+Enter on the red RSocketStockClient code gives the option of creating this class.)
  3. The test assumes a pricesFor method, so create the missing method on  RSocketStockClient.
  4. (Tip: pressing Alt+Enter on the red pricesFor method gives the option of creating this method with the correct signature on RSocketStockClient.)

Introducing a StockClient Interface

Of course, the method declaration looks exactly the same as it does in WebClientStockClient, so it feels like a good time to introduce an interface that both clients can implement.


Java
 




xxxxxxxxxx
1


 
1
public interface StockClient {
2
    Flux<StockPrice> pricesFor(String symbol);
3
}



  1. Create a new interface called StockClient. We want the pricesFor method to appear on the interface since this is the method that has the same signature on both client classes.
  2. (Tip: use IntelliJ IDEA's Extract Interface feature on WebClientStockClient to automatically create the new interface with a pricesFor method.)


Java
 




xxxxxxxxxx
1


 
1
public class WebClientStockClient implements StockClient {
2
    // initialisation here...
3
 
4
    @Override
5
    public Flux<StockPrice> pricesFor(String symbol) {
6
        // implementation here...
7
    }
8
}



Make sure the   WebClientStockClient class has been updated to implement the new StockClient and has the @Override annotation on the pricesFormethod.


Do the same in RSocketStockClient.

Now the test is compiling run it to see it fail. It should fail on the  assertNotNull assertion since we're returning null from the pricesFor method.

Implementing the RSocket Connection

Normally with Test Driven Development, we'd take small steps to make the tests pass and have more granular tests. To keep this lesson focused, we're going to jump right in and implement the working RSocket client.

1. Add spring-boot-starter-rsocket to the pom.xml file for the stock-client module.


XML
 




x


 
1
<dependencies>
2
 <dependency>
3
  <groupId>org.springframework.boot</groupId>
4
  <artifactId>spring-boot-starter-webflux</artifactId>
5
 </dependency>
6
 <dependency>
7
  <groupId>org.springframework.boot</groupId>
8
  <artifactId>spring-boot-starter-rsocket</artifactId>
9
 </dependency>
10
 <!-- more dependencies... -->




  1. Add an RSocketRequester field called rSocketRequester to RSocketStockClient.
  2. Add a constructor parameter for it.
  3. (Tip: IntelliJ IDEA can automatically generate constructor parameters for fields.)


Java
 




xxxxxxxxxx
1


 
1
public class RSocketStockClient implements StockClient {
2
    private RSocketRequester rSocketRequester;
3
 
4
    public RSocketStockClient(RSocketRequester rSocketRequester) {
5
        this.rSocketRequester = rSocketRequester;
6
    }
7
 
8
    // pricesFor method...
9
}



  1. In pricesFor, call rSocketRequester.route. For the route, we want to use the same route we defined in the back-end RSocket service, which in our case was "stockPrices".
  2. Send the stock symbol to the server, via the data method.
  3. We expect the call to return a Flux of StockPrice, so pass StockPrice.class into the retrieveFlux method.
  4. Return the result of these calls from the pricesFor method instead of null.
Java
 




xxxxxxxxxx
1
14


 
1
public class RSocketStockClient implements StockClient {
2
    private RSocketRequester rSocketRequester;
3
 
4
    public RSocketStockClient(RSocketRequester rSocketRequester) {
5
        this.rSocketRequester = rSocketRequester;
6
    }
7
 
8
    @Override
9
    public Flux<StockPrice> pricesFor(String symbol) {
10
        return rSocketRequester.route("stockPrices")
11
                               .data(symbol)
12
                               .retrieveFlux(StockPrice.class);
13
    }
14
}




Creating the RSocketRequester

The test doesn't compile because we added an rSocketRequester constructor parameter and we don't have an instance of  RSocketRequester in our test.

  1. Create a private method called  createRSocketRequester in the test near the top where other objects are typically initialized.
  2. Create a field for an RSocketRequester.Builder. If we add the @Autowired annotation, Spring will inject an instance of this into our test.
  3. To tell Spring to manage the test we need to annotate it with @SpringBootTest.
  4. Inside createRSocketRequester, use the rSocketRequester to connect via TCP to our  RSocketServer, which is running on localhost at port 7000.
  5. Call block to wait until we're connected.


Java
 




x


 
1
@SpringBootTest
2
class RSocketStockClientIntegrationTest {
3
    @Autowired
4
    private RSocketRequester.Builder builder;
5
 
6
    private RSocketRequester createRSocketRequester() {
7
        return builder.connectTcp("localhost", 7000).block();
8
    }
9
 
10
    @Test
11
    void shouldRetrieveStockPricesFromTheService() {
12
        // implementation...
13
    }
14
}



A Passing Integration Test

We expect this test to work when we run it, but actually we're missing something important. We get an error that we're missing a SpringBootConfiguration, which might be a little puzzling. In fact, this module doesn't have a  SpringBootApplication at all, because it was designed to be library code shared among other application code, it's not an application in its own right. Let's look at one way to get our test to work.


Java
 




xxxxxxxxxx
1


 
1
@SpringBootApplication
2
public class TestApplication {
3
}



  1. Create a class  TestApplication in the same directory as the test.
  2. Annotate this with @SpringBootApplication.
  3. Re-run the integration test, everything should start up as expected, and the test should pass.
Java
 




xxxxxxxxxx
1


 
1
@SpringBootApplication
2
public class TestApplication {
3
}




Testing With StepVerifier

Since the test passes we can assume the client successfully connects to the server via RSocket, gets a Flux of  StockPrice objects, can take the first five of these and checks the first one has the correct symbol. This is a slightly simplistic approach to testing reactive applications. There are other approaches, one of which is to use the StepVerifier. Using this approach, we can code our expectations for the events that we see.


  1. Create a new StepVerifier with five prices from the prices Flux.
  2. Use expectNextMatches to check the symbol for all five prices is correct.
  3. Call verifyComplete to not only check these expectations are met, but also that there are no more StockPrice objects after these five.
  4. Delete the old assertions (the StepVerifier replaces them all).
Java
 




xxxxxxxxxx
1
17


 
1
@Test
2
void shouldRetrieveStockPricesFromTheService() {
3
    // given
4
    RSocketStockClient rSocketStockClient = new RSocketStockClient(createRSocketRequester());
5
 
6
    // when
7
    Flux<StockPrice> prices = rSocketStockClient.pricesFor("SYMBOL");
8
 
9
    // then
10
    StepVerifier.create(prices.take(5))
11
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
12
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
13
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
14
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
15
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
16
                .verifyComplete();
17
}




This approach can support much more than this simple example and is also very useful for testing time-based publishers like ours.

Adding a Retry Backoff and Error Handling Strategies

We have one final piece to consider. Our  WebClientStockClient defined a retryBackoff strategy, and a simple approach for handling errors. We can actually apply exactly the same approach to our  RSocketStockClient as well.


Java
 




xxxxxxxxxx
1


 
1
public class RSocketStockClient implements StockClient {
2
    @Override
3
    public Flux<StockPrice> pricesFor(String symbol) {
4
        return null;
5
    }
6
}



  1. Copy the retryBackoff and doOnError steps from WebClientStockClient and paste into  RSocketStockClient.pricesFor.
  2. Re-run the test, it should all still pass.
Java
 




x


 
1
@Log4j2
2
public class RSocketStockClient implements StockClient {
3
    private RSocketRequester rSocketRequester;
4
 
5
    public RSocketStockClient(RSocketRequester rSocketRequester) {
6
        this.rSocketRequester = rSocketRequester;
7
    }
8
 
9
    @Override
10
    public Flux<StockPrice> pricesFor(String symbol) {
11
        return rSocketRequester.route("stockPrices")
12
                               .data(symbol)
13
                               .retrieveFlux(StockPrice.class)
14
                               .retryBackoff(5, Duration.ofSeconds(1), Duration.ofSeconds(20))
15
                               .doOnError(IOException.class, e -> log.error(e.getMessage()));
16
    }
17
}




Now, we have an RSocket server on the back end emitting stock prices and an RSocket client that can connect to it and see those prices. In the next lesson, we're going to see how to switch from using the  WebClientStockClient to using our newRSocketStockClient.

The full code is available on GitHub.

Further Reading

Kotlin Microservice With Spring Boot Tutorial

Spring Reactive Programming in Java

 

 

 

 

Top