Spring Cloud Stream: A Brief Guide

This article will teach you how to create a Spring Cloud Stream app that works with a messaging service (like Apache Kafka, RabbitMQ, etc.), You’ll be using functional, reactive code—leveraging Spring’s WebFlux—and Spring Cloud Stream’s functional building model,

Today, you are going to make an app that has a publisher, a processor, and a consumer. It will leverage two topics to publish a stream of integers, process the integers to calculate a running total, and consume the processed data. In the beginning, these messages will be simple (strings/integers)—but later you’ll see how Spring Cloud Stream makes mapping POJOs (Plain Old Java Objects) to messages using JSON mapping simple!

If all of the above jargon made sense to you, go ahead and skip the requirements section below. If you need a little more of a deep dive, the next few paragraphs will introduce the technologies to you.

What is Spring Cloud Stream?

Spring describes Spring Cloud Stream as “a framework for building highly scalable event-driven microservices connected with shared messaging systems.” This means that the Spring team created Spring Cloud Stream to work with messaging services like RabbitMQ or Apache Kafka. It works in distributed microservices that respond to streams of incoming data (data being the “events” in “event-driven”). It’s built on top of Spring Boot, works well with Spring MVC or Spring WebFlux, and can be used to create highly scalable messaging and stream processing applications.

RabbitMQ and Apache Kafka are messaging applications. They distribute a stream of messages from producers to consumers. They both easily support the simple publish-subscribe (pub-sub) strategy that we will be using in this tutorial; however, they are divergent technologies with different strengths and weaknesses. Very, very briefly: RabbitMQ is a great general-purpose message broker that pushes data from the messaging service to the consumers. Apache Kafka uses a hybrid model (combining aspects of pub-sub and message queuing) that allows you to scale and distribute processing. It excels in stream processing and allows for message storage and replay.

The simple messaging strategy you’re going to use here is called pub-sub, or publish and subscribe. A consumer subscribes to a topic (generally just identified by a text name). Publishers push messages to the topic, and the platform sends the message to all subscribed consumers. A processor subscribes to a topic and republishes the transformed data to a new topic.

Messaging network topology

In this tutorial, you will use RabbitMQ in a Docker container. However, you can use Apache Kafka simply by replacing the docker-compose.yml file and by changing the Spring Cloud Stream binding dependency.

Functional programming is a huge departure from the object-oriented model that dominated programming (especially Java) until the last decade. In functional programming, functions, not object instances and classes, are the main organizational unit of code execution. This decouples data and logic in a way that has benefits for applications, such as stream processing, and allows for powerful chaining and composition of functions.

Spring WebFlux is Spring’s functional web framework that is equivalent to object-oriented Spring MVC. It uses functional programming techniques to build web services. It natively supports reactive programming, which Spring describes as:

Reactive programming is a programming paradigm that promotes an asynchronous, non-blocking, event-driven approach to data processing. Reactive programming involves modeling data and events as observable data streams and implementing data processing routines to react to the changes in those streams.

Reactive programming is a set of tools and techniques that have evolved to treat data as a continuous stream of data, like water flowing through a pipe, rather than viewing them as discrete events to handle individually. In reactive programming, you define potentially complex transformations and mappings that you apply to the stream. They sometimes return a new stream and sometimes return a reduced result. Functional programming is not necessarily reactive, but reactive programming tools almost always use functional programming paradigms.

Set Up a Spring Cloud Stream Development Environment

To do this tutorial, you’ll need a few tools:

You can test the docker-compose installation by opening a shell and running the following command:

docker-compose --version

You may need to run the command as a root user using sudo:

sudo docker-compose --version

You should get something like:

docker-compose version 1.25.4, build 01110ad01

Bootstrap Your Spring Cloud Stream Project

Spring has a great project called the Spring Initializr that you can use to quickly build starter Spring Boot projects. You can take a look at the project website. However, here you’re going to use the REST API to download a pre-configured project.

Open a shell and run the commands below.

Java
 




xxxxxxxxxx
1


 
1
curl https://start.spring.io/starter.tgz -d dependencies=webflux,okta,cloud-stream,amqp,lombok \
2
  -d groupId=okta.springcloudstreams \
3
  -d baseDir=okta-spring-cloud-streams | tar -xzvf -
4
cd okta-spring-cloud-streams


This creates a Spring Boot project configured with five additional dependencies:

  1. webflux: Spring Boot WebFlux, the functional and reactive web framework
  2. okta: Okta’s Spring Boot Starter that simplifies integrating OAuth 2.0 and OIDC
  3. cloud-stream: Spring Cloud Stream the main dependency
  4. amqp: RabbitMQ binders for Spring Cloud Stream
  5. lombok: Project Lombok, a set of helper annotation that generates boilerplate code

Create an Okta Maven Plugin YAML File

If you already have an Okta developer account, create a configuration file at ~/.okta/okta.yaml with your account information. If you skip this step, the Okta Maven Plugin will create a new Okta Org for you.

If you do not already have an account, you can skip this step and go on to the next section: Configure the OIDC Settings.

~/.okta/okta.yaml

YAML
 




xxxxxxxxxx
1


 
1
okta:
2
 client:
3
   orgUrl: https://{yourOktaDomain}
4
   token: {yourApiToken}


You’ll need to create an API token. From the Okta developer console, go to API and Tokens. Click Create Token. Give the token a name. Copy the token value and place it in the ~/.okta/okta.yaml file along with your Okta domain.

If you do not have an Okta developer account, don’t worry about the okta.yaml file. The Okta Maven Plugin will configure it for you.

Configure OIDC for Your Spring Cloud Stream App

You should now have an open shell and be in the project root directory. The next step is to use the Okta Maven Plugin to configure the OAuth 2.0 and OIDC settings. This plugin is great for tutorials like this because it will simplify signing up for an account, if you don’t already have one, and will create an appropriately configured OIDC application straight from the command line.

Change the application.properties file to a YAML file using the following command (or using your IDE). You’re doing this so that the Okta Maven Plugin creates a .yml file instead of a .properties file. This step actually doesn’t matter all that much, but it’s more succinct to configure the streams binders using YAML.

YAML
 




xxxxxxxxxx
1


 
1
mv src/main/resources/application.properties src/main/resources/application.yml


Now run the Okta Maven Plugin.

Java
 




xxxxxxxxxx
1


1
mvn com.okta:okta-maven-plugin:setup


Follow the prompts to either create a new account or log in to your existing account. You should see some lines like this:

Java
 




xxxxxxxxxx
1
11


1
Current OrgUrl: https://dev-447850.okta.com
2
Configuring a new OIDC, almost done:
3
Created OIDC application, client-id: 0oa30gk10KNOMD0wZ4x6
4
 
          
5
[INFO] Dependency: 'com.okta.spring:okta-spring-boot-starter' found in project.
6
[INFO] ------------------------------------------------------------------------
7
[INFO] BUILD SUCCESS
8
[INFO] ------------------------------------------------------------------------
9
[INFO] Total time:  5.099 s
10
[INFO] Finished at: 2020-04-13T13:54:23-08:00
11
[INFO] ------------------------------------------------------------------------


And you should see something like this in your src/main/resources/application.yml file:

Java
 




xxxxxxxxxx
1


1
okta:
2
  oauth2:
3
    client-secret: NHOilIaQPsgz5uOotjwm7p4v6MtHVTFyTT90wQ6p
4
    client-id: 0oa30gk10KNOMD0wZ4x6
5
    issuer: https://dev-123456.okta.com/oauth2/default


At this point you can run the Spring Boot application using the following shell command:

Java
 




xxxxxxxxxx
1


1
./mvnw spring-boot:run


And get something like this:

Java
 




xxxxxxxxxx
1


1
...
2
2020-04-13 13:59:29.769  INFO 1664 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
3
2020-04-13 13:59:29.769  INFO 1664 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
4
2020-04-13 13:59:29.842  INFO 1664 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
5
2020-04-13 13:59:29.845  INFO 1664 --- [           main] o.s.demo.DemoApplication    


Notice that the output says Netty started on port(s): 8080. Netty is the default web server for Spring WebFlux applications (not Jetty or Tomcat, which are typical for SpringMVC). Since the Spring team designed WebFlux to be asynchronous and event-driven, Netty is the obvious choice because it uses the exact same model.

If you make a request on the running server, you’ll get a 404 - Not Authorized.

Use HTTPie to give it a try. Open a new shell and run:

Java
 




xxxxxxxxxx
1


 
1
$ http :8080
2
 
          
3
HTTP/1.1 401 Unauthorized
4
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
5
...


You get the 401 because by including the Okta Spring Boot Starter you auto-configured OAuth SSO (single sign-on), and by default all requests require authentication. I’m pointing this out simply because at this point all you’ve done is include a dependency—without any configuration—so it might seem a little confusing.

Create a RabbitMQ Pub-Sub Messaging Service

You’re going to use Docker Compose to create a RabbitMQ messaging service. It’s ridiculously easy (assuming you have Docker and Docker Compose working).

Create a docker-compose.yml file in the project root directory.

Java
 




xxxxxxxxxx
1


1
version: '3'
2
services:
3
  rabbitmq:
4
    image: rabbitmq:management
5
    ports:
6
      - 5672:5672
7
      - 15672:15672


Open a new shell (simply because you’re going to want to leave this shell open) and run the following command.

docker-compose up

You may need to run the command as root:

Shell
 




xxxxxxxxxx
1


 
1
sudo docker-compose up


You could also run it as a daemon and not have to leave the shell window open using -d. I like having the log files visible, so I prefer to run it in its own window.

Assuming all went well, you’ll see some output like this:

Java
 




xxxxxxxxxx
1


1
...
2
rabbitmq_1  | 2020-04-13 22:12:32.121 [info] <0.677.0> Management plugin: HTTP (non-TLS) listener started on port 15672
3
rabbitmq_1  | 2020-04-13 22:12:32.122 [info] <0.783.0> Statistics database started.
4
rabbitmq_1  | 2020-04-13 22:12:32.122 [info] <0.782.0> Starting worker pool 'management_worker_pool' with 3 processes in it
5
rabbitmq_1  | 2020-04-13 22:12:32.198 [info] <0.8.0> Server startup complete; 3 plugins started.
6
rabbitmq_1  |  * rabbitmq_management
7
rabbitmq_1  |  * rabbitmq_management_agent
8
rabbitmq_1  | * rabbitmq_web_dispatch
9
rabbitmq_1  |  completed with 3 plugins.


Leave that process running, as that’s your RabbitMQ messaging service. You’ll notice that there are two ports the docker-compose file exposed: 15672 and 5672. 5672 is the port for the messaging service itself. 15672 exposes a web management page that you can check out by opening a browser, navigating to http://localhost:15672, and using the credentials guest:guest.

Web management page

Build a Spring Cloud Stream Application

Now you can create a Spring Cloud Stream application by replacing the DemoApplication.java class file with the following contents.

src/main/java/okta/springcloudstreams/demo/DemoApplication.java

Java
 




xxxxxxxxxx
1
75


1
package okta.springcloudstreams.demo;
2
 
          
3
import org.slf4j.Logger;
4
import org.slf4j.LoggerFactory;
5
import org.springframework.boot.SpringApplication;
6
import org.springframework.boot.autoconfigure.SpringBootApplication;
7
import org.springframework.context.annotation.Bean;
8
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
9
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
10
import org.springframework.security.config.web.server.ServerHttpSecurity;
11
import org.springframework.security.web.server.SecurityWebFilterChain;
12
 
          
13
import java.util.Random;
14
import java.util.concurrent.atomic.AtomicInteger;
15
import java.util.function.Consumer;
16
import java.util.function.Function;
17
import java.util.function.Supplier;
18
 
          
19
@SpringBootApplication
20
@EnableWebFluxSecurity
21
@EnableReactiveMethodSecurity
22
public class DemoApplication {
23
 
          
24
    // Using the default logger
25
    private static Logger logger = LoggerFactory.getLogger(DemoApplication.class);
26
 
          
27
    // Standard entrypoint into the Spring Boot application
28
    public static void main(String[] args) {
29
        SpringApplication.run(DemoApplication.class, args);
30
    }
31
 
          
32
    // Configure web security to allow all transactions
33
    @Bean
34
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
35
        http.authorizeExchange().anyExchange().permitAll();
36
        return http.build();
37
    }
38
 
          
39
    // Publishes a random integer to the "ints" channel (as defined
40
    // in the application.yml file) every second.
41
    static class Source {
42
 
          
43
        private Random random = new Random();
44
 
          
45
        @Bean
46
        public Supplier<Integer> send() {
47
            return () -> random.nextInt(100);
48
        }
49
    }
50
 
          
51
    // Subscribe to the "total" channel and log the results
52
    static class Sink {
53
    
54
        @Bean
55
        public Consumer<String> receive() {
56
            return payload -> {
57
                logger.info(payload);
58
            };
59
        }
60
        
61
    }
62
    
63
    // Subscribes to the "ints" channel, calculating an
64
    // accumulated total, and publishing both to the "total" channel
65
    static class Accumulator {
66
 
          
67
        private AtomicInteger count = new AtomicInteger(0);
68
 
          
69
        @Bean
70
        public Function<Integer, String> accumulate() {
71
            return payload -> "Current value: " + payload + ", Total: " + this.count.addAndGet(payload);
72
        }
73
 
          
74
    }
75
}


Add the following to your src/main/resources/application.yml file (leaving the Okta OAuth section alone):

Java
 




xxxxxxxxxx
1
14


1
spring:
2
  cloud:
3
    stream:
4
      function:
5
        definition: accumulate;receive;send
6
      bindings:
7
        send-out-0:
8
          destination: ints
9
        receive-in-0:
10
          destination: total
11
        accumulate-in-0:
12
          destination: ints
13
        accumulate-out-0:
14
          destination: total


Let’s talk about that code for a minute. First, notice that you defined the bindings in the application.yml file instead of using annotations. As of Spring Cloud Stream 3.0, Spring considers annotation-based configuration legacy and moved toward functional binding names configured in properties or yml files. Take a look at their docs on the functional binding paradigm for complete info.

In the application.yml file above, you configured three functions with four bindings (one in, one out, and one with in and out).

This follows a naming convention:

Shell
 




xxxxxxxxxx
1


 
1
input: <functionName> + -in- + <function parameter index>
2
output: <functionName> + -out- + <function parameter index>


For example, accumulate-in-0 in the example above defines a binding for function accumulate that is an input that subscribes to a channel with the data it receives in the first input parameter (index 0).

Notice how the three bound functions, accumulate;receive;send, match the three functions in the DemoApplication class. Don’t get distracted by the static class structures. That was just a way to make the state local to the functions and to organize things—it would all have worked just as well to make the function beans methods of DemoApplication itself.

The application itself is very simple. send is bound to the ints channel, to which it is going to send a random integer every second. This, by the way, is a property of Spring’s implementation of the Supplier interface; Spring triggers it automatically every second, so it’s a great tool for testing and developing streams. accumulate receives the random integers from the ints channel, calculates a running total, and publishes both together as a String on the total channel. receive listens to the total channel and logs the messages.

Integer Accumulator App

The last thing I’ll mention is the security configuration, which you do in securityWebFilterChain(ServerHttpSecurity http). Here you are telling Spring WebFlux and Spring Security to allow all transactions. By default, Spring Security requires all requests to be authenticated, which is great, but you’re not there yet, so for the moment, you’re essentially turning off security. You’ll re-enable it later.

There are two annotations that, along with the okta-spring-boot-starter dependency, activate WebFlux security: @EnableWebFluxSecurity and @EnableReactiveMethodSecurity. The first turns on security generally, and the second allows for method-level security. The Okta starter dependency, okta-spring-boot-starter, brings in the necessary dependencies for Spring Security, so you don’t have to do that separately.

Try it out. First, make sure you have your RabbitMQ service running with Docker Compose. Then, from the project root directory, run the Spring Boot app using Maven:

Java
 




xxxxxxxxxx
1


 
1
./mvnw spring-boot:run


If all goes well, you’ll see output like below, showing a running list of random numbers and a running total.

Java
 




xxxxxxxxxx
1


1
...
2
2020-03-14 10:10:33.493  INFO 21276 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
3
2020-03-14 10:10:33.495  INFO 21276 --- [           main] o.s.demo.DemoApplication                 : Started DemoApplication in 2.336 seconds (JVM running for 2.497)
4
2020-03-14 10:10:34.368  INFO 21276 --- [KyCe9nxFnKiTg-1] o.s.demo.DemoApplication                 : Current value: 56, Total: 112
5
2020-03-14 10:10:35.373  INFO 21276 --- [KyCe9nxFnKiTg-1] o.s.demo.DemoApplication                 : Current value: 13, Total: 125
6
2020-03-14 10:10:36.376  INFO 21276 --- [KyCe9nxFnKiTg-1] o.s.demo.DemoApplication                 : Current value: 15, Total: 140
7
...


Press Control-C to stop the Spring Boot application, but leave the Rabbit server running.

Add a Streaming REST Resource to the Application

In this section, you’re going to do a couple of things that highlight how easy Spring Boot, Spring Webflux, and Spring Cloud Stream make handling streams. First, instead of simply passing a string with the current value and accumulated total, you’re going to encapsulate that data in a Java class that Spring will automatically serialize to and deserialize from JSON. Second, you’re going to use WebFlux stream processing to return an event stream from a REST endpoint, which you’ll be able to view using HTTPie.

Add the POJO (Plain Old Java Object) mapping class: AccumulatorMessage.

src/main/java/okta/springcloudstreams/demo/AccumulatorMessage.java

Java
 




xxxxxxxxxx
1
13


1
package okta.springcloudstreams.demo;
2
 
          
3
import lombok.Data;
4
 
          
5
@Data
6
public class AccumulatorMessage {
7
    int currentValue;
8
    int total;
9
    AccumulatorMessage(int currentValue, int total) {
10
        this.currentValue = currentValue;
11
        this.total = total;
12
    }
13
}


This class takes advantage of Lombok’s @Data annotation, which means you don’t have to write getters and setters for the class properties—the program generates them automatically.

The next class is the REST resource class: RestResource.

src/main/java/okta/springcloudstreams/demo/RestResource.java

Java
 




xxxxxxxxxx
1
34


1
package okta.springcloudstreams.demo;
2
 
          
3
import org.slf4j.Logger;
4
import org.slf4j.LoggerFactory;
5
import org.springframework.context.annotation.Bean;
6
import org.springframework.http.MediaType;
7
import org.springframework.web.bind.annotation.GetMapping;
8
import org.springframework.web.bind.annotation.RestController;
9
import reactor.core.publisher.EmitterProcessor;
10
import reactor.core.publisher.Flux;
11
 
          
12
import java.util.function.Consumer;
13
 
          
14
@RestController
15
public class RestResource {
16
 
          
17
    private static Logger logger = LoggerFactory.getLogger(RestResource.class);
18
 
          
19
    private final EmitterProcessor<String> streamProcessor = EmitterProcessor.create();
20
 
          
21
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
22
    public Flux<String> getSee() {
23
        return this.streamProcessor;
24
    }
25
 
          
26
    @Bean
27
    public Consumer<Flux<String>> receiveSse() {
28
        return recordFlux ->
29
                recordFlux
30
                        .doOnNext(this.streamProcessor::onNext)
31
                        .doOnNext(value -> logger.info("*" +value))
32
                        .subscribe();
33
    }
34
}


This class receives events from the total channel (which you will configure in a moment), logs them, and places them in an EmitterProcessor. It also defines a REST endpoint that returns this EmitterProcessor as a stream of String events. This is the endpoint you’ll be able to GET using HTTPie.

You need to update application.yml to the following (but leave the Okta OAuth section alone). This adds the input binding for the new receiveSse() method.

src/main/resources/application.yml

Java
 




xxxxxxxxxx
1
16


1
spring:
2
  cloud:
3
    stream:
4
      function:
5
        definition: accumulate;receive;send;receiveSse
6
      bindings:
7
        send-out-0:
8
          destination: ints
9
        receiveSse-in-0:
10
          destination: total
11
        receive-in-0:
12
          destination: total
13
        accumulate-in-0:
14
          destination: ints
15
        accumulate-out-0:
16
          destination: total


You also need to update DemoApplication so that it uses the new AccumulatorMessage class instead of sending Strings over the total channel. Notice that all you have to do is specify the appropriate return and parameter types and Spring maps the Java class to and from JSON.

src/main/java/okta/springcloudstreams/demo/DemoApplication.java

Java
 




xxxxxxxxxx
1
66


1
package okta.springcloudstreams.demo;
2
 
          
3
import org.slf4j.Logger;
4
import org.slf4j.LoggerFactory;
5
import org.springframework.boot.SpringApplication;
6
import org.springframework.boot.autoconfigure.SpringBootApplication;
7
import org.springframework.context.annotation.Bean;
8
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
9
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
10
import org.springframework.security.config.web.server.ServerHttpSecurity;
11
import org.springframework.security.web.server.SecurityWebFilterChain;
12
 
          
13
import java.util.Random;
14
import java.util.concurrent.atomic.AtomicInteger;
15
import java.util.function.Consumer;
16
import java.util.function.Function;
17
import java.util.function.Supplier;
18
 
          
19
@SpringBootApplication
20
@EnableWebFluxSecurity
21
@EnableReactiveMethodSecurity
22
public class DemoApplication {
23
 
          
24
    private static Logger logger = LoggerFactory.getLogger(DemoApplication.class);
25
 
          
26
    public static void main(String[] args) {
27
        SpringApplication.run(DemoApplication.class, args);
28
    }
29
 
          
30
    @Bean
31
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
32
        http.authorizeExchange().anyExchange().permitAll();
33
        return http.build();
34
    }
35
 
          
36
    static class Accumulator {
37
 
          
38
        private AtomicInteger count = new AtomicInteger(0);
39
 
          
40
        @Bean
41
        public Function<Integer, AccumulatorMessage> accumulate() {
42
            return payload -> new AccumulatorMessage(payload ,this.count.addAndGet(payload));
43
        }
44
 
          
45
    }
46
 
          
47
    static class Source {
48
 
          
49
        private Random random = new Random();
50
 
          
51
        @Bean
52
        public Supplier<Integer> send() {
53
            return () -> random.nextInt(100);
54
        }
55
    }
56
 
          
57
    static class Sink {
58
  
59
        @Bean
60
        public Consumer<AccumulatorMessage> receive() {
61
            return payload -> {
62
                logger.info(payload.toString());
63
            };
64
        }
65
    }
66
}


Run the Spring Boot application again.

./mvnw spring-boot:run

This time you’ll see a bunch of messages like this: AccumulatorMessage(currentValue=48, total=79). This is from the toString() method Lombok generated and logged in the receive method.

Leave the Spring Boot application running and open a new shell. You’re going to use HTTPie to subscribe to the streaming REST endpoint.

http --stream :8080/sse

You’ll see streaming data:

Java
 




xxxxxxxxxx
1


1
...
2
data:{"currentValue":60,"total":589}
3
data:{"currentValue":22,"total":611}
4
data:{"currentValue":38,"total":649}
5
data:{"currentValue":34,"total":683}
6
data:{"currentValue":89,"total":772}
7
data:{"currentValue":23,"total":795}
8
data:{"currentValue":90,"total":885}
9
...


Secure the App Using JWT OAuth & OIDC

The last step is to secure the /sse endpoint. It’s going to be beguilingly simple. Okta’s Spring Boot Starter has done a fair amount of work for you. Take a look at the project GitHub page for more info. We’ve done our best to make it easy to secure Spring MVC and Spring WebFlux applications.

In DemoApplication.java, replace the method securityWebFilterChain(ServerHttpSecurity http) with the following:

Java
 




xxxxxxxxxx
1


1
@Bean
2
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
3
    http
4
        .authorizeExchange(authorizeExchange ->
5
            authorizeExchange.anyExchange().authenticated())
6
        .oauth2ResourceServer().jwt();
7
    return http.build();
8
}


This configures Spring Security to authorize all requests and to set up an OAuth 2.0 resource server using JSON Web Token (JWT) authentication.

Stop the Spring Boot application, if you need to, using Control-C, and restart it.

./mvnw spring-boot:run

It now requires authentication. If you try and request the /sse endpoint again, you’ll get a 401 Unauthorized error.

Java
 




xxxxxxxxxx
1


 
1
$ http -S :8080/sse
2
 
          
3
HTTP/1.1 401 Unauthorized
4
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
5
...
6
  


Generate a Token Using OIDC Debugger

To access the protected endpoint, you need a valid JWT. Fortunately, we have OIDC Debugger, which is a simple webpage designed to help test OIDC applications. Before you can use it, however, you need to add the OIDC Debugger redirect URL to your Okta OIDC application.

Open your Okta developer account. If you auto-created it using the Maven plugin, look at the issuer URL in the application.yml file and open the base URL in a browser (without the /oauth2/default). It will be something like: https://dev-123456.okta.com.

Once you’re in the Okta developer dashboard, from the top menu, go to Applications. You’ll see the demo application that the Maven plugin created for you.

Okta demo application

Click on the demo application. Select the General tab. Click Edit.

Under the Login section, add a new URL to Login redirect URIs: https://oidcdebugger.com/debug

You need to do this to whitelist the OIDC Debugger redirect URI so that it can generate and receive your token.

You also need to change the Allowed grant types. Check the Implicit (Hybrid) checkbox, and check the sub-check box below it: Allow Access Token with implicit grant type.Settings

While you’re there, take note of the Client ID, as you’ll need it in just a moment. Or leave this window open.

Open the OIDC Debugger website. You need to fill in a few values.

Authorize URI: https://{yourOktaUri}/oauth2/default/v1/authorize – You need to fill in your Okta URI. You can find it by looking at the okta.oauth2.issuer value in the application.yml file.

Client ID: fill in the value from your Okta OIDC application.

State: this can be any string value. In production, you typically use it to protect against cross-site request forgery attacks.

Response type: make sure you checked the token checkbox.Check token checkbox

Click Send Request and you should get a success page that shows you the returned access token. Copy it to the clipboard and store it in a shell variable in the shell window you’re using to run HTTPie.

Java
 




xxxxxxxxxx
1


1
TOKEN=eyJraWQiOiJxaUNtVGFJYnVIeXBLakRpTjZ3LWU...


Test JWT Token Authentication

Now that you’ve got your token, you’re ready to use it on your secured endpoint. Make sure your Spring Boot application is still running, then test the endpoint using your JWT and HTTPie.

Java
 




xxxxxxxxxx
1


1
http -S :8080/sse "Authorization: Bearer ${TOKEN}"


You should see your streaming data.

Java
 




xxxxxxxxxx
1
10


1
...
2
data:{"currentValue":57,"total":6055}
3
data:{"currentValue":1,"total":6056}
4
data:{"currentValue":2,"total":6058}
5
data:{"currentValue":10,"total":6068}
6
data:{"currentValue":62,"total":6130}
7
data:{"currentValue":29,"total":6159}
8
data:{"currentValue":85,"total":6244}
9
data:{"currentValue":84,"total":6328}
10
...


That’s it! You created a secure Spring Cloud Stream application.

The astute out there might object that the RabbitMQ server itself isn’t secured. That’s true, and there are various methods for securing it, chiefly using TLS and SSL certificates, but including that in this tutorial greatly expands the scope. We have a tutorial that demonstrates end-to-end security using Quarkus and Apache Kafka Streams, including how to generate all of the necessary SSL certificates and Java keyfiles.

Learn More About Reactive Programming and Spring Boot

All done. You created a Spring Cloud Stream application that publishes and subscribes to multiple channels. You used Docker to quickly and easily launch a RabbitMQ messaging service. You used reactive, functional programming and saw how to use Spring’s new functional binding model. You used Spring WebFlux to publish the data to a web stream and used HTTPie to stream the data from the command line. Finally, you used Okta’s Spring Boot Starter and to add OAuth 2.0 and OIDC authentication to the application.

You can find the source code for this tutorial on GitHub at oktadeveloper/okta-spring-cloud-streams-example.

If you liked this tutorial, chances are you’ll like some of our other ones:

If you have any questions, please leave a comment below. You can also follow us @oktadev on Twitter. We have a popular YouTube channel too—check it out!

 

 

 

 

Top