Spring Integration with JMS and Map Transformers

in this article i explained how spring built-in transformers  works for while transforming object message to map message.
sometimes the messages need to be transformed before they can be consumed to achieve a business purpose. for example, a producer uses a plain xml as its payload to produce a message, while a
consumer is interested in java object or types like plain text ,name-value pairs, or json model. spring integration provides endpoints such as service activators, channel adapters,

 message bridges, gateways, transformers, filters, and routers. in this example how transformers endpoint transform object message to map message. 

references:




high level view


spring-mockrunner.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:c="http://www.springframework.org/schema/c"
	xmlns:context="http://www.springframework.org/schema/context"
	default-autowire="default"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemalocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">


	  <bean id="destinationmanager" class="com.mockrunner.jms.destinationmanager"/>

      <bean id="inboundqueue" factory-bean="destinationmanager" factory-method="createqueue">
      		<constructor-arg index="0" value="mockrunner-in-queue" />
      </bean>
      
      <bean id="outboundqueue" factory-bean="destinationmanager" factory-method="createqueue">
      		<constructor-arg index="0" value="mockrunner-out-queue" />
      </bean>
      
      <bean id="configurationmanager" class="com.mockrunner.jms.configurationmanager"/>
      
      <bean id="jmsqueueconnectionfactory" class="com.mockrunner.mock.jms.mockqueueconnectionfactory">
      		<constructor-arg index="0" ref="destinationmanager" />
      		<constructor-arg index="1" ref="configurationmanager" />
      </bean>
      
      <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate">
      	<property name="connectionfactory" ref="jmsqueueconnectionfactory"/>
      </bean>
     
     <jms:listener-container  connection-factory="jmsqueueconnectionfactory" >
			<jms:listener destination="mockrunner-out-queue" ref="mapmessagelistener" method="onmessage" />
	  </jms:listener-container>
	  
	  <bean id="mapmessagelistener" class="com.spijb.listener.mapmessagelistener" />
      
</beans>
in spring-mockrunner.xml file, i defined mockqueue, mockqueueconnectionfactory for inbound queue, and outbound queue for quick testing purpose. inboundqueue is where you will publish object message from
objecttomaptransformertest.java class. outboundqueue where this queue expecting mapmessage type object and this queue is listing mapmessagelistener.java class. for more information mockrunner works please check my previous article mockrunner with spring jms  . 
 pom.xml 
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelversion>4.0.0</modelversion>
  <groupid>org.springframework.samples</groupid>
  <artifactid>spring-int-jms-basic</artifactid>
  <version>0.0.1-snapshot</version>
  
  <properties>

		<!-- generic properties -->
		<java.version>1.6</java.version>
		<project.build.sourceencoding>utf-8</project.build.sourceencoding>
		<project.reporting.outputencoding>utf-8</project.reporting.outputencoding>

		<!-- spring -->
		<spring-framework.version>3.2.3.release</spring-framework.version>
		
		<!-- logging -->
		<logback.version>1.0.13</logback.version>
		<slf4j.version>1.7.5</slf4j.version>

		<!-- test -->
		<junit.version>4.11</junit.version>

	</properties>
	
	<dependencies>
		<!-- spring and transactions -->
		<dependency>
			<groupid>org.springframework</groupid>
			<artifactid>spring-context</artifactid>
			<version>${spring-framework.version}</version>
		</dependency>
		<dependency>
			<groupid>org.springframework</groupid>
			<artifactid>spring-tx</artifactid>
			<version>${spring-framework.version}</version>
		</dependency>
		<dependency>
			<groupid>org.springframework.integration</groupid>
			<artifactid>spring-integration-core</artifactid>
			<version>2.2.4.release</version>
		</dependency>

		<dependency>
			<groupid>org.springframework.integration</groupid>
			<artifactid>spring-integration-jmx</artifactid>
			<version>2.2.4.release</version>
		</dependency>
		<dependency>
			<groupid>org.springframework.integration </groupid>
			<artifactid>spring-integration-jms</artifactid>
			<version>2.2.4.release</version>
		</dependency>
		

		<!-- logging with slf4j & logback -->
		<dependency>
			<groupid>org.slf4j</groupid>
			<artifactid>slf4j-api</artifactid>
			<version>${slf4j.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupid>ch.qos.logback</groupid>
			<artifactid>logback-classic</artifactid>
			<version>${logback.version}</version>
			<scope>runtime</scope>
		</dependency>

	
		<!-- test artifacts -->
		<dependency>
			<groupid>org.springframework</groupid>
			<artifactid>spring-test</artifactid>
			<version>${spring-framework.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupid>junit</groupid>
			<artifactid>junit</artifactid>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupid>com.mockrunner</groupid>
			<artifactid>mockrunner-jms</artifactid>
			<version>1.0.3</version>
		</dependency>
		<dependency>
			<groupid>javax.jms</groupid>
			<artifactid>jms</artifactid>
			<version>1.1</version>
		</dependency>
		<dependency>
		    <groupid>org.codehaus.jackson</groupid>
		    <artifactid>jackson-mapper-asl</artifactid>
		    <version>1.9.3</version>
		    <scope>compile</scope>
		</dependency>

	</dependencies>	
</project>
 spring-int-jms.xml 
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:c="http://www.springframework.org/schema/c"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:context="http://www.springframework.org/schema/context"
	default-autowire="default"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemalocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
		
	  <import resource="spring-mockrunner.xml"/> 
      
      <int:poller default="true" fixed-delay="50"/>

      <int:channel id="inputchannel">
      	<int:queue  capacity="5"/>
      </int:channel>
      
       <int:channel id="outputchannel">
      	<int:queue  capacity="5"/>
      </int:channel>

	<int:object-to-map-transformer id="objecttomaptransformer" input-channel="inputchannel" output-channel="outputchannel">
	</int:object-to-map-transformer>      
       
	 <int-jms:inbound-channel-adapter  id="inboundjmsadapter"  connection-factory="jmsqueueconnectionfactory"  destination="inboundqueue" channel="inputchannel">
	 	<int:poller  fixed-rate="1000" />
	 </int-jms:inbound-channel-adapter>
	 
	 <int-jms:outbound-channel-adapter id="outboundjmsadapter" channel="outputchannel" connection-factory="jmsqueueconnectionfactory" destination="outboundqueue">
	 	<int:poller  fixed-rate="1000" />
	 </int-jms:outbound-channel-adapter>
	 

</beans>
 the endpoint is configured to connect to a jms server, fetch the messages,and publish them onto a local channel i.e inputchannel. where as connection-factory, and destination referred mockqueueconnectionfactory, and mockqueue(inboundqueue)  beans from spring-mockrunner.xml file. 

mapmessagelistener.java

package com.spijb.listener;

import javax.jms.jmsexception;
import javax.jms.mapmessage;
import javax.jms.session;

import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.jms.listener.sessionawaremessagelistener;

public class mapmessagelistener implements sessionawaremessagelistener<mapmessage> {

   private static final logger log = loggerfactory.getlogger(mapmessagelistener.class);
   @override
   public void onmessage(mapmessage message, session session) throws jmsexception {
      
      log.info("message received \r\n"+message);
      
   }

}

it is plain  mapmessagelistener class to print received message from queue. 

department.java

package com.spijb.domain;

import java.io.serializable;

public class department implements serializable{

   private static final long serialversionuid = 1l;
   private final integer deptno;
   private final string  name;
   private final string  location;
   
   public department()
   {
      deptno=10;
      name="sales";
      location="tx";
   }
   
   public department(integer dno,string name,string loc)
   {
      this.deptno=dno;
      this.name=name;
      this.location=loc;
   }

   public integer getdeptno() {
      return deptno;
   }

   public string getname() {
      return name;
   }

   public string getlocation() {
      return location;
   }
   @override
   public string tostring()
   {
      return this.deptno+"-> "+this.name+"->"+this.location;
   }
   
}

domain object  to send as a message, by default constructor assign deptno 10 , name as sales, location as tx also provide parameter constructor.

spring junit  class objecttomaptransformertest.java

package com.spijb.invoker;

import javax.jms.jmsexception;
import javax.jms.message;
import javax.jms.objectmessage;
import javax.jms.session;

import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.jms.core.jmstemplate;
import org.springframework.jms.core.messagecreator;
import org.springframework.test.context.contextconfiguration;
import org.springframework.test.context.junit4.springjunit4classrunner;

import com.mockrunner.mock.jms.mockqueue;
import com.spijb.domain.department;

@runwith(springjunit4classrunner.class)
@contextconfiguration({"classpath:spring-mockrunner.xml","classpath:spring-int-jms.xml"})
public class objecttomaptransformertest {
   
   @autowired
   private jmstemplate jmstemplate;
   
   @autowired
   private mockqueue inboundqueue;
   
   
   @test
   public void shouldsendmessage() throws interruptedexception
   {
     final department defaultdepartment = new department();
     jmstemplate.send(inboundqueue,new messagecreator() {
      
      @override
      public message createmessage(session session) throws jmsexception {
        
         objectmessage objectmessage = session.createobjectmessage();
         objectmessage.setobject(defaultdepartment);
         return objectmessage;
      }
   });
     thread.sleep(5000);
   }
}

spring with junit class where you can send message to inputchannel i.e inboundqueue using mockrunner.

output :

info: started inboundjmsadapter
oct 06, 2014 1:24:25 pm org.springframework.integration.endpoint.abstractendpoint start
info: started org.springframework.integration.config.consumerendpointfactorybean#1
13:24:26.882 [org.springframework.jms.listener.defaultmessagelistenercontainer#0-1] info  c.spijb.listener.mapmessagelistener - message received 
com.mockrunner.mock.jms.mockmapmessage: {location=tx, name=sales, deptno=10}
oct 06, 2014 1:24:30 pm org.springframework.context.support.abstractapplicationcontext doclose
info: closing org.springframework.context.support.genericapplicationcontext@5840979b: startup date [mon oct 06 13:24:25 cdt 2014]; root of context hierarchy
oct 06, 2014 1:24:30 pm org.springframework.context.support.defaultlifecycleprocessor$lifecyclegroup stop
info: stopping beans in phase 2147483647

in the above highlighted one is output as map.

 

 

 

 

Top