Java DSL for Spring Integration 1.2 Milestone 2 Is Available

I’m pleased to announce that the Java DSL for Spring Integration 1.2 M2 is available now!

First of all, I’d like to thank everyone who created issues, raised Pull Requests, provided feedback, or just asked questions on Stack Overflow. Special thanks go to the early adopters of the previous Milestone 1. With their help, we have improved runtime flow registration and fixed some issues with it.

The artifact org.springframework.integration:spring-integration-java-dsl:1.2.0.M2 is available in the Milestone repository. Give it a shot, and don’t hesitate to raise a GitHub issue with any feedback!

Some highlights of the current iteration:

JPA Support

After many community requests, we have finally introduced the `Jpa` factory and the corresponding `IntegrationComponentSpec`s to provide a fluent API for the Spring Integration JPA components:

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManagerFactory)
                    .entityClass(StudentDomain.class)
                    .maxResults(1)
                    .expectSingleResult(true),
                e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
            .channel(c -> c.queue("pollingResults"))
            .get();
}

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
            .handle(Jpa.updatingGateway(this.entityManagerFactory),
                    e -> e.transactional(true))
            .channel(c -> c.queue("persistResults"));
}

@Bean
public IntegrationFlow retrievingGatewayFlow() {
    return f -> f
            .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                    .jpaQuery("from Student s where s.id = :id")
                    .expectSingleResult(true)
                    .parameterExpression("id", "payload"))
            .channel(c -> c.queue("retrieveResults"));
}

Mid-Flow Transaction Support

"Inspired" by the complexity of configuring transaction support for the Spring Integration JPA components (in practice, a programmatic TransactionInterceptor), we have introduced the TransactionInterceptorBuilder. In addition, we provide the TransactionHandleMessageAdvice, which lets you start a transaction from any endpoint for the entire sub-flow, not only for handleRequestMessage, as is the case with the regular ConsumerEndpointSpec.advice(). The main trick is done by the HandleMessageAdvice, recently introduced in Spring Integration Core: a marker interface that distinguishes advice applied only to handleRequestMessage from advice applied to the whole flow starting at the current MessageHandler. For convenience, several .transactional() methods have been added to the ConsumerEndpointSpec.
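
The difference in scope between the two kinds of advice can be illustrated with a small plain-Java model. This is only a sketch with no Spring dependency: the class AdviceScopeSketch and its methods are hypothetical names that merely record the order of events, and it assumes the downstream part of the flow runs on the same thread as the handler. It is not the framework's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical names, not Spring Integration code) of where
// the transaction boundary falls for the two kinds of advice.
final class AdviceScopeSketch {

    // Stands in for the handler's own work (handleRequestMessage).
    static String handleRequestMessage(String payload, List<String> events) {
        events.add("handle");
        return payload.toUpperCase();
    }

    // Stands in for everything wired after the handler in the flow.
    static void downstream(String reply, List<String> events) {
        events.add("downstream:" + reply);
    }

    // Regular advice: only the handler's own work runs inside the transaction;
    // the reply travels downstream after the commit.
    static List<String> withRegularAdvice(String payload) {
        List<String> events = new ArrayList<>();
        events.add("begin");
        String reply = handleRequestMessage(payload, events);
        events.add("commit");
        downstream(reply, events);
        return events;
    }

    // HandleMessageAdvice style: the transaction wraps the whole message
    // handling, including sending the reply to the rest of the sub-flow.
    static List<String> withHandleMessageAdvice(String payload) {
        List<String> events = new ArrayList<>();
        events.add("begin");
        String reply = handleRequestMessage(payload, events);
        downstream(reply, events);
        events.add("commit");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withRegularAdvice("foo"));       // [begin, handle, commit, downstream:FOO]
        System.out.println(withHandleMessageAdvice("foo")); // [begin, handle, downstream:FOO, commit]
    }
}
```

In the second trace the downstream work happens before the commit, which is the effect the TransactionHandleMessageAdvice is meant to give for a sub-flow.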

Scatter-Gather Support

The Scatter-Gather enterprise integration pattern now has its own Java DSL API:

@Bean
public IntegrationFlow scatterGatherFlow() {
    return f -> f
      .scatterGather(scatterer -> scatterer
         .applySequence(true)
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10)),
      gatherer -> gatherer
         .releaseStrategy(group ->
                group.size() == 3 ||
                      group.getMessages()
                          .stream()
                          .anyMatch(m -> (Double) m.getPayload() > 5)),
      scatterGather -> scatterGather
         .gatherTimeout(10_000));
}

Here the scatterer is just a RecipientListRouter, the gatherer is an AggregatingMessageHandler, and the last Consumer accepts options for the ScatterGatherHandler.
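
To make the release condition in the sample easier to reason about, here is the same predicate extracted into plain Java. It is a minimal sketch: ReleaseStrategySketch and canRelease() are hypothetical names, and the message group is reduced to a list of Double payloads instead of a real message group.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java restatement (hypothetical names) of the release condition used
// in the scatter-gather sample: the payload list stands in for the group.
final class ReleaseStrategySketch {

    // Release when all three recipients have replied,
    // or as soon as any single reply is greater than 5.
    static boolean canRelease(List<Double> payloads) {
        return payloads.size() == 3
                || payloads.stream().anyMatch(p -> p > 5);
    }

    public static void main(String[] args) {
        System.out.println(canRelease(Arrays.asList(1.0)));           // false
        System.out.println(canRelease(Arrays.asList(1.0, 7.5)));      // true
        System.out.println(canRelease(Arrays.asList(1.0, 2.0, 3.0))); // true
    }
}
```

A group with one or two small replies keeps waiting, while a single reply above 5 releases it early.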

More Router Improvements

The .routeToRecipients() API now provides more configuration variants for recipients:

.routeToRecipients(r -> r
    .recipient("foo-channel", "'foo' == payload")
    .recipient("bar-channel", m ->
        m.getHeaders().containsKey("recipient")
            && (boolean) m.getHeaders().get("recipient"))
    .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
        f -> f.<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("recipientListSubFlow1Result")))
    .recipientFlow((String p) -> p.startsWith("baz"),
        f -> f.transform("Hello "::concat)
            .channel(c -> c.queue("recipientListSubFlow2Result")))
    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                   "bax".equals(m.getPayload())),
        f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
    .defaultOutputToParentFlow())

Previously, the .route() operator made the next .channel() in the IntegrationFlow the defaultOutputChannel of the Router. Based on user feedback, making that decision unconditionally does not seem reasonable, so we have reworked .route() to align it with the standard AbstractMessageRouter behavior. The .defaultOutputChannel() and .defaultSubFlowMapping() options have been added to configure the default logic for the Router. To roll back to the previous behavior, use .defaultOutputToParentFlow(), as shown in the .routeToRecipients() sample above.
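
A toy model in plain Java may help show what an explicit default output means for a router. It is only a sketch under assumptions: RouterDefaultSketch, mapping(), and route() are hypothetical names, channels are plain strings, and an unresolved message with no default is modeled as an IllegalStateException rather than the framework's own exception type.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, not Spring Integration code) of a router
// whose default output is used only when it has been configured explicitly.
final class RouterDefaultSketch {

    private final Map<String, String> mappings = new HashMap<>();

    // null means "no default output configured".
    private String defaultOutputChannel;

    public RouterDefaultSketch mapping(String key, String channel) {
        this.mappings.put(key, channel);
        return this;
    }

    public RouterDefaultSketch defaultOutputChannel(String channel) {
        this.defaultOutputChannel = channel;
        return this;
    }

    // Resolve the target channel: a mapping wins, then the explicit default;
    // with neither, routing fails instead of silently continuing the flow.
    public String route(String key) {
        String channel = this.mappings.get(key);
        if (channel != null) {
            return channel;
        }
        if (this.defaultOutputChannel != null) {
            return this.defaultOutputChannel;
        }
        throw new IllegalStateException(
                "No channel resolved for '" + key + "' and no default output channel defined");
    }

    public static void main(String[] args) {
        RouterDefaultSketch router = new RouterDefaultSketch().mapping("foo", "foo-channel");
        System.out.println(router.route("foo")); // foo-channel
        router.defaultOutputChannel("default-channel");
        System.out.println(router.route("bar")); // default-channel
    }
}
```

In this model, the earlier DSL behavior corresponds to always setting the default to the next channel in the flow; now that choice has to be made explicitly.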

See the commit history for version 1.2.0.M2 for more information. And always read the Javadocs to understand the API you use!

Next Steps

We expect the first (and, we hope, the last) Release Candidate for version 1.2 in a couple of weeks, after some adoption of Spring Integration 4.3.2 and Spring Boot 1.4.1. It’s soon enough, as spring-integration-java-dsl will move to the Spring Integration Core 5.0 and Java 8 code base. The current 1.2 version will still be supported, but only for bug fixes.
