Saturday, July 25, 2015

Apache Camel: Splitter Component And Aggregation EIP

Today, I'm going to explain Camel's Splitter component, including how and when to use it. I use the Spring DSL to configure the splitter here. So, let's start the camel ride.

The Splitter component is based on the Splitter EIP, which lets you split an incoming message into a number of smaller pieces and process each piece independently. It is the natural choice when a bulk message arrives as a single bundle. By default the pieces are processed one after another; setting parallelProcessing="true" on the split processes them concurrently. Each piece is set as the message body of a newly created exchange, which is a copy of the parent exchange and therefore carries its headers and properties. Given a suitable split expression, the component can handle various payload types, including XML, JSON and CSV.

The component has various options, which you configure in the Spring DSL by setting the corresponding attributes on the split element. Along with the Splitter component, I will also cover the Aggregation EIP, which is used to collect the output of the processed split messages. That is enough background on the Splitter component; you can read more on the official Apache Camel website.
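Before looking at the Camel configuration, the idea can be sketched in plain Java. This is only an illustration under stated assumptions and does not use the Camel API: the `Message` class and the `split` method are hypothetical names, and splitting on commas is assumed purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitterSketch {

    // Hypothetical stand-in for a Camel exchange: headers plus a body.
    static final class Message {
        final Map<String, Object> headers;
        final String body;

        Message(Map<String, Object> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Splits the parent body on commas. Each piece becomes the body of a new
    // message that carries its own copy of the parent's headers.
    static List<Message> split(Message parent) {
        List<Message> parts = new ArrayList<>();
        for (String piece : parent.body.split(",")) {
            parts.add(new Message(new HashMap<>(parent.headers), piece.trim()));
        }
        return parts;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("source", "orders-file"); // made-up header for the demo
        Message parent = new Message(headers, "order-1, order-2, order-3");

        // Each part can now be processed independently of the others.
        for (Message part : split(parent)) {
            System.out.println(part.headers.get("source") + " -> " + part.body);
        }
    }
}
```

Because every part gets its own copy of the headers, changing a header while processing one part does not leak into the others.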



Now, let's start programming. I assume that you have created your project using Maven.

<!-- Incoming XML -->
<Orders>
    <Order>
        <Header>...</Header>
        <LineItems>
            <LineItem>...</LineItem>
        </LineItems>
        <Footer>...</Footer>
    </Order>
    .
    .
    .
    <Order>
        <Header>...</Header>
        <LineItems>
            <LineItem>...</LineItem>
        </LineItems>
        <Footer>...</Footer>
    </Order>
</Orders>

Now, the Spring DSL for the Splitter component:

<!-- This bean collects the output of the split messages produced by the splitter -->
<bean id="aggregationBean" class="com.makhir.beans.AggregationStrategyBean" />

<camel:route id="SplitterExampleRoute">
    <camel:from uri="direct:SplitterExampleRoute" />

    <camel:log logName="SplitterExampleRoute" loggingLevel="INFO"
               message="Entered in Splitter Component : ${body}" />

    <!-- Split on each Order element and aggregate the results with aggregationBean -->
    <camel:split strategyRef="aggregationBean" stopOnException="false">
        <camel:xpath>//Orders/Order</camel:xpath>
        <camel:to uri="direct:SplittedMessageProcessingRoute" />
    </camel:split>

</camel:route>

<camel:route id="SplittedMessageProcessingRoute">
    <camel:from uri="direct:SplittedMessageProcessingRoute" />

    <camel:log logName="SplittedMessageProcessingRoute" loggingLevel="INFO"
               message="Entered in SplittedMessageProcessingRoute : ${body}" />
    <!--
        Add your business logic for validation, transformation, etc.
    -->
    <camel:to uri="direct:some-route"/>
</camel:route>
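To see what the XPath expression //Orders/Order selects, here is a small sketch that uses only the JDK's built-in XPath support, not Camel. The class name `OrderXPathSketch`, the method `orderTexts` and the sample values "A" and "B" are made up for illustration; Camel's own XPath handling may differ in details such as namespaces and result types.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class OrderXPathSketch {

    // Evaluates //Orders/Order against the given XML and returns the text
    // content of each matching Order element, in document order.
    static List<String> orderTexts(String xml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList orders = (NodeList) xpath.evaluate(
                    "//Orders/Order",
                    new InputSource(new StringReader(xml)),
                    XPathConstants.NODESET);

            List<String> texts = new ArrayList<>();
            for (int i = 0; i < orders.getLength(); i++) {
                texts.add(orders.item(i).getTextContent());
            }
            return texts;
        } catch (XPathExpressionException e) {
            throw new IllegalStateException("Could not evaluate XPath", e);
        }
    }

    public static void main(String[] args) {
        // Sample input with made-up header values.
        String xml = "<Orders>"
                + "<Order><Header>A</Header></Order>"
                + "<Order><Header>B</Header></Order>"
                + "</Orders>";

        // One entry per Order element; the splitter would create one
        // exchange for each of these matches.
        System.out.println(orderTexts(xml)); // prints [A, B]
    }
}
```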

Now, here is the aggregation class:

package com.makhir.beans;

import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3 and later the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregationStrategyBean implements AggregationStrategy {

    private static final Logger logger = LoggerFactory.getLogger(AggregationStrategyBean.class);

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        try {
            // Nothing has been aggregated yet (for example on the first call),
            // so the incoming split message becomes the aggregate.
            if (oldExchange == null || oldExchange.getIn() == null
                    || oldExchange.getIn().getBody() == null
                    || oldExchange.getIn().getBody(String.class) == null) {
                checkAndProcessExchange(newExchange);
                return newExchange;
            } else if (newExchange == null || newExchange.getIn() == null
                    || newExchange.getIn().getBody() == null
                    || newExchange.getIn().getBody(String.class) == null) {
                // Nothing new to merge; keep what has been aggregated so far.
                return oldExchange;
            }
            // Add your business logic for capturing the information from the split message (newExchange)
            // and setting it on the aggregated message (oldExchange).

        } catch (Exception e) {
            // Pass the exception itself so that the stack trace is logged.
            logger.error("There was a problem while aggregating the data", e);
        }
        return oldExchange;
    }

    // Placeholder: this method is called above but its body was not shown.
    // Put any handling of the first split message here.
    private void checkAndProcessExchange(Exchange exchange) {
        // intentionally left empty
    }
}
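The null check on oldExchange matters because, for the first split message, the splitter calls aggregate with no previous result, and each returned exchange is handed back as oldExchange on the next call. The following plain-Java sketch mimics only that calling pattern, with strings standing in for exchanges. It is not Camel code, and `aggregateAll` is a hypothetical helper name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class AggregationOrderSketch {

    // Mimics the calling pattern: the first call receives null as the "old"
    // value, and each result is passed back in as "old" for the next part.
    static <T> T aggregateAll(List<T> parts, BinaryOperator<T> strategy) {
        T aggregated = null;
        for (T part : parts) {
            aggregated = strategy.apply(aggregated, part);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        // A strategy that joins bodies with commas and handles the null
        // "old" value on the first call.
        BinaryOperator<String> joinWithComma =
                (oldBody, newBody) -> oldBody == null ? newBody : oldBody + "," + newBody;

        System.out.println(aggregateAll(Arrays.asList("A", "B", "C"), joinWithComma)); // prints A,B,C
    }
}
```

A strategy that forgets the null case would fail on the very first part, which is why the class above starts with that check.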

That's it. You can try this out by adding the above routes to your Camel context and adapting them to your requirements.

I hope you have enjoyed this camel ride.

Reference: Apache Camel
