Wednesday, September 26, 2012

ServiceMix ESB: Quartz Integration


ServiceMix: Enterprise Service Bus.

Here, I am showing that how you integrate the Quartz component with Apache ServiceMix.

I assume that you some knowledge about ServiceMix and its architecture. I give you higher over view of ServiceMix.

Apache ServiceMix is a runtime container for service-oriented architecture components, web services or legacy system connectivity services. It is one of the most mature, open-source implementations of an enterprise service bus and an Apache top-level project.

For more, you can visit on http://servicemix.apache.org









I am not going to in detail of ServiceMix and its component and its working style. You can easily refer from various blogs and websites.

Let’s start implementing the Quartz component your existing running application which uses Apache ServiceMix.

First download the quartz jar file from its official site. Here i have used quartz-1.5.2.jar and add this jar to your lib folder.

Please, add below entry into say servicemix.xml [e.g. config file where you have specified all your component in spring’s bean like passion],

--------------------------------------------
<sm:activationSpec componentName="QuartzSchedular" service="foo:QuartzSchedular"  destinationService="foo:extremeEmailNotifier">
<sm:component>
<bean class="com.makhir.ExtremeQaurtzSchedular" id="QuartzSchedular">
<property name="marshaler">
<bean class="org.apache.servicemix.components.quartz.DefaultQuartzMarshaler" />
</property>
<property name="triggers">
<map>
<entry>
<key>
<bean class="org.quartz.SimpleTrigger">
<property name="repeatInterval" value="30000" /><!--Time in which your component will start its execution-->
<property name="repeatCount" value="-1" />
</bean>
</key>
<bean class="com.makhir.quartz.ExtremeQuartzJob">
<property name="name" value="makhirJob" />
<property name="group" value="ServiceMix" />
</bean>
</entry>
</map>
</property>
</bean>
</sm:component>
</sm:activationSpec>
----------------------------------------------------


Now, it’s time create a full component which is our primary goal.
Below is sample code which you needs to make your component into a working condition,
-------------
public class ExtremeQuartzSchedular extends QuartzComponent{
private final Log log = LogFactory.getLog(ExtremeQuartzSchedular.class);
private Map triggers;
private SchedulerFactory factory;
private Scheduler scheduler;
private WorkManager workManager = null;
@Override
public void setScheduler(Scheduler scheduler) {
   this.scheduler = scheduler;
}*/
@Override
public void setMarshaler(QuartzMarshaler marshaler) {
super.setMarshaler(marshaler);
}
@SuppressWarnings("rawtypes")
@Override
public void onJobExecute(JobExecutionContext context) throws JobExecutionException {
log.debug("Enter : onJobExecute");
try {
ComponentContextImpl contextImpl = (ComponentContextImpl) getContext();
workManager = contextImpl.getWorkManager();
File pollDirectory = new File("C:\\makhir\\workspace\\XJavaTest\\xmls");
File[] files = pollDirectory.listFiles();
for(int index = 0; index < files.length; index ++){
initializeJob(files[index]);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*public Map getTriggers(){
return this.triggers;
}
public void setTriggers(Map triggers){
this.triggers = triggers;
}*/
protected void initializeJob(final File processFile){
log.debug("Enter : initializeJob");
log.debug("Proccessed file name : " + processFile.getName());
try {
workManager.scheduleWork(new Work() {
@Override
public void run() {
try {
doJob(processFile);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void release() {
}
});
// this code is for handling the work to be handled via workmanager with additional feature listener
/*workManager.scheduleWork(new Work() {
@Override
public void run() {
try {
doJob(processFile);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void release() {
}
}, 1000, new ExecutionContext(), new ExtremeWorkListner());*/ 
// 1000(1 sec) miliseconds within this time work must be started otherwise work will be rejected with rejected exception
} catch (WorkException e) {
log.error("WorkException :  Error while initialzing the job, " + e.getMessage());
e.printStackTrace();
} catch (Exception e) {
log.error("Exception :  Error while initialzing the job, " + e.getMessage());
}
log.debug("Exit : initializeJob");
}
protected void doJob(File file) throws IOException{
log.debug("Enter : doJob");
InputStream inputStream = null;
try {
inputStream = new BufferedInputStream(new FileInputStream(file));
MessageExchangeFactory exchanageFactory = getExchangeFactory();
InOut outMessage = exchanageFactory.createInOutExchange();
NormalizedMessage message = outMessage.createMessage();
message.setProperty("toEmailid", new String("abc@xyz.com"));
message.setProperty("fromEmailid", new String("notifications@xyz.com"));
message.setProperty("emailSubject", new String("How are you?"));
message.setProperty("bodyContent", convertStreamtoString(inputStream)));
outMessage.setInMessage(message);
send(outMessage);
} catch (MessagingException e) {
log.error("MessagingException : There was a problem exchanging the message");
e.printStackTrace();
} catch (JBIException e) {
log.error("JBIException : There was a problem exchanging the message");
e.printStackTrace();
}catch (Exception e) {
e.printStackTrace(); 
}
finally{
try {
if (inputStream != null)
inputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
log.debug("Exit : doJob");
}
@SuppressWarnings("unused")
private String convertStreamtoString(InputStream stream) throws IOException{
log.debug("Enter : convertStreamtoString");
StringBuffer buffer = new StringBuffer();
   byte[] b = new byte[4096];
   for (int n; (n = stream.read(b)) != -1;) {
     buffer.append(new String(b, 0, n));
   }
   String messageContent = buffer.toString();
   log.debug("Converted input stream to string : " + messageContent);
   log.debug("Exit : convertStreamtoString");
return messageContent;
}
}



--------------------------

Now I will describe you how this component works.

After doing all this stuffs, when you start your ServiceMix integrated server, the ExtremeQuartzSchedular executes with all the initialization done by ServiceMix itself.

Here, I have use this component for looking some directory from where files will be picked up and email their content to someone.

The NormalizedMessage will be sent to the destinationService as defined in config file.

Here, this message will send to extremeEmailNotifier.

That's it.

I will explain later on about extremeEmailNotifier which is basically for sending email.

Enjoy with ServiceMix.

Feedback and questions are welcome.

Regards,
Ashish Mishra (makhir)