Tutorial: Reactive Spring Boot, Part 9: Java RSocket Client
In this lesson, we add an RSocket client that can talk to the RSocket server we created in the last lesson.
This is the ninth part of our tutorial showing how to build a Reactive application using Spring Boot, Kotlin, Java, and JavaFX. The original inspiration was a 70-minute live demo.
Here is everything you've missed so far:
Tutorial: Reactive Spring Boot, Part 2: A REST Client for Reactive Streams
Tutorial: Reactive Spring Boot, Part 3: A JavaFX Spring Boot Application
Tutorial: Reactive Spring Boot, Part 4: A JavaFX Line Chart
Tutorial: Reactive Spring Boot, Part 5: Auto-Configuration for Shared Beans
Tutorial: Reactive Spring Boot, Part 6: Displaying Reactive Data
Tutorial: Reactive Spring Boot, Part 7: Subscribing Multiple Subscribers
This blog post contains a video showing the process step-by-step and a textual walk-through (adapted from the transcript of the video) for those who prefer a written format.
This tutorial is a series of steps during which we will build a full Spring Boot application featuring a Kotlin back-end, a Java client and a JavaFX user interface.
By now, we have an application that works end to end via Spring's WebClient. In the last lesson, we introduced a new RSocket server, which is currently running, and in this lesson, we're going to see how to create a client to connect to it.
Creating an Integration Test
As with the WebClientStockClient, we're going to drive the RSocket client with an integration test. The test looks almost identical to the WebClientStockClientIntegrationTest.
class RSocketStockClientIntegrationTest {
    @Test
    void shouldRetrieveStockPricesFromTheService() {
        // given
        RSocketStockClient rSocketStockClient = new RSocketStockClient();
        // when
        Flux<StockPrice> prices = rSocketStockClient.pricesFor("SYMBOL");
        // then
        Assertions.assertNotNull(prices);
        Flux<StockPrice> fivePrices = prices.take(5);
        Assertions.assertEquals(5, fivePrices.count().block());
        Assertions.assertEquals("SYMBOL", fivePrices.blockFirst().getSymbol());
    }
}
- Create a copy of WebClientStockClientIntegrationTest in the same test directory and rename it to RSocketStockClientIntegrationTest.
- Change the WebClientStockClient variable to an RSocketStockClient variable and rename the variable to rSocketStockClient.
- (Tip: Using IntelliJ IDEA's rename refactoring will change the name of this variable everywhere it's used, so there's no need to find/replace.)
- We know this isn't going to need a web client because that was only needed for the WebClient stock client. Remove the constructor argument and the field declaration.
(Note: This code will not compile yet.)
Creating the RSocket Stock Client
public class RSocketStockClient {
    public Flux<StockPrice> pricesFor(String symbol) {
        return null;
    }
}
- RSocketStockClient doesn't exist yet, so create it as an empty class.
- (Tip: Pressing Alt+Enter on the red RSocketStockClient code gives the option of creating this class.)
- The test assumes a pricesFor method, so create the missing method on RSocketStockClient.
- (Tip: Pressing Alt+Enter on the red pricesFor method gives the option of creating this method with the correct signature on RSocketStockClient.)
Introducing a StockClient Interface
Of course, the method declaration looks exactly the same as it does in WebClientStockClient, so it feels like a good time to introduce an interface that both clients can implement.
public interface StockClient {
    Flux<StockPrice> pricesFor(String symbol);
}
- Create a new interface called StockClient. We want the pricesFor method to appear on the interface since this is the method that has the same signature on both client classes.
- (Tip: Use IntelliJ IDEA's Extract Interface feature on WebClientStockClient to automatically create the new interface with a pricesFor method.)
public class WebClientStockClient implements StockClient {
    // initialisation here...

    @Override
    public Flux<StockPrice> pricesFor(String symbol) {
        // implementation here...
    }
}
- Make sure the WebClientStockClient class has been updated to implement the new StockClient and has the @Override annotation on the pricesFor method.
- Do the same in RSocketStockClient.
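To see why a shared interface is useful, here is a minimal, self-contained sketch. It is not the tutorial's code: java.util.stream.Stream stands in for Flux so the example runs without Reactor, StockPrice is cut down to two fields, and FixedPriceStockClient, RisingPriceStockClient and PriceReader are hypothetical names invented for illustration. The only point is that calling code written against StockClient works unchanged with either implementation.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Cut-down stand-in for the tutorial's StockPrice (assumption: only symbol and price matter here).
final class StockPrice {
    private final String symbol;
    private final double price;

    StockPrice(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    String getSymbol() { return symbol; }
    double getPrice() { return price; }
}

// Same shape as the tutorial's interface, with Stream standing in for Flux.
interface StockClient {
    Stream<StockPrice> pricesFor(String symbol);
}

// Hypothetical implementation: an endless stream of identical prices.
final class FixedPriceStockClient implements StockClient {
    @Override
    public Stream<StockPrice> pricesFor(String symbol) {
        return Stream.generate(() -> new StockPrice(symbol, 100.0));
    }
}

// Hypothetical implementation: an endless stream of prices 1.0, 2.0, 3.0, ...
final class RisingPriceStockClient implements StockClient {
    @Override
    public Stream<StockPrice> pricesFor(String symbol) {
        return Stream.iterate(1.0, p -> p + 1.0).map(p -> new StockPrice(symbol, p));
    }
}

// Calling code depends only on the interface, so either client can be passed in.
final class PriceReader {
    static List<String> firstSymbols(StockClient client, String symbol, int count) {
        return client.pricesFor(symbol)
                .limit(count)                 // take only the first 'count' prices
                .map(StockPrice::getSymbol)
                .collect(Collectors.toList());
    }
}
```

PriceReader.firstSymbols never names a concrete client, which is the same property that lets the rest of the application swap one StockClient for another later in this series.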
Now that the test compiles, run it to see it fail. It should fail on the assertNotNull assertion, since we're returning null from the pricesFor method.
Implementing the RSocket Connection
Normally with Test Driven Development, we'd take small steps to make the tests pass and have more granular tests. To keep this lesson focused, we're going to jump right in and implement the working RSocket client.
1. Add spring-boot-starter-rsocket to the pom.xml file for the stock-client module.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>
    <!-- more dependencies... -->
- Add an RSocketRequester field called rSocketRequester to RSocketStockClient.
- Add a constructor parameter for it.
- (Tip: IntelliJ IDEA can automatically generate constructor parameters for fields.)
public class RSocketStockClient implements StockClient {
    private RSocketRequester rSocketRequester;

    public RSocketStockClient(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    // pricesFor method...
}
- In pricesFor, call rSocketRequester.route. For the route, we want to use the same route we defined in the back-end RSocket service, which in our case was "stockPrices".
- Send the stock symbol to the server via the data method.
- We expect the call to return a Flux of StockPrice, so pass StockPrice.class into the retrieveFlux method.
- Return the result of these calls from the pricesFor method instead of null.
public class RSocketStockClient implements StockClient {
    private RSocketRequester rSocketRequester;

    public RSocketStockClient(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @Override
    public Flux<StockPrice> pricesFor(String symbol) {
        return rSocketRequester.route("stockPrices")
                               .data(symbol)
                               .retrieveFlux(StockPrice.class);
    }
}
Creating the RSocketRequester
The test doesn't compile because we added an rSocketRequester constructor parameter and we don't have an instance of RSocketRequester in our test.
- Create a private method called createRSocketRequester in the test near the top where other objects are typically initialized.
- Create a field for an RSocketRequester.Builder. If we add the @Autowired annotation, Spring will inject an instance of this into our test.
- To tell Spring to manage the test, we need to annotate it with @SpringBootTest.
- Inside createRSocketRequester, use the builder to connect via TCP to our RSocket server, which is running on localhost at port 7000.
- Call block to wait until we're connected.
@SpringBootTest
class RSocketStockClientIntegrationTest {
    @Autowired
    private RSocketRequester.Builder builder;

    private RSocketRequester createRSocketRequester() {
        return builder.connectTcp("localhost", 7000).block();
    }

    @Test
    void shouldRetrieveStockPricesFromTheService() {
        // implementation...
    }
}
A Passing Integration Test
We expect this test to work when we run it, but we're actually missing something important. We get an error that we're missing a SpringBootConfiguration, which might be a little puzzling. In fact, this module doesn't have a SpringBootApplication at all, because it was designed as library code shared among other applications; it's not an application in its own right. Let's look at one way to get our test to work.
public class TestApplication {
}
- Create a class TestApplication in the same directory as the test.
- Annotate this with @SpringBootApplication.
- Re-run the integration test. Everything should start up as expected, and the test should pass.
@SpringBootApplication
public class TestApplication {
}
Testing With StepVerifier
Since the test passes, we can assume the client successfully connects to the server via RSocket and gets a Flux of StockPrice objects, and that the test can take the first five of these and check that the first one has the correct symbol. This is a slightly simplistic approach to testing reactive applications. There are other approaches, one of which is to use the StepVerifier. Using this approach, we can code our expectations for the events that we see.
- Create a new StepVerifier with five prices from the prices Flux.
- Use expectNextMatches to check the symbol for all five prices is correct.
- Call verifyComplete to not only check these expectations are met, but also that there are no more StockPrice objects after these five.
- Delete the old assertions (the StepVerifier replaces them all).
@Test
void shouldRetrieveStockPricesFromTheService() {
    // given
    RSocketStockClient rSocketStockClient = new RSocketStockClient(createRSocketRequester());
    // when
    Flux<StockPrice> prices = rSocketStockClient.pricesFor("SYMBOL");
    // then
    StepVerifier.create(prices.take(5))
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
                .expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
                .verifyComplete();
}
This approach can support much more than this simple example and is also very useful for testing time-based publishers like ours.
Adding a Retry Backoff and Error Handling Strategies
We have one final piece to consider. Our WebClientStockClient defined a retryBackoff strategy and a simple approach for handling errors. We can apply exactly the same approach to our RSocketStockClient as well.
- Copy the retryBackoff and doOnError steps from WebClientStockClient and paste them into RSocketStockClient.pricesFor.
- Re-run the test; it should all still pass.
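import java.io.IOException;
import java.time.Duration;
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Flux;

@Log4j2 // assumption: Lombok supplies the log field used in doOnError below
public class RSocketStockClient implements StockClient {
    private RSocketRequester rSocketRequester;

    public RSocketStockClient(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @Override
    public Flux<StockPrice> pricesFor(String symbol) {
        return rSocketRequester.route("stockPrices")
                               .data(symbol)
                               .retrieveFlux(StockPrice.class)
                               .retryBackoff(5, Duration.ofSeconds(1), Duration.ofSeconds(20))
                               .doOnError(IOException.class, e -> log.error(e.getMessage()));
    }
}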
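To get a feel for what retryBackoff(5, Duration.ofSeconds(1), Duration.ofSeconds(20)) asks for, the sketch below works out a plain exponential schedule: the delay starts at the first backoff, doubles on each retry, and is capped at the maximum. BackoffSchedule is a hypothetical helper written for this explanation, not part of Reactor, and it deliberately leaves out the random jitter that Reactor's operator applies as far as we know, so real delays will differ from these idealised values.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: idealised exponential backoff without jitter.
final class BackoffSchedule {
    // Returns the wait before each retry: firstBackoff, then doubled each time, never above maxBackoff.
    static List<Duration> delays(int maxRetries, Duration firstBackoff, Duration maxBackoff) {
        List<Duration> result = new ArrayList<>();
        Duration next = firstBackoff;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            // Cap the delay for this attempt at maxBackoff.
            result.add(next.compareTo(maxBackoff) > 0 ? maxBackoff : next);
            // Double for the following attempt, but stop growing once the cap is reached.
            next = next.compareTo(maxBackoff) >= 0 ? maxBackoff : next.multipliedBy(2);
        }
        return result;
    }
}
```

With the values used in pricesFor, the idealised delays are 1, 2, 4, 8 and 16 seconds, so a server that stays down would be retried over roughly half a minute before the client gives up. The 20-second cap would only come into play if more retries were allowed.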
Now, we have an RSocket server on the back end emitting stock prices and an RSocket client that can connect to it and see those prices. In the next lesson, we're going to see how to switch from using the WebClientStockClient to using our new RSocketStockClient.
The full code is available on GitHub.