Spring Integration - Building a Sample Application

Spring Integration (SI) is a framework enabling a collection of individual applications to integrate together to deliver a business enterprise system. The framework is essentially a lightweight messaging system that enables spring based applications to communicate with one another and supports integration with external systems via declarative adaptors. It is based on the 'filters and pipes' design architecture. A key feature of it is that it achieves this integration in a minimally intrusive way.

The framework is built on 3 main components:

The basic idea behind the SI framework is that applications communicate with each other by sending/receiving messages. These messages would typically contain the information (payload) required by the next application in the process pipeline. The transport of messages from one application to another is performed by Channel components. The Endpoints perform some action based on the payload. This could be routing the messages to another endpoint or processing the payload itself.

The objective of this post is to provide an introduction to Spring Integration. To help achieve this, I developed a sample application which will be discussed below. The source for this sample application is available at here. The project was built and run using spring-integration-4.0.0, maven 3.2.1 and jdk1.6.

The main dependency is for the relevant spring-integration jar as declared in the pom.xml:
<dependencies>
 <dependency>
  <groupid>org.springframework.integration</groupid>
  <artifactid>spring-integration-stream</artifactid>
  <version>4.0.0.RC1</version>
 </dependency>
</dependencies>
I ran the application using the maven exec plugin. This allows me to clean, package and run the application by invoking
mvn clean package exec:java -P OnlineShop
from the command line. 

Developing a sample application: Tabernus

My goal as usual was to build something very simple which would help me to become familiar with key concepts of this framework and to this end I've knocked up a simple app which does not connect up individual systems but rather invokes methods on a POJO. Extending this to actual working applications shouldn't be too difficult.

The scenario I'm going to model revolves around purchasing items from an online store (Tabernus). This store only sells 3 types of items: Books, Music CDs, and software. During a Sale, the owners have decided to apply different discounts based on the item type. In this instance books, music and software benefit from discounts of 5%, 10%, and 15% respectively.

The following diagram shows our domain entities.



The class diagram shows that a Customer can place an Order comprising of a number of OrderItems which are of type Book, MusicCD or Software. The problem I need to solve is to design a system which can interrogate each Order and apply the correct discount based on the item type. Subsequently it should be able to compute the total cost of the order once the discounts have been applied.

To model this using Spring Integration we need the following pipeline





The above diagram shows various components most of which can be divided into 2 categories, channels (blue cylinder shapes) and endpoints (rectangular boxes).  The exception to this is the Poller component whose purpose is to enable the various endpoints to function correctly and discussion of it will be given later.

We'll start off by briefly covering the various stages in this pipeline as indicated by the numbers in red. Following this we will delve deeper into how we build this pipeline using the SI framework.

The pipeline is comprised of 6 major stages as reflected by the numbers in the diagram,

  1. The Gateway component represents the entry point to the messaging system. All new Orders will be submitted to this component which will in turn wrap them as messages and place them into the channel appropriately named ordersChannel.
  2. Using the Splitter component - each Order is decomposed into a collection of it's constituent OrderItem instances. Each of these is wrapped in a Message and placed in the orderItemsChannel.
  3. The Router component considers each OrderItem in turn and places it in the relevant channel, e.g. Book items will be placed in the bookItemsChannel etc. This allows us to consider the different item types separately.
  4. The ServiceActivator needs to consider messages within each of the 3 channels and calculate the correct discount based on the channel. After completing the calculation for each OrderItem, it will place the OrderItem in the processedItemsChannel.
  5. The Aggregator component will collect all OrderItem instances placed in the processedItemsChannel and reconstruct the original Order. This will subsequently be placed in the deliveriesChannel, which represents the end of the pipeline.
  6. The Poller Component is required to configure how often the various endpoints will interrogate their respective input channels for messages.

To implement the pipeline shown above using the SI framework, we need to
  1. implement the various end points.
  2. configure the pipeline in an xml file (Shop.xml) - identifying the various channels and endpoints and how they wire up together.
At this point I should mention that SI offers 2 approach to configuring your process pipeline, annotations based and xml. In this article I'll be using the latter.

Let's start to look at some code. We'll consider each stage described above and show the java implementation of the endpoint and xml configuration required to wire up the components.

Step 1 - Gateway

To begin with, we need to implement the Client that will invoke the Gateway component to place the Order. The client (OnlineShop.java) is shown below,
public class OnlineShop {

 public static void main(String[] args) {

  AbstractApplicationContext context =
   new ClassPathXmlApplicationContext("/META-INF/com/prodcod/shop.xml", OnlineShop.class);

  Shop shop = (Shop) context.getBean("shop");

  final Order order = createOrder();

  shop.placeOrder(order);

  context.close();
 }
The logic here is quite simple. The client creates a dummy Order and passes this as an argument when it invokes the placeOrder() method on the gateway component. The gateway  component referred here as Shop is injected by Spring.

The Gateway component looks like:
// Gateway component
public interface Shop {

 @Gateway(requestChannel="ordersChannel")
 void placeOrder(Order order);

}
As you can see, this is simply an interface, whose implementation will be provided by Spring when it is injected into the client application. This is achieved by the use of the @Gateway annotation which informs Spring that this is a Gateway component and it needs to provide the implementation. Additionally the annotation accepts an attribute, requestChannel which defines the channel on which the Order instance will be placed. The framework does this by simply wrapping our instance of Order within a Message instance and placing it in the channel, 'ordersChannel'.

The Gateway component and the 'ordersChannel' are declared as follows in the file shop.xml
<int:gateway id="shop" service-interface="com.prodcod.service.Shop"></int:gateway>
<int:channel id="ordersChannel"></int:channel>
Step 2 - Splitter
The next end point is the Splitter component. Appropriately named, it's role is to take a single message containing a payload of a collection of items and splitting it into a number of messages, each of which contains a single element from the collection. In our case, we want to decompose the Order into it's constituent OrderItem instances. It does this by taking a Message containing the payload of Order from 'ordersChannel' and then processing it before sending messages (each containing an OrderItem instance) to the 'orderItemsChannel'.  Our implementation of the splitter is called OrderSplitter and is defined as below,
public class OrderSplitter extends AbstractMessageSplitter{

 @Override
 protected Object splitMessage(Message message) { 
  return ((Order)message.getPayload()).getOrderItems();
 }

}
Implementing a splitter is quite easy and involves extending the AbstractMessageSplitter class and overriding the splitMessage() method. This simply takes a message containing the payload of Order and returns it's collection of OrderItems.

Step 3 - Router
Having decomposed the Order into it's constituent OrderItems, we now need to separate them into groups of Books, MusicCD, and Software. This is achieved using a router. Our implementation of the Router looks like,
public class OrderItemRouter {

 public String routeOrder(OrderItem orderItem) {

  String channel = "";
  if(isBook(orderItem)) {
   channel = "bookItemsChannel";
  }
  else if(isMusic(orderItem)) {
   channel = "musicItemsChannel";
  }
  else if(isSoftware(orderItem)) {
   channel = "softwareItemsChannel";
  }

  return channel;
  }

 .....................
 .....................

 }
Nothing too complicated here. For each OrderItem, the method routeOrder() will determine it's item type and return the name of the channel that this message should be sent to. The channel name is returned by the method. Spring will then ensure that the message containing the OrderItem is relayed to the named channel.

The configuration for OrderItemRouter looks like,
 <int:router input-channel="orderItemsChannel" method="routeOrder" ref="orderRouter">
 <beans:bean class="com.prodcod.service.OrderItemRouter" id="orderRouter">
</beans:bean>
</int:router>
The config identifies that the class OrderItemRouter is a Router component which will consume messages from the orderItemsChannel. Further Spring needs to invoke the method routeOrder() which contains the logic to perform the routing.

The channels for each item type are declared as follows
<int:channel id="bookItemsChannel">
  <int:queue capacity="10"></int:queue>
</int:channel>

<int:channel id="musicItemsChannel">
  <int:queue capacity="10"></int:queue>
</int:channel>

<int:channel id="softwareItemsChannel">
  <int:queue capacity="10"></int:queue>
</int:channel>
Step 4 - ServiceActivator

The next step is to calculate the discounted price for each item type and this is performed by a ServiceActivator component. This is implemented as follows
public class Shopkeeper {

 private static final BigDecimal BOOK_DISCOUNT = new BigDecimal(0.05);
 private static final BigDecimal MUSIC_DISCOUNT = new BigDecimal(0.10);
 private static final BigDecimal SOFTWARE_DISCOUNT = new BigDecimal(0.15);

 /**
  * Performs discount on books
  * @param bookOrderItem OrderItem comprising of a book item
  * @return OrderItem with discount price newly calculated
  */
 public OrderItem processBooks(OrderItem bookOrderItem){  

  final BigDecimal finalPrice = calculateDiscountedPrice(bookOrderItem, BOOK_DISCOUNT);

  bookOrderItem.setDiscountedPrice(finalPrice);

  return bookOrderItem;
 }

 /**
  * Performs discount on music
  * @param musicOrderItem OrderItem comprising of a music item
  * @return OrderItem with discount price newly calculated
  */
 public  OrderItem processMusic(OrderItem musicOrderItem){  

  final BigDecimal finalPrice = calculateDiscountedPrice(musicOrderItem, MUSIC_DISCOUNT);

  musicOrderItem.setDiscountedPrice(finalPrice);

  return musicOrderItem;
 }

 /**
  * Performs discount on software
  * @param softwareOrderItem OrderItem comprising of a book item
  * @return OrderItem with discount price newly calculated
  */
 public  OrderItem processSoftware(OrderItem softwareOrderItem){  

  final BigDecimal finalPrice = calculateDiscountedPrice(softwareOrderItem, SOFTWARE_DISCOUNT);

  softwareOrderItem.setDiscountedPrice(finalPrice);

  return softwareOrderItem;
 }
}
This class exposes 3 methods to compute the new discounted price for each item type. Each method returns the OrderItem instance with the new price. The ServiceActivator is configured as follows:
 <int:service-activator input-channel="bookItemsChannel" method="processBooks" output-channel="processedItems" ref="shopkeeper">

 <int:service-activator input-channel="musicItemsChannel" method="processMusic" output-channel="processedItems" ref="shopkeeper">

 <int:service-activator input-channel="softwareItemsChannel" method="processSoftware" output-channel="processedItems" ref="shopkeeper">

 <beans:bean class="com.prodcod.service.Shopkeeper" id="shopkeeper">

</beans:bean></int:service-activator></int:service-activator></int:service-activator>
This tells Spring that the Shopkeeper class is a ServiceActivator and will consume messages from any of the 3 channels defined in the input-channel attribute. When a message appears in one of these channels, Spring will invoke the appropriate method on the ServiceActivator class as specfied by the attribute method. Anything returned from all three methods will be placed in the processedItems channel, ready for the next step of the processing pipeline.

Step 5 - Aggregator
The final stage is to take the individual OrderItems with their newly computed discounted prices and reconstruct the Order. This is achieved using an aggregator. Our implementation of an aggregator is listed below
public class OrderCompleter {

  public Order prepareDelivery(List<orderitem> orderItems) {
         final Order order = new Order();
         order.setOrderItems(orderItems);  
         return order;
  }
}
</orderitem>
The aggregator exposes a method that takes a collection of OrderItem objects. These will come from the processedItems channel declared as
 <int:channel id="processedItems">
</int:channel>
Recall this is the output channel for the service activator class as discussed above. The aggregator is configured in the xml file as
<int:aggregator input-channel="processedItems" method="prepareDelivery" output-channel="deliveries">
 <beans:bean class="com.prodcod.service.OrderCompleter"></beans:bean>
</int:aggregator>
The configuration tells Spring that the aggregator component will consume messages from the processedItems channel. These will be processed by the method prepareDelivery on the class OrderCompleter. Any output from this class will be relayed to the channel-adaptor deliveries, which is declared as
<int-stream:stdout-channel-adapter id="deliveries">
</int-stream:stdout-channel-adapter>
The stdout-channel-adapter component writes to the systems STDOUT output stream.
Step 6 - Poller
To complete the setup we have to configure a poller component. This is required to enable the channels to work correctly.  All our channels are of a queue type and so their respective consumers need to know when to query them. This is achieved using a poller mechanism. It is configured in the following way
<int:poller default="true" fixed-delay="1000" id="poller"></int:poller>
In this case,  we have declared a global poller (as indicated by the default attribute). This will be used by the various end points to determine when they should interrogate their respective input-channels for messages. The second attribute fixed-delay is used to configure the polling interval.

Running the Application


Building and running the app shows the following output:


The logging shows that the Customer submitted an Order for 3 items, one of each type. All items cost £100 each. The Order was then split into 3 OrderItems each of which was routed to the correct processing channel based on the item type. The ServiceActivator (Shopkeeper) then calculated the discount for each item and this was set on the OrderItem instance. The OrderItems were then aggregated using the OrderCompleter class which displays the final discounted price of £270 to be paid by the Customer. Note that the messages are logged to be in different stages of the processing pipeline despite starting off in the same order.

This completes the tutorial on the Spring Integration Framework. Any comments relating to corrections, omissions, etc are welcome.

 

 

 

 

Top