Pages

Tuesday, June 30, 2015

Posting a JSON message to a HornetQ JMS queue on JBoss

This post shows how to post a JMS text message containing a JSON string to a HornetQ JMS queue on a JBoss server.
The example uses a clustered JMS setup on JBoss.

The test data consists of JSON text stored in *.json files.
For example, the file TC1.json contains the following text:
1
2
3
4
5
{
   "ID":"clkeidkerk948fjfmfkk49k",
   "name":"Purushottam",
   "value":"20000010"
}


Below are the steps:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.commons.io.FileUtils;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.UDPBroadcastGroupConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.jms.client.HornetQConnectionFactory;

public class PostQueue {


    public static QueueConnection connection = null;

    /**
     * Creates a new instance. The constructor does no work; all JMS setup
     * happens in {@code initJMSLister()}.
     */
    public PostQueue() {
    }

    /**
     * Connects to the HornetQ queue through a UDP discovery group, registers a
     * message listener on it, starts a thread that posts every *.json test
     * file to the same queue, and installs a shutdown hook that closes the
     * connection. Any exception raised during setup is caught and printed.
     */
    private void initJMSLister() {
 
 try {
     // Discovery-group settings; they are handed to
     // UDPBroadcastGroupConfiguration and DiscoveryGroupConfiguration
     // further down in this method.
     String discoveryGroupName = "dg-group1";
     String groupAddress = "231.7.7.7";
     int groupPort = 9876;
     // NOTE(review): -1 presumably lets the system choose the local port - confirm against the HornetQ docs.
     int localPort = -1;
     String localBindAddress = "142.133.174.76";
     // NOTE(review): both timeouts are presumably in milliseconds - confirm against the HornetQ docs.
     long refreshWaitTimeout = 1000;
     long initialWaitTimeout = 5000;
The above is the start of the code: the imports, the class declaration and the discovery-group settings.

Step2:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
     // Build the UDP broadcast configuration from the group address/port and the local bind address/port.
     UDPBroadcastGroupConfiguration udpConfig = new UDPBroadcastGroupConfiguration(groupAddress, groupPort, localBindAddress, localPort);
     System.out.println("udpConfig=\n" + udpConfig);
     // Wrap it in a named discovery group together with the two timeouts.
     DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryGroupName, refreshWaitTimeout, initialWaitTimeout,udpConfig);
  System.out.println("groupConfiguration=\n" + groupConfiguration);
     // Create an HA connection factory that uses the discovery group.
     HornetQConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration,JMSFactoryType.CF);
  System.out.println("jmsConnectionFactory=\n" + jmsConnectionFactory);
  
     String queueName = "QueueName1";
     final Queue queue = HornetQJMSClient.createQueue(queueName);
     // The connection is stored in the static field so the shutdown hook can close it.
     connection = jmsConnectionFactory.createQueueConnection("testuser", "testuser@1");
  System.out.println("connection=\n" + connection);
     // Non-transacted session with automatic acknowledgement.
     final QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
     // Register an asynchronous listener on the queue; it prints the text of
     // every TextMessage that arrives.
     QueueReceiver receiver = session.createReceiver(queue);
     receiver.setMessageListener(new MessageListener() {
         /**
          * Prints the body of each received text message.
          *
          * @param message the message delivered by the JMS provider
          */
         @Override
         public void onMessage(Message message) {
             System.out.println("Message received.");
             // Guard the cast: a non-text message on the queue would otherwise
             // throw ClassCastException out of the listener.
             if (!(message instanceof TextMessage)) {
                 System.out.println("Ignoring non-text message: " + message);
                 return;
             }
             TextMessage eventMessage = (TextMessage) message;
             try {
                 String jsonEventMessage = eventMessage.getText();
                 System.out.println("Message Id: " + jsonEventMessage);
             } catch (JMSException e) {
                 e.printStackTrace();
             }
         }
     });
     // Delivery to the listener begins once the connection is started.
     connection.start();


Step 3: This part is explained in three sections, as shown below:
Listener part:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
     System.out.println(" Event Queue Listerner Started");
     // Post every *.json file under the test-data directory to the queue from
     // a separate thread, one message per second.
     new Thread() {
         @Override
         public void run() {
             System.out.println(" Event Queue Sender Started");
             QueueSession senderSession = null;
             try {
                 // A JMS Session is single-threaded, and the listener's session is
                 // used by the provider's delivery thread, so the sender gets a
                 // session of its own instead of sharing that one.
                 senderSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                 javax.jms.QueueSender sender1 = senderSession.createSender(queue);

                 String[] ext = {"json"};
                 Iterator<File> it = FileUtils.iterateFiles(new File("/opt/jboss/testjms/testdata"), ext, true);
                 while (it.hasNext()) {
                     File test = it.next();
                     String tmp = FileUtils.readFileToString(test, "UTF-8");
                     System.out.println(test.getName() + "\n" + tmp);

                     TextMessage tm = senderSession.createTextMessage(tmp);
                     sender1.send(tm);

                     System.out.println("msg. " + test.getName() + " sent.");
                     Thread.sleep(1000);
                 }
             } catch (InterruptedException interrupted) {
                 // Restore the interrupt flag so the interruption is not lost.
                 Thread.currentThread().interrupt();
                 interrupted.printStackTrace();
             } catch (Exception e1) {
                 e1.printStackTrace();
             } finally {
                 if (senderSession != null) {
                     try {
                         senderSession.close();
                     } catch (JMSException closeFailure) {
                         closeFailure.printStackTrace();
                     }
                 }
             }
         }
     }.start();


Shutdown hook for JMS connection:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
     // Close the shared JMS connection when the JVM shuts down.
     Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
             // Nothing to release if the connection was never created.
             if (connection == null) {
                 return;
             }
             try {
                 connection.close();
                 System.out.println("connection closed");
             } catch (JMSException closeFailure) {
                 closeFailure.printStackTrace();
             }
         }
     });

 } catch (Exception ee) {
     ee.printStackTrace();
 }
    }


Main method:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * Entry point: creates a {@code PostQueue} and runs its JMS setup, printing
 * the stack trace of any exception that escapes.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    final PostQueue poster = new PostQueue();
    try {
        poster.initJMSLister();
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
}

No comments:

Post a Comment