Tutorial: Reactive Spring Boot, Part 7: Subscribing Multiple Consumers

Learn more about subscribing multiple consumers in this seventh installment on building a Reactive Spring Boot application.

This is the seventh part of our tutorial showing how to build a Reactive application using Spring Boot, Kotlin, Java, and JavaFX. The original inspiration was a 70-minute live demo.

This tutorial is a series of steps during which we will build a full Spring Boot application featuring a Kotlin back-end, a Java client, and a JavaFX user interface.

Here is everything you've missed so far:

Tutorial: Reactive Spring Boot, Part 1: Building a Kotlin REST Service

Tutorial: Reactive Spring Boot, Part 2: A REST Client for Reactive Streams

Tutorial: Reactive Spring Boot, Part 3: A JavaFX Spring Boot Application

Tutorial: Reactive Spring Boot, Part 4: A JavaFX Line Chart

Tutorial: Reactive Spring Boot, Part 5: Auto-Configuration for Shared Beans

Tutorial: Reactive Spring Boot, Part 6: Displaying Reactive Data

In the last step, we made our JavaFX line chart subscribe to prices from our Reactive Spring Boot service and display them in real-time. 

In this lesson, we update our live-updating chart to show prices for more than one stock, which means subscribing more than one consumer to our reactive stream of prices.

Below, you will find a video showing the process step-by-step and a textual walk-through (adapted from the transcript of the video) for those who prefer a written format.


Introducing a Price Subscriber

The ChartController, which manages how the data gets to the view, currently passes itself to the subscribe call, so the controller is the only consumer of the prices coming from the client that listens to the price service (see the previous post for the starting state of ChartController for this lesson). We need to change this so we can have more than one subscriber.

  1. Inside the subscribe call in ChartController, replace the argument this with a call to the constructor of a new type, PriceSubscriber. This class will manage everything to do with consuming prices from the client.
  2. Create PriceSubscriber as an inner class. It needs to implement Consumer and consume StockPrice.
  3. (Tip: IntelliJ IDEA will offer to automatically create this for us if we press Alt+Enter on the PriceSubscriber name after we first type it.)
public void initialize() {
    // other code inside the initialize method here...

    webClientStockClient.pricesFor(symbol)
                        .subscribe(new PriceSubscriber());
}

private class PriceSubscriber implements Consumer<StockPrice> {
}
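
To see why a dedicated class works here: any object that implements java.util.function.Consumer can be handed to a call that expects a Consumer. The sketch below is not the tutorial's code. It assumes the JDK's SubmissionPublisher as a stand-in for the reactive stream returned by pricesFor (its consume method plays the role of subscribe), and SketchPrice and RecordingSubscriber are hypothetical, simplified names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the tutorial's StockPrice.
final class SketchPrice {
    final String symbol;
    final double price;

    SketchPrice(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

// A named consumer, analogous to PriceSubscriber: it records what it receives.
final class RecordingSubscriber implements Consumer<SketchPrice> {
    final List<Double> received = new CopyOnWriteArrayList<>();

    @Override
    public void accept(SketchPrice stockPrice) {
        received.add(stockPrice.price);
    }
}

final class ConsumerSketch {
    // Publishes the given prices to one subscriber and waits until all are delivered.
    static RecordingSubscriber run(double... prices) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        SubmissionPublisher<SketchPrice> publisher = new SubmissionPublisher<>();

        // consume(...) stands in for subscribe(new PriceSubscriber())
        CompletableFuture<Void> done = publisher.consume(subscriber);

        for (double price : prices) {
            publisher.submit(new SketchPrice("SYMBOL", price));
        }
        publisher.close(); // signals completion to the subscriber
        done.join();       // wait for the asynchronous delivery to finish
        return subscriber;
    }
}
```

Calling ConsumerSketch.run(1.0, 2.5, 3.0) returns a subscriber whose received list holds the three prices in the order they were published.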


Moving Responsibilities to the PriceSubscriber

  1. Move the accept method from the ChartController into the PriceSubscriber.
  2. Remove the “implements Consumer” declaration from the ChartController.
  3. Move seriesData from the ChartController into PriceSubscriber.
  4. Call the PriceSubscriber constructor with the symbol, and update the PriceSubscriber to accept this as a constructor parameter.
  5. (Tip: We can get IntelliJ IDEA to create the appropriate constructor if we pass symbol in, press Alt+Enter, and select “Create constructor”.)
  6. Move the creation of the Series into the PriceSubscriber constructor and store the series as a field.
  7. Add a getter for series because the ChartController will need to get this series to add it to the chart.
  8. (Tip: IntelliJ IDEA can generate getters from fields.)
private class PriceSubscriber implements Consumer<StockPrice> {
    private Series<String, Double> series;
    private ObservableList<Data<String, Double>> seriesData = observableArrayList();

    private PriceSubscriber(String symbol) {
        series = new Series<>(symbol, seriesData);
    }

    @Override
    public void accept(StockPrice stockPrice) {
        Platform.runLater(() ->
            seriesData.add(new Data<>(valueOf(stockPrice.getTime().getSecond()),
                                      stockPrice.getPrice()))
        );
    }

    public Series<String, Double> getSeries() {
        return series;
    }
}
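
The accept method is called on whichever thread delivers the price, while the chart's data should only be changed on the JavaFX Application Thread, which is why the update is wrapped in Platform.runLater. The sketch below shows the same hand-off pattern without JavaFX, assuming a single-threaded executor as a stand-in for the UI thread. UiThreadSketch and the thread name are illustrative assumptions, not part of the tutorial.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class UiThreadSketch {
    // Stand-in for the JavaFX Application Thread: one thread that owns the data.
    private final ExecutorService uiThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-ui-thread"));

    // Only ever modified from uiThread, like seriesData in the tutorial.
    private final List<Double> seriesData = new ArrayList<>();
    private final List<String> threadsUsed = new ArrayList<>();

    // Analogous to PriceSubscriber.accept: hand the mutation to the owning thread
    // (uiThread.execute plays the role of Platform.runLater).
    final Consumer<Double> subscriber = price -> uiThread.execute(() -> {
        seriesData.add(price);
        threadsUsed.add(Thread.currentThread().getName());
    });

    // Lets queued updates finish, stops the fake UI thread, then returns the data.
    List<Double> shutdownAndGet() {
        uiThread.shutdown();
        try {
            uiThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for updates", e);
        }
        return seriesData;
    }

    List<String> threadsUsed() {
        return threadsUsed;
    }
}
```

Whichever thread calls subscriber.accept, every change to seriesData runs on the single owning thread, so the list never needs its own synchronization.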


Let’s fix up the initialize method of ChartController.

  1. Extract the new PriceSubscriber into a local variable; we’re going to need to use this elsewhere in the method.
  2. Move the priceSubscriber declaration near the top of the method, and pass priceSubscriber.getSeries() to the data.add call.
@Component
public class ChartController {
    @FXML
    private LineChart<String, Double> chart;
    private WebClientStockClient webClientStockClient;

    public ChartController(WebClientStockClient webClientStockClient) {
        this.webClientStockClient = webClientStockClient;
    }

    @FXML
    public void initialize() {
        String symbol = "SYMBOL";
        PriceSubscriber priceSubscriber = new PriceSubscriber(symbol);

        ObservableList<Series<String, Double>> data = observableArrayList();
        data.add(priceSubscriber.getSeries());
        chart.setData(data);

        webClientStockClient.pricesFor(symbol)
                            .subscribe(priceSubscriber);
    }

    private class PriceSubscriber implements Consumer<StockPrice> {
        // details in previous code snippet
    }
}


If we re-run the application, the chart should run the same way as it did before (assuming we still have the back-end service running as well), since we haven’t changed the behavior, just refactored the subscription code.

Adding a Second Subscriber

  1. Rename symbol to symbol1, since we’re going to have another one of these, and change its value to match. Rename priceSubscriber to priceSubscriber1 as well.
  2. (Tip: Use the rename refactoring, Shift+F6, to do this; it makes sure all the code still compiles.)
  3. Duplicate these lines and change the variable names to symbol2 and priceSubscriber2.
  4. (Tip: The keyboard shortcut for Duplicate Line is Ctrl+D/⌘D.)
  5. Add the second series to the chart to display this second set of prices.
  6. Duplicate the subscribe code and pass in the second symbol and second subscriber.
public void initialize() {
    String symbol1 = "SYMBOL1";
    PriceSubscriber priceSubscriber1 = new PriceSubscriber(symbol1);
    String symbol2 = "SYMBOL2";
    PriceSubscriber priceSubscriber2 = new PriceSubscriber(symbol2);

    ObservableList<Series<String, Double>> data = observableArrayList();
    data.add(priceSubscriber1.getSeries());
    data.add(priceSubscriber2.getSeries());
    chart.setData(data);

    webClientStockClient.pricesFor(symbol1)
                        .subscribe(priceSubscriber1);
    webClientStockClient.pricesFor(symbol2)
                        .subscribe(priceSubscriber2);
}


Now when we re-run the application, we can see two different series on the chart tracking the prices for two different stocks, as you can see at 3:30 in the video.
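
The key point is that each subscriber holds its own data and is attached to its own stream, so prices for one symbol never end up in the other series. Here is a minimal sketch of that behavior using only the JDK. FakeStockClient is a hypothetical stand-in for WebClientStockClient, SubmissionPublisher stands in for the reactive stream, and consume plays the role of subscribe.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Hypothetical stand-in for WebClientStockClient: one publisher per symbol.
final class FakeStockClient {
    private final Map<String, SubmissionPublisher<Double>> feeds = new ConcurrentHashMap<>();

    SubmissionPublisher<Double> pricesFor(String symbol) {
        return feeds.computeIfAbsent(symbol, s -> new SubmissionPublisher<Double>());
    }

    void publish(String symbol, double price) {
        pricesFor(symbol).submit(price);
    }

    void closeAll() {
        feeds.values().forEach(SubmissionPublisher::close);
    }
}

// Analogous to PriceSubscriber: one instance per symbol, each with its own data.
final class SymbolSubscriber implements Consumer<Double> {
    final String symbol;
    final List<Double> seriesData = new CopyOnWriteArrayList<>();

    SymbolSubscriber(String symbol) {
        this.symbol = symbol;
    }

    @Override
    public void accept(Double price) {
        seriesData.add(price);
    }
}

final class TwoSubscribersSketch {
    static List<SymbolSubscriber> run() {
        FakeStockClient client = new FakeStockClient();
        SymbolSubscriber first = new SymbolSubscriber("SYMBOL1");
        SymbolSubscriber second = new SymbolSubscriber("SYMBOL2");

        // One subscription per symbol, as in the initialize method above.
        CompletableFuture<Void> done1 = client.pricesFor(first.symbol).consume(first);
        CompletableFuture<Void> done2 = client.pricesFor(second.symbol).consume(second);

        client.publish("SYMBOL1", 100.0);
        client.publish("SYMBOL2", 200.0);
        client.publish("SYMBOL1", 101.0);
        client.closeAll();

        // Wait until both subscribers have seen everything published to them.
        CompletableFuture.allOf(done1, done2).join();
        return List.of(first, second);
    }
}
```

After run() returns, the first subscriber holds the two SYMBOL1 prices and the second holds only the SYMBOL2 price.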

Code Clean-Up

Now that we have the application working the way we want, we can refactor the code if we wish.

  1. We could move the call to subscribe to the top near where we create the subscriber.
  2. We can clean up any warnings, for example by making the inner class static.
public void initialize() {
    String symbol1 = "SYMBOL1";
    PriceSubscriber priceSubscriber1 = new PriceSubscriber(symbol1);
    webClientStockClient.pricesFor(symbol1)
                        .subscribe(priceSubscriber1);

    String symbol2 = "SYMBOL2";
    PriceSubscriber priceSubscriber2 = new PriceSubscriber(symbol2);
    webClientStockClient.pricesFor(symbol2)
                        .subscribe(priceSubscriber2);

    ObservableList<Series<String, Double>> data = observableArrayList();
    data.add(priceSubscriber1.getSeries());
    data.add(priceSubscriber2.getSeries());
    chart.setData(data);
}
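
The suggestion to make the inner class static is possible because PriceSubscriber does not use any of ChartController's fields. A non-static inner class is tied to an instance of its enclosing class, while a static nested class is not. A small sketch of the difference, with hypothetical names (Outer, Inner, Nested) chosen purely for illustration:

```java
final class Outer {
    private final String name = "outer";

    // Inner class: every instance belongs to an Outer instance and can read its fields.
    final class Inner {
        String describe() {
            return "inner of " + name;
        }
    }

    // Static nested class: no enclosing instance, so it cannot read 'name' directly.
    static final class Nested {
        String describe() {
            return "nested, no outer needed";
        }
    }
}

final class NestedClassSketch {
    static String[] run() {
        Outer outer = new Outer();
        Outer.Inner inner = outer.new Inner();    // requires an Outer instance
        Outer.Nested nested = new Outer.Nested(); // does not
        return new String[] { inner.describe(), nested.describe() };
    }
}
```

Since PriceSubscriber only needs the values passed to its constructor, the static form is enough and avoids keeping an unnecessary reference to the controller.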


Bonus Refactoring (Not in the Video!)

There are a number of other ways we could further refactor this to make it easier to read, to reduce duplication, or to group responsibilities differently. For example, we could move the call to subscribe into the PriceSubscriber to reduce duplication. We can even use var if we’re running Java 10 or later.

public class ChartController {

    // ... fields and constructor ...

    @FXML
    public void initialize() {
        var priceSubscriber1 = new PriceSubscriber("SYMBOL1", webClientStockClient);
        var priceSubscriber2 = new PriceSubscriber("SYMBOL2", webClientStockClient);

        ObservableList<Series<String, Double>> data = observableArrayList();
        data.add(priceSubscriber1.getSeries());
        data.add(priceSubscriber2.getSeries());
        chart.setData(data);
    }

    private static class PriceSubscriber implements Consumer<StockPrice> {
        private Series<String, Double> series;
        private ObservableList<Data<String, Double>> seriesData = observableArrayList();

        private PriceSubscriber(String symbol, WebClientStockClient stockClient) {
            series = new Series<>(symbol, seriesData);
            stockClient.pricesFor(symbol)
                       .subscribe(this);
        }

        @Override
        public void accept(StockPrice stockPrice) {
            Platform.runLater(() ->
              seriesData.add(new Data<>(valueOf(stockPrice.getTime().getSecond()),
                                        stockPrice.getPrice()))
            );
        }

        private Series<String, Double> getSeries() {
            return series;
        }
    }
}
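
One further option, not taken in the code above, is to drive everything from a list of symbols so that adding a third stock means adding one entry to the list. The sketch below shows only the shape of that loop using plain collections. SeriesHolder is a hypothetical stand-in for PriceSubscriber, and the returned list corresponds to the data passed to chart.setData.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for PriceSubscriber: here it only carries the series name.
final class SeriesHolder {
    private final String seriesName;

    SeriesHolder(String symbol) {
        this.seriesName = symbol;
    }

    String getSeries() {
        return seriesName;
    }
}

final class SymbolLoopSketch {
    // Creates one holder per symbol and collects their series in the same order.
    static List<String> seriesFor(List<String> symbols) {
        return symbols.stream()
                      .map(SeriesHolder::new)
                      .map(SeriesHolder::getSeries)
                      .collect(Collectors.toList());
    }
}
```

In the controller, the equivalent would map each symbol to a new PriceSubscriber and add each subscriber's series to data. Whether this reads better than two explicit variables is a matter of taste when there are only two symbols.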


Conclusion

So now, we have a JavaFX application that subscribes to prices for more than one stock from our Spring Boot service, and displays each set of price data in real-time as a separate series on a line chart.

The full code is available on GitHub.
