Spring Integration with JMS and Map Transformers
in this article i explained how spring built-in transformers works for while transforming object message to map message.
sometimes the messages need to be transformed before they can be consumed to achieve a business purpose. for example, a producer uses a plain xml as its payload to produce a message, while a
consumer is interested in java object or types like plain text ,name-value pairs, or json model. spring integration provides endpoints such as service activators, channel adapters,
message bridges, gateways, transformers, filters, and routers. in this example how transformers endpoint transform object message to map message.
references:
- spring integration
- spring with jms
- spring with junit
- mockrunner
- sts
high level view
spring-mockrunner.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:c="http://www.springframework.org/schema/c" xmlns:context="http://www.springframework.org/schema/context" default-autowire="default" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemalocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <bean id="destinationmanager" class="com.mockrunner.jms.destinationmanager"/> <bean id="inboundqueue" factory-bean="destinationmanager" factory-method="createqueue"> <constructor-arg index="0" value="mockrunner-in-queue" /> </bean> <bean id="outboundqueue" factory-bean="destinationmanager" factory-method="createqueue"> <constructor-arg index="0" value="mockrunner-out-queue" /> </bean> <bean id="configurationmanager" class="com.mockrunner.jms.configurationmanager"/> <bean id="jmsqueueconnectionfactory" class="com.mockrunner.mock.jms.mockqueueconnectionfactory"> <constructor-arg index="0" ref="destinationmanager" /> <constructor-arg index="1" ref="configurationmanager" /> </bean> <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate"> <property name="connectionfactory" ref="jmsqueueconnectionfactory"/> </bean> <jms:listener-container connection-factory="jmsqueueconnectionfactory" > <jms:listener destination="mockrunner-out-queue" ref="mapmessagelistener" method="onmessage" /> </jms:listener-container> <bean id="mapmessagelistener" class="com.spijb.listener.mapmessagelistener" /> </beans>
in spring-mockrunner.xml file, i defined mockqueue, mockqueueconnectionfactory for inbound queue, and outbound queue for quick testing purpose. inboundqueue is where you will publish object message from
objecttomaptransformertest.java class. outboundqueue where this queue expecting mapmessage type object and this queue is listing mapmessagelistener.java class. for more information mockrunner works please check my previous article mockrunner with spring jms
.
pom.xml
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <groupid>org.springframework.samples</groupid> <artifactid>spring-int-jms-basic</artifactid> <version>0.0.1-snapshot</version> <properties> <!-- generic properties --> <java.version>1.6</java.version> <project.build.sourceencoding>utf-8</project.build.sourceencoding> <project.reporting.outputencoding>utf-8</project.reporting.outputencoding> <!-- spring --> <spring-framework.version>3.2.3.release</spring-framework.version> <!-- logging --> <logback.version>1.0.13</logback.version> <slf4j.version>1.7.5</slf4j.version> <!-- test --> <junit.version>4.11</junit.version> </properties> <dependencies> <!-- spring and transactions --> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-context</artifactid> <version>${spring-framework.version}</version> </dependency> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-tx</artifactid> <version>${spring-framework.version}</version> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-core</artifactid> <version>2.2.4.release</version> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-jmx</artifactid> <version>2.2.4.release</version> </dependency> <dependency> <groupid>org.springframework.integration </groupid> <artifactid>spring-integration-jms</artifactid> <version>2.2.4.release</version> </dependency> <!-- logging with slf4j & logback --> <dependency> <groupid>org.slf4j</groupid> <artifactid>slf4j-api</artifactid> <version>${slf4j.version}</version> <scope>compile</scope> </dependency> <dependency> <groupid>ch.qos.logback</groupid> <artifactid>logback-classic</artifactid> <version>${logback.version}</version> <scope>runtime</scope> </dependency> <!-- test artifacts --> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-test</artifactid> <version>${spring-framework.version}</version> <scope>test</scope> </dependency> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>${junit.version}</version> <scope>test</scope> </dependency> <dependency> <groupid>com.mockrunner</groupid> <artifactid>mockrunner-jms</artifactid> <version>1.0.3</version> </dependency> <dependency> <groupid>javax.jms</groupid> <artifactid>jms</artifactid> <version>1.1</version> </dependency> <dependency> <groupid>org.codehaus.jackson</groupid> <artifactid>jackson-mapper-asl</artifactid> <version>1.9.3</version> <scope>compile</scope> </dependency> </dependencies> </project>
spring-int-jms.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:c="http://www.springframework.org/schema/c" xmlns:int="http://www.springframework.org/schema/integration" xmlns:context="http://www.springframework.org/schema/context" default-autowire="default" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xsi:schemalocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="spring-mockrunner.xml"/> <int:poller default="true" fixed-delay="50"/> <int:channel id="inputchannel"> <int:queue capacity="5"/> </int:channel> <int:channel id="outputchannel"> <int:queue capacity="5"/> </int:channel> <int:object-to-map-transformer id="objecttomaptransformer" input-channel="inputchannel" output-channel="outputchannel"> </int:object-to-map-transformer> <int-jms:inbound-channel-adapter id="inboundjmsadapter" connection-factory="jmsqueueconnectionfactory" destination="inboundqueue" channel="inputchannel"> <int:poller fixed-rate="1000" /> </int-jms:inbound-channel-adapter> <int-jms:outbound-channel-adapter id="outboundjmsadapter" channel="outputchannel" connection-factory="jmsqueueconnectionfactory" destination="outboundqueue">
<int:poller fixed-rate="1000" /> </int-jms:outbound-channel-adapter> </beans>
the endpoint is configured to connect to a jms server, fetch the messages,and publish them onto a local channel i.e inputchannel. where as connection-factory, and destination referred mockqueueconnectionfactory, and mockqueue(inboundqueue) beans from spring-mockrunner.xml file.
- inputchannel and outputchannel defined as queue channel
- objecttomaptransformer: object-to-map-transformer element that takes the payload from the input channel original here mockrunner-in-queue object message and emits a name-value paired map object onto the output channel i.e outputchannel and outboundjmsadapter bean fetch this message and publish to queue i.e mockrunner-out-queue.
- inboundjmsadapter : inbound-channel-adapter bean is responsible for receiving messages from a jms server here it is reading from mock queue name mockrunner-in-queue see objecttomaptransformertest.java class.
- outboundjmsadapter : outbound-channel-adapter bean is responsible to fetch messages from the channel i.e outputchannel and publish them to jms queue or topic. in this outbounjmsadapter reading message outputchannel as mapmessage and publish to outboundqueue(mockrunner-out-queue).
mapmessagelistener.java
package com.spijb.listener; import javax.jms.jmsexception; import javax.jms.mapmessage; import javax.jms.session; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.jms.listener.sessionawaremessagelistener; public class mapmessagelistener implements sessionawaremessagelistener<mapmessage> { private static final logger log = loggerfactory.getlogger(mapmessagelistener.class); @override public void onmessage(mapmessage message, session session) throws jmsexception { log.info("message received \r\n"+message); } }
it is plain mapmessagelistener class to print received message from queue.
department.java
package com.spijb.domain; import java.io.serializable; public class department implements serializable{ private static final long serialversionuid = 1l; private final integer deptno; private final string name; private final string location; public department() { deptno=10; name="sales"; location="tx"; } public department(integer dno,string name,string loc) { this.deptno=dno; this.name=name; this.location=loc; } public integer getdeptno() { return deptno; } public string getname() { return name; } public string getlocation() { return location; } @override public string tostring() { return this.deptno+"-> "+this.name+"->"+this.location; } }
domain object to send as a message, by default constructor assign deptno 10 , name as sales, location as tx also provide parameter constructor.
spring junit class objecttomaptransformertest.java
package com.spijb.invoker; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.objectmessage; import javax.jms.session; import org.junit.test; import org.junit.runner.runwith; import org.springframework.beans.factory.annotation.autowired; import org.springframework.jms.core.jmstemplate; import org.springframework.jms.core.messagecreator; import org.springframework.test.context.contextconfiguration; import org.springframework.test.context.junit4.springjunit4classrunner; import com.mockrunner.mock.jms.mockqueue; import com.spijb.domain.department; @runwith(springjunit4classrunner.class) @contextconfiguration({"classpath:spring-mockrunner.xml","classpath:spring-int-jms.xml"}) public class objecttomaptransformertest { @autowired private jmstemplate jmstemplate; @autowired private mockqueue inboundqueue; @test public void shouldsendmessage() throws interruptedexception { final department defaultdepartment = new department(); jmstemplate.send(inboundqueue,new messagecreator() { @override public message createmessage(session session) throws jmsexception { objectmessage objectmessage = session.createobjectmessage(); objectmessage.setobject(defaultdepartment); return objectmessage; } }); thread.sleep(5000); } }
spring with junit class where you can send message to inputchannel i.e inboundqueue using mockrunner.
output :
info: started inboundjmsadapter oct 06, 2014 1:24:25 pm org.springframework.integration.endpoint.abstractendpoint start info: started org.springframework.integration.config.consumerendpointfactorybean#1 13:24:26.882 [org.springframework.jms.listener.defaultmessagelistenercontainer#0-1] info c.spijb.listener.mapmessagelistener - message received com.mockrunner.mock.jms.mockmapmessage: {location=tx, name=sales, deptno=10} oct 06, 2014 1:24:30 pm org.springframework.context.support.abstractapplicationcontext doclose info: closing org.springframework.context.support.genericapplicationcontext@5840979b: startup date [mon oct 06 13:24:25 cdt 2014]; root of context hierarchy oct 06, 2014 1:24:30 pm org.springframework.context.support.defaultlifecycleprocessor$lifecyclegroup stop info: stopping beans in phase 2147483647
in the above highlighted one is output as map.