Sunday, July 26, 2015

Apache Camel: Integration with JMS and ActiveMQ

Today I will explain how to work with JMS (Java Message Service) in your SOA/ESB. JMS is a widely used API for exchanging data between different systems. I will not go into the depths of JMS here, only the basics. To use JMS you need a middleware message broker, which holds messages until they are delivered. There are two main ways of exchanging data: 1) Point-to-Point (Queue), where each message is delivered to a single consumer, and 2) Publish-Subscribe (Topic), where each message is delivered to every subscriber. You can use either of them, depending on your business requirement.

Point-to-Point

Publish-Subscribe

Reference: JavaTPoint. You can read more about JMS in this reference.
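
The difference between the two models can be sketched in plain Java. This is only a toy in-memory model, not the JMS API: the class and method names are made up for illustration, and the round-robin choice of queue consumer is an assumption (a real broker decides how to dispatch).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two JMS messaging styles; not the JMS API. */
final class MessagingModels {

    /** A named receiver that records every message it was given. */
    static final class Receiver {
        final String name;
        final List<String> received = new ArrayList<>();

        Receiver(String name) {
            this.name = name;
        }
    }

    /** Point-to-point: each message goes to exactly one consumer (round-robin here). */
    static void sendToQueue(List<Receiver> consumers, List<String> messages) {
        for (int i = 0; i < messages.size(); i++) {
            consumers.get(i % consumers.size()).received.add(messages.get(i));
        }
    }

    /** Publish-subscribe: every subscriber gets its own copy of each message. */
    static void publishToTopic(List<Receiver> subscribers, List<String> messages) {
        for (String message : messages) {
            for (Receiver subscriber : subscribers) {
                subscriber.received.add(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4");

        List<Receiver> queueConsumers = List.of(new Receiver("A"), new Receiver("B"));
        sendToQueue(queueConsumers, messages);
        for (Receiver r : queueConsumers) {
            System.out.println("queue " + r.name + " " + r.received);
        }
        // queue A [m1, m3]
        // queue B [m2, m4]

        List<Receiver> topicSubscribers = List.of(new Receiver("A"), new Receiver("B"));
        publishToTopic(topicSubscribers, messages);
        for (Receiver r : topicSubscribers) {
            System.out.println("topic " + r.name + " " + r.received);
        }
        // topic A [m1, m2, m3, m4]
        // topic B [m1, m2, m3, m4]
    }
}
```

With a queue the two consumers share the work, while with a topic both subscribers see every message.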

JMS Flow

Now let me cover the basics of the middleware message broker in SOA/ESB-based applications. A message broker is a tool placed between producer and consumer to exchange data using a formal messaging protocol. The image below, which I found on the Red Hat portal, gives a clear picture. There are many message broker providers; two notable and popular brokers are ActiveMQ and RabbitMQ. Here I have used ActiveMQ as the middleware message broker. You can read more about ActiveMQ in its official documentation.

Reference: RedHat
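
The store-and-forward role of a broker can be sketched as follows. This is a deliberately tiny in-memory model with a hypothetical ToyBroker class; a real broker such as ActiveMQ adds persistence, transactions, security and networking on top of this idea.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy store-and-forward broker: producer and consumer never talk to each other directly. */
final class ToyBroker {
    private final Map<String, Deque<String>> destinations = new HashMap<>();

    /** Producer side: store the message under the destination name. */
    void send(String destination, String message) {
        destinations.computeIfAbsent(destination, name -> new ArrayDeque<>()).addLast(message);
    }

    /** Consumer side: take the oldest stored message, or null if none is waiting. */
    String receive(String destination) {
        Deque<String> pending = destinations.get(destination);
        return (pending == null) ? null : pending.pollFirst();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // The producer sends while no consumer is listening yet.
        broker.send("test-queue", "order-1");
        broker.send("test-queue", "order-2");

        // The consumer arrives later and receives the messages in the order they were sent.
        System.out.println(broker.receive("test-queue")); // order-1
        System.out.println(broker.receive("test-queue")); // order-2
        System.out.println(broker.receive("test-queue")); // null
    }
}
```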
Now let's integrate JMS and ActiveMQ using Apache Camel. We will go step by step through configuring JMS and ActiveMQ with Apache Camel in JBoss Fuse ESB. I assume you have already set up everything needed to build and run Camel-based apps in JBoss Fuse ESB.

Step 1: Add the JMS and ActiveMQ Maven dependencies to your project.
Add the lines below to your project's pom.xml.

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jms</artifactId>
    <version>x.x.x</version>
    <!-- Your Camel version -->
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>x.x.x</version>
    <!-- Check your Apache Camel version and pick the matching ActiveMQ version -->
</dependency>

After adding these, clean and build your project so that Maven downloads the dependent libraries into your local repository (the .m2 directory under your user home).

Step 2: Create activemq.xml
Create an activemq.xml file that defines how ActiveMQ stores the messages it receives: the broker name, the default storage directory, the persistence adapter, memory usage limits, and so on. Place this file on the server classpath or in the conf directory where the server looks up its configuration.

For JBoss Fuse ESB, place activemq.xml in the etc folder of your server. Verify that it is referenced in the config file named activemq-karaf-{activemq-karaf-version}.redhat-{fuse-version}-activemq.cfg, which resides in
\\server-base-dir\system\org\apache\activemq\activemq-karaf\{activemq-karaf-version}.redhat-{fuse-version}
This file contains the ActiveMQ-related configuration, for example:

broker-name=amq-broker
data=${karaf.data}/${broker-name}
config=${karaf.base}/etc/activemq.xml
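
To make the ${...} placeholders in these entries concrete, here is a toy resolver in plain Java. It is only an illustration of the substitution idea, not the container's own mechanism, and the /opt/fuse paths are hypothetical values chosen for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy ${name} substitution, to show what the config values expand to. */
final class PlaceholderDemo {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${name} with its value from props; unknown names are left untouched. */
    static String resolve(String value, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String replacement = props.getOrDefault(matcher.group(1), matcher.group());
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("karaf.base", "/opt/fuse");       // hypothetical install directory
        props.put("karaf.data", "/opt/fuse/data");  // hypothetical data directory
        props.put("broker-name", "amq-broker");     // value from the config above

        System.out.println(resolve("${karaf.data}/${broker-name}", props));
        // /opt/fuse/data/amq-broker
        System.out.println(resolve("${karaf.base}/etc/activemq.xml", props));
        // /opt/fuse/etc/activemq.xml
    }
}
```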

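Now let's add the ActiveMQ-related beans and the routes that produce messages to and consume messages from the ActiveMQ message broker. I will also show how to send to and receive from both a queue and a topic.

Below are the beans that manage the connections used to exchange messages between Camel and ActiveMQ. The ActiveMQComponent class ships in the activemq-camel module, so that module must also be available if your container does not already provide it.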

<bean id="activeMQConFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="userName" value="userName_activemq" />
    <property name="password" value="password_activemq" />
</bean>

<bean id="pooledConFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
    <property name="maxConnections" value="5" />
    <property name="connectionFactory" ref="activeMQConFactory" />
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConFactory" />
    <property name="concurrentConsumers" value="5" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

JMS Queue: Produce and consume messages using a JMS queue. Add the routes below to your Camel context file.

<!-- Route which sends a message to the JMS queue named test-queue -->

<camel:route id="MessageSenderToJMSQueue">
    <camel:from uri="direct:SomeTestRoute" />

    <camel:log logName="MessageSenderToJMSQueue" loggingLevel="INFO"
               message="Entered in MessageSenderToJMSQueue : ${body}" />

    <!-- Drop all headers so that only the body is sent to the broker -->
    <camel:removeHeaders pattern="*" />

    <camel:to uri="activemq:queue:test-queue" />
</camel:route>

<!-- Route which receives a message from the JMS queue named test-queue -->

<camel:route id="MessageReceiverFromJMSQueue">
    <camel:from uri="activemq:queue:test-queue" />

    <camel:log logName="MessageReceiverFromJMSQueue" loggingLevel="INFO"
               message="Entered in MessageReceiverFromJMSQueue : ${body}" />

    <!-- Placeholder endpoint name: point this at your own processing route.
         Do not reuse direct:SomeTestRoute here, because that is the sender
         route's entry point and the message would loop back into test-queue. -->
    <camel:to uri="direct:ProcessQueueMessage" />
</camel:route>
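
The sender route calls removeHeaders with pattern="*" before handing the message to ActiveMQ. Camel copies exchange headers onto the outgoing JMS message where it can, so headers left over from earlier steps would otherwise travel with the message. The effect of the pattern can be sketched in plain Java as below. This is a simplified model with hypothetical names: it handles only exact names and a trailing * wildcard, whereas Camel's own matching supports more forms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of removing headers by name pattern; not Camel's implementation. */
final class HeaderFilterDemo {

    /** True if name equals the pattern, or starts with its prefix when the pattern ends with '*'. */
    static boolean matches(String name, String pattern) {
        if (pattern.endsWith("*")) {
            return name.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        return name.equals(pattern);
    }

    /** Returns a copy of the headers without the entries whose name matches the pattern. */
    static Map<String, Object> removeHeaders(Map<String, Object> headers, String pattern) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (!matches(entry.getKey(), pattern)) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelFileName", "orders.xml"); // sample headers, made up for the example
        headers.put("CamelHttpMethod", "POST");
        headers.put("orderId", 42);

        System.out.println(removeHeaders(headers, "Camel*")); // {orderId=42}
        System.out.println(removeHeaders(headers, "*"));      // {}
    }
}
```

A narrower pattern than "*" is worth considering when the consumer needs some of your own headers, such as the sample orderId above.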

JMS Topic: Produce and consume messages using a JMS topic. Add the routes below to your Camel context file.

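<!-- Route which sends a message to the JMS topic named test-topic -->

<camel:route id="MessageSenderToJMSTopic">
    <!-- A direct endpoint can have only one consumer per Camel context, so give
         this route its own direct endpoint if another route in the same context
         already consumes from direct:SomeTestRoute. -->
    <camel:from uri="direct:SomeTestRoute" />

    <camel:log logName="MessageSenderToJMSTopic" loggingLevel="INFO"
               message="Entered in MessageSenderToJMSTopic : ${body}" />

    <camel:removeHeaders pattern="*" />

    <!-- Must be "topic", not "queue", so that the subscriber below receives it -->
    <camel:to uri="activemq:topic:test-topic" />
</camel:route>

<!-- Route which receives a message from the JMS topic named test-topic -->

<camel:route id="MessageReceiverFromJMSTopic">
    <camel:from uri="activemq:topic:test-topic" />

    <camel:log logName="MessageReceiverFromJMSTopic" loggingLevel="INFO"
               message="Entered in MessageReceiverFromJMSTopic : ${body}" />

    <!-- Placeholder endpoint name: point this at your own processing route.
         Do not reuse direct:SomeTestRoute here, because that is the sender
         route's entry point and the message would loop back into test-topic. -->
    <camel:to uri="direct:ProcessTopicMessage" />
</camel:route>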

That's it. Now build your application and run it on the Fuse server. Your app is ready to produce and consume messages from ActiveMQ using the JMS API.

I hope you have enjoyed this camel ride. Please don't forget to give your feedback and suggestions to improve this article and, of course, my blog writing.

Thanks a lot for taking the time to read this blog.

Saturday, July 25, 2015

EIP : Enterprise Integration Patterns


EIPs categorized by function.

Reference : EIP

Apache Camel: Splitter Component And Aggregation EIP

Today I'm going to explain Camel's Splitter component, including how and when to use it. Here I use the Spring DSL to configure the splitter. So, let's start the camel ride.

The Splitter component is based on the Splitter EIP, which lets you split an incoming message into a number of smaller pieces and process each of them independently. Use it when bulk messages arrive as a single bundle. By default the pieces are processed one after another; setting the parallelProcessing option on the split processes them concurrently. Each piece becomes the body of a newly created exchange, which carries over the headers and properties of the parent exchange. The component supports various data formats, including XML, JSON and CSV.

Various options are available for this component; you configure them in the Spring DSL by setting the respective attributes. Along with the Splitter component I will also explain the Aggregation EIP, which is used to collect the output of the processed split messages. You can read more on the official Apache Camel website.



Now let's start programming. I assume that you have created your project using Maven.

<!-- Incoming XML -->
<Orders>
    <Order>
        <Header>...</Header>
        <LineItems>
            <LineItem>...</LineItem>
        </LineItems>
        <Footer>...</Footer>
    </Order>
    .
    .
    .
    <Order>
        <Header>...</Header>
        <LineItems>
            <LineItem>...</LineItem>
        </LineItems>
        <Footer>...</Footer>
    </Order>
</Orders>

Now, the Spring DSL for the Splitter component:

<!-- This bean collects the output of the split messages produced by the splitter -->
<bean id="aggregationBean" class="com.makhir.beans.AggregationStrategyBean" />

<camel:route id="SplitterExampleRoute">
    <camel:from uri="direct:SplitterExampleRoute" />

    <camel:log logName="SplitterExampleRoute" loggingLevel="INFO"
               message="Entered in Splitter Component : ${body}" />

    <camel:split strategyRef="aggregationBean" stopOnException="false">
        <camel:xpath>//Orders/Order</camel:xpath>
        <camel:to uri="direct:SplittedMessageProcessingRoute" />
    </camel:split>

</camel:route>

<camel:route id="SplittedMessageProcessingRoute">
    <camel:from uri="direct:SplittedMessageProcessingRoute" />

    <camel:log logName="SplittedMessageProcessingRoute" loggingLevel="INFO"
               message="Entered in SplittedMessageProcessingRoute : ${body}" />
    <!--
        Add your business logic for validation, transformation, etc.
    -->
    <camel:to uri="direct:some-route"/>
</camel:route>
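
To check what the XPath expression //Orders/Order selects, independently of Camel, you can evaluate it with the JDK's own XPath API. The sketch below is not how Camel runs the split internally; it only shows that the expression yields one node per Order element. The element values (H1, A, F1 and so on) are sample data made up for the example.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Evaluates the splitter's XPath expression with the JDK XPath API. */
final class XPathSplitDemo {

    /** Returns the nodes selected by the expression in the given XML text. */
    static NodeList select(String xml, String expression) throws Exception {
        XPath xpath = XPathFactory.newInstance().newXPath();
        return (NodeList) xpath.evaluate(
                expression, new InputSource(new StringReader(xml)), XPathConstants.NODESET);
    }

    public static void main(String[] args) throws Exception {
        // Sample document with two orders; the values are invented for illustration.
        String xml = "<Orders>"
                + "<Order><Header>H1</Header><LineItems><LineItem>A</LineItem></LineItems><Footer>F1</Footer></Order>"
                + "<Order><Header>H2</Header><LineItems><LineItem>B</LineItem></LineItems><Footer>F2</Footer></Order>"
                + "</Orders>";

        NodeList orders = select(xml, "//Orders/Order");
        System.out.println(orders.getLength()); // 2

        // Each selected node corresponds to one split message.
        for (int i = 0; i < orders.getLength(); i++) {
            System.out.println(orders.item(i).getFirstChild().getTextContent());
        }
        // H1
        // H2
    }
}
```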

Now, here is the aggregation class:

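package com.makhir.beans;

import org.apache.camel.Exchange;
// Camel 2.x package; from Camel 3 onwards the interface is org.apache.camel.AggregationStrategy.
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregationStrategyBean implements AggregationStrategy {

    private static final Logger logger = LoggerFactory.getLogger(AggregationStrategyBean.class);

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        try {
            // First split message (or no usable aggregate yet): the new exchange becomes the aggregate.
            if (!hasBody(oldExchange)) {
                checkAndProcessExchange(newExchange);
                return newExchange;
            }
            // Nothing usable in the new split message: keep what has been collected so far.
            if (!hasBody(newExchange)) {
                return oldExchange;
            }
            // Add your business logic for capturing the information from the split message (newExchange)
            // and setting it into the parent message (oldExchange).

        } catch (Exception e) {
            // Pass the exception itself so that the stack trace is logged as well.
            logger.error("There was a problem while aggregating the data", e);
        }
        return oldExchange;
    }

    /** True if the exchange exists and its body can be read as a non-null String. */
    private static boolean hasBody(Exchange exchange) {
        return exchange != null && exchange.getIn() != null
                && exchange.getIn().getBody(String.class) != null;
    }

    /**
     * Placeholder: the original listing calls this method without showing it.
     * Put any validation or initialisation of the first split message here.
     */
    private void checkAndProcessExchange(Exchange exchange) {
        // intentionally empty
    }
}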
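
The way the splitter drives such a strategy can be modelled in plain Java as a fold: the strategy is called once per split message, and on the first call there is no previous result, which is why the class above checks for a null oldExchange. The sketch below uses plain strings instead of Camel exchanges, and its class and variable names are hypothetical.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java model of how an aggregation strategy is applied to split messages. */
final class AggregationFoldDemo {

    /** Calls the strategy once per part: previous result first, new part second. */
    static String aggregateAll(List<String> parts, BinaryOperator<String> strategy) {
        String result = null; // no previous result exists before the first part
        for (String part : parts) {
            result = strategy.apply(result, part);
        }
        return result;
    }

    public static void main(String[] args) {
        // Keep the first part as-is, then append each later part after a comma.
        BinaryOperator<String> joinWithComma =
                (oldBody, newBody) -> (oldBody == null) ? newBody : oldBody + "," + newBody;

        System.out.println(aggregateAll(List.of("order-1", "order-2", "order-3"), joinWithComma));
        // order-1,order-2,order-3
    }
}
```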

That's it. You can try this out by adding the above routes to your Camel context and adapting them to your requirements.

I hope you have enjoyed this camel ride.

Reference: Apache Camel