Sunday, September 10, 2017

Apache ActiveMQ "Message Queue" and its implementation using Java

Hello guys, hope you are all doing well. Today we will learn more about ActiveMQ, so let's get started.

What is ActiveMQ?
Apache ActiveMQ™ is a popular and powerful open source messaging and Integration Patterns server.

Apache ActiveMQ is fast, supports many cross-language clients and protocols, and comes with easy-to-use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4. Apache ActiveMQ is released under the Apache 2.0 License.

It provides message queues, to which we can publish our messages, and message topics, to which we can attach subscribers that receive the published data.
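
To make the difference between a queue and a topic concrete, here is a small in-memory model. It is only a sketch and does not use ActiveMQ: the class QueueVsTopicSketch and its two methods are made up for illustration, and the round-robin hand-out in the queue case is a simplification of how a real broker dispatches messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class QueueVsTopicSketch {

    // Queue (point-to-point): each message goes to exactly one consumer.
    // Round-robin is an assumption made to keep the model simple.
    public static List<List<String>> deliverViaQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        Queue<String> queue = new ArrayDeque<>(messages);
        int next = 0;
        while (!queue.isEmpty()) {
            inboxes.get(next).add(queue.poll());
            next = (next + 1) % consumers;
        }
        return inboxes;
    }

    // Topic (publish/subscribe): every subscriber gets its own copy of each message.
    public static List<List<String>> deliverViaTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Creates one empty inbox per consumer or subscriber.
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        // prints: queue: [[m1, m3], [m2, m4]]
        System.out.println("queue: " + deliverViaQueue(messages, 2));
        // prints: topic: [[m1, m2, m3, m4], [m1, m2, m3, m4]]
        System.out.println("topic: " + deliverViaTopic(messages, 2));
    }
}
```

With a queue the two consumers share the work between them, while with a topic both subscribers see all four messages. The example later in this post uses a queue.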

What is the use of ActiveMQ?
ActiveMQ, where "MQ" stands for message queue, is widely used in distributed systems where one system should not have to wait for another system to complete its task. So we put ActiveMQ between them: System A completes its task, puts the data in the message queue, and starts working on its next task. System B gets the data that System A stored in the message queue and starts its own work on it. Guaranteed delivery of the data is also a key feature.
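
The hand-off between System A and System B can be sketched without a broker at all. The snippet below is only an in-process illustration built on java.util.concurrent.BlockingQueue from the JDK, not on ActiveMQ. The class name DecouplingSketch, the "result-N" strings and the DONE marker are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DecouplingSketch {

    // Hypothetical marker that tells System B no more work is coming.
    private static final String DONE = "DONE";

    public static List<String> runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // System A: puts its results on the queue and moves on without waiting for B.
        Thread systemA = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                queue.add("result-" + i);
            }
            queue.add(DONE);
        });

        // System B: picks up whatever A left on the queue, at its own pace.
        Thread systemB = new Thread(() -> {
            try {
                String item;
                while (!(item = queue.take()).equals(DONE)) {
                    processed.add("processed " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        systemA.start();
        systemB.start();
        try {
            systemA.join();
            systemB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        // prints: [processed result-1, processed result-2, processed result-3]
        System.out.println(runDemo());
    }
}
```

An in-memory queue like this only works inside one JVM and loses its contents when the process stops. A broker such as ActiveMQ plays the same role between separate processes or machines.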

So let's see the implementation of ActiveMQ using Java.

Download the ActiveMQ server from here: DOWNLOAD NOW

Maven Dependency

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.10</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>




Consumer Code
package com.vp.activemq.consumer;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.vp.activemq.producer.HelloWorldProducer;

public class HelloWorldConsumer implements Runnable, ExceptionListener {

    public void run() {
        try {
            // connect to the broker running on this machine
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616/");
            Connection con = factory.createConnection();
            // register the listener before starting so no connection error is missed
            con.setExceptionListener(this);
            con.start();
            // non-transacted session with automatic acknowledgement
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // read from the same queue the producer writes to
            Destination des = session.createQueue(HelloWorldProducer.QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(des);
            // wait up to one second; receive() returns null if nothing arrives in time
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else if (message != null) {
                System.out.println("Received: " + message);
            } else {
                System.out.println("No message received within 1 second");
            }
            consumer.close();
            session.close();
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // called by the JMS provider when it detects a problem with the connection
    public void onException(JMSException e) {
        System.out.println("some exception occurred " + e.getErrorCode());
    }
}

Producer Code
package com.vp.activemq.producer;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldProducer implements Runnable {

    public static final String QUEUE_NAME = "producer.vp";

    public void run() {
        try {
            // connect to the broker running on this machine
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616/");
            Connection con = factory.createConnection();
            con.start();
            // create session (non-transacted, automatic acknowledgement)
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // create destination
            Destination destination = session.createQueue(QUEUE_NAME);
            // create message producer
            MessageProducer producer = session.createProducer(destination);
            // NON_PERSISTENT messages are not stored on disk and can be lost if the broker stops;
            // use DeliveryMode.PERSISTENT when messages must survive a broker restart
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // create the message
            String message = "Hello World Producer Message from " + Thread.currentThread().getName() + " " + this.hashCode();
            TextMessage m = session.createTextMessage(message);
            // tell the producer to send the message
            producer.send(m);
            System.out.println("Sent Message " + this.hashCode() + " by " + Thread.currentThread().getName());
            session.close();
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Main Class
package com.vp.activemq;

import com.vp.activemq.consumer.HelloWorldConsumer;
import com.vp.activemq.producer.HelloWorldProducer;

/**
 * Starts nine producers, waits a second, then starts nine consumers.
 */
public class App {

    private static final int WORKERS = 9;

    public static void main(String[] args) throws InterruptedException {
        // each producer sends one message to the queue
        for (int i = 0; i < WORKERS; i++) {
            thread(new HelloWorldProducer(), false);
        }
        // give the producers time to send before the consumers start
        Thread.sleep(1000);
        // each consumer receives at most one message
        for (int i = 0; i < WORKERS; i++) {
            thread(new HelloWorldConsumer(), false);
        }
    }

    // runs the given task on a new thread; a daemon thread does not keep the JVM alive
    public static void thread(Runnable run, boolean daemon) {
        Thread worker = new Thread(run);
        worker.setDaemon(daemon);
        worker.start();
    }
}

You can download the full source code from my Git repository: DOWNLOAD FULL SOURCE

If you have any issue running the program, leave me a comment and we will try our best to resolve it.

Thanks for reading
-Noeik