Thursday, July 12, 2012

JMS with ActiveMQ

JMS, short for Java Message Service, provides a mechanism for integrating applications in a loosely coupled, flexible manner. JMS delivers data asynchronously across applications on a store-and-forward basis. Applications do not communicate with each other directly; instead they communicate through MOM (Message Oriented Middleware), which acts as an intermediary.

JMS Architecture

Main components of JMS are:
  • JMS Provider: A messaging system that implements the JMS interfaces and provides administrative and control features
  • Clients: Java applications that send or receive JMS messages. A message sender is called the Producer, and the recipient is called a Consumer
  • Messages: Objects that communicate information between JMS clients
  • Administered objects: Preconfigured JMS objects created by an administrator for the use of clients.
There are several JMS providers available, such as Apache ActiveMQ and OpenMQ. I have used Apache ActiveMQ here.

Installing and starting Apache ActiveMQ on Windows
  1. Download the ActiveMQ Windows binary distribution
  2. Extract it to a desired location
  3. Using the command prompt, change the directory to the bin folder inside the ActiveMQ installation folder and run the following command to start ActiveMQ:
     activemq
After starting ActiveMQ you can visit the admin console at http://localhost:8161/admin/ and perform administrative tasks.

JMS Messaging Models

JMS has two messaging models: the point to point messaging model and the publisher subscriber messaging model.

Point to point messaging model

The producer sends a message to a specified queue within the JMS provider, and only one of the consumers listening to that queue receives that message.
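To make the "only one consumer receives each message" rule concrete, here is a minimal sketch that uses a plain in-memory queue from the JDK instead of a real JMS provider. The class name PointToPointSketch and the round-robin order between the two consumers are my own assumptions for illustration; a real broker decides for itself which consumer gets the next message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a JMS queue; this is NOT the JMS or ActiveMQ API.
final class PointToPointSketch {

    // Sends three messages, then lets two consumers take turns receiving.
    // Returns a log of "consumer <- message" entries.
    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The producer can send before any consumer exists; the queue
        // stores the messages until someone receives them.
        queue.add("m1");
        queue.add("m2");
        queue.add("m3");

        List<String> log = new ArrayList<>();
        String[] consumers = {"consumerA", "consumerB"};
        int turn = 0;
        String message;
        // poll() removes the head of the queue, so once one consumer has
        // received a message no other consumer can see it.
        while ((message = queue.poll()) != null) {
            // Round-robin between consumers is an assumption of this sketch.
            log.add(consumers[turn % consumers.length] + " <- " + message);
            turn++;
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each message shows up exactly once in the log, no matter how many consumers are listening.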

Image courtesy Oracle
Point to Point Model Example

Example 1 and Example 2 are almost identical. The only difference is that Example 1 creates the connection factory and the queue within the program, while Example 2 uses a jndi.properties file for naming lookups and creating queues.

Example 1
package com.eviac.blog.jms;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;

public class Producer {
 // URL of the JMS server
 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

 // Name of the queue we will send messages to. It must match the
 // queue name used by the Consumer
 private static String subject = "MYQUEUE";

 public static void main(String[] args) throws JMSException {
  BasicConfigurator.configure();

  // Getting JMS connection from the server and starting it
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
  Connection connection = connectionFactory.createConnection();
  try {
   connection.start();

   // JMS messages are sent and received using a Session. We will
   // create here a non-transactional session object. If you want
   // to use transactions you should set the first parameter to 'true'
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);

   // Creating the queue within the program (no JNDI lookup needed)
   Destination destination = session.createQueue(subject);

   // MessageProducer is used for sending messages (as opposed
   // to MessageConsumer which is used for receiving them)
   MessageProducer producer = session.createProducer(destination);

   // We will send a small text message saying 'Hello World!'
   TextMessage message = session.createTextMessage("Hello World!");

   // Here we are sending the message!
   producer.send(message);
   System.out.println("Sent message '" + message.getText() + "'");
  } finally {
   connection.close();
  }
 }
}
package com.eviac.blog.jms;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;

public class Consumer {
 // URL of the JMS server
 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

 // Name of the queue we will receive messages from
 private static String subject = "MYQUEUE";

 public static void main(String[] args) throws JMSException {
  BasicConfigurator.configure();
  // Getting JMS connection from the server
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
  Connection connection = connectionFactory.createConnection();
  connection.start();

  // Creating session for receiving messages
  Session session = connection.createSession(false,
    Session.AUTO_ACKNOWLEDGE);

  // Getting the queue
  Destination destination = session.createQueue(subject);

  // MessageConsumer is used for receiving (consuming) messages
  MessageConsumer consumer = session.createConsumer(destination);

  // Here we receive the message.
  // By default this call is blocking, which means it will wait
  // for a message to arrive on the queue.
  Message message = consumer.receive();

  // There are many types of Message and TextMessage
  // is just one of them. Producer sent us a TextMessage
  // so we must cast to it to get access to its .getText()
  // method.
  if (message instanceof TextMessage) {
   TextMessage textMessage = (TextMessage) message;
   System.out.println("Received message '" + textMessage.getText()
     + "'");
  }
  connection.close();
 }
}
Example 2

jndi.properties
# START SNIPPET: jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = vm://localhost

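# use the following property to specify the JNDI name the connection factory
# should appear as. The examples look up "connectionFactory" and
# "topicConnectionFactry", so this line must not be commented out.
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry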

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = example.MyQueue


# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = example.MyTopic

# END SNIPPET: jndi
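The queue.[jndiName] = [physicalName] convention is easier to see with a tiny program. The following is only a sketch of how such entries can be read with java.util.Properties; it is not how ActiveMQ implements its JNDI context, and the class and method names (JndiNamesSketch, destinations, load) are made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustration only: maps JNDI names to physical destination names.
final class JndiNamesSketch {

    // Collects entries of the form "<prefix><jndiName> = <physicalName>".
    static Map<String, String> destinations(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    // Parses properties text the same way a .properties file is parsed.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(
            "queue.MyQueue = example.MyQueue\n"
            + "topic.MyTopic = example.MyTopic\n");
        System.out.println("queues: " + destinations(props, "queue."));
        System.out.println("topics: " + destinations(props, "topic."));
    }
}
```

So a lookup of "MyQueue" in the Java code refers to the physical queue example.MyQueue on the broker.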
package com.eviac.blog.jms;

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.BasicConfigurator;

public class Producer {

 public Producer() throws JMSException, NamingException {

  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();

  // Look up a JMS connection factory
  ConnectionFactory conFactory = (ConnectionFactory) jndi
    .lookup("connectionFactory");
  Connection connection;

  // Getting JMS connection from the server and starting it
  connection = conFactory.createConnection();
  try {
   connection.start();

   // JMS messages are sent and received using a Session. We will
   // create here a non-transactional session object. If you want
   // to use transactions you should set the first parameter to 'true'
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);

   Destination destination = (Destination) jndi.lookup("MyQueue");

   // MessageProducer is used for sending messages (as opposed
   // to MessageConsumer which is used for receiving them)
   MessageProducer producer = session.createProducer(destination);

   // We will send a small text message saying 'Hello World!'
   TextMessage message = session.createTextMessage("Hello World!");

   // Here we are sending the message!
   producer.send(message);
   System.out.println("Sent message '" + message.getText() + "'");
  } finally {
   connection.close();
  }
 }

 public static void main(String[] args) throws JMSException {
  try {
   BasicConfigurator.configure();
   new Producer();
  } catch (NamingException e) {
   e.printStackTrace();
  }

 }
}
package com.eviac.blog.jms;

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.BasicConfigurator;

public class Consumer {
 public Consumer() throws NamingException, JMSException {
  Connection connection;
  
  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();

  // Look up a JMS connection factory
  ConnectionFactory conFactory = (ConnectionFactory) jndi
    .lookup("connectionFactory");
  // Getting JMS connection from the server
  connection = conFactory.createConnection();
  try {
   connection.start();

   // Creating session for receiving messages
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);

   // Getting the queue
   Destination destination = (Destination) jndi.lookup("MyQueue");

   // MessageConsumer is used for receiving (consuming) messages
   MessageConsumer consumer = session.createConsumer(destination);

   // Here we receive the message.
   // By default this call is blocking, which means it will wait
   // for a message to arrive on the queue.
   Message message = consumer.receive();

   // There are many types of Message and TextMessage
   // is just one of them. Producer sent us a TextMessage
   // so we must cast to it to get access to its .getText()
   // method.
   if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    System.out.println("Received message '" + textMessage.getText()
      + "'");
   }
  } finally {
   connection.close();
  }
 }

 public static void main(String[] args) throws JMSException {
  BasicConfigurator.configure();
  try {
   new Consumer();
  } catch (NamingException e) {
   e.printStackTrace();
  }

 }
}

Publisher Subscriber Model

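The publisher publishes a message to a specified topic within the JMS provider, and all the subscribers subscribed to that topic receive the message. Note that with ordinary (non-durable) subscriptions, as used in the example here, only the subscribers that are active when the message is published receive it.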
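The fan-out behaviour, and the fact that a late subscriber misses earlier messages, can be sketched without a broker. This is a plain in-memory illustration, not the JMS API; TopicSketch and its Listener interface are hypothetical names that loosely mirror a JMS topic and javax.jms.MessageListener.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory stand-in for a JMS topic; this is NOT the JMS or ActiveMQ API.
final class TopicSketch {

    // Loosely mirrors javax.jms.MessageListener, but for plain strings.
    interface Listener {
        void onMessage(String text);
    }

    private final List<Listener> subscribers = new ArrayList<>();

    void subscribe(Listener listener) {
        subscribers.add(listener);
    }

    // Every currently registered subscriber gets its own copy.
    void publish(String text) {
        for (Listener subscriber : subscribers) {
            subscriber.onMessage(text);
        }
    }

    // Returns what the early and the late subscriber received, in that order.
    static List<List<String>> run() {
        TopicSketch topic = new TopicSketch();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        topic.subscribe(early::add);
        topic.publish("first");      // only the early subscriber is active
        topic.subscribe(late::add);
        topic.publish("second");     // both subscribers are active
        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        List<List<String>> received = run();
        System.out.println("early subscriber: " + received.get(0));
        System.out.println("late subscriber:  " + received.get(1));
    }
}
```

The early subscriber sees both messages, while the late one only sees the message published after it subscribed.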

Image courtesy Oracle
Publisher Subscriber Model Example
package com.eviac.blog.jms;

import javax.jms.*;
import javax.naming.*;

import org.apache.log4j.BasicConfigurator;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class DemoPublisherSubscriberModel implements javax.jms.MessageListener {
 private TopicSession pubSession;
 private TopicPublisher publisher;
 private TopicConnection connection;

 /* Establish JMS publisher and subscriber */
 public DemoPublisherSubscriberModel(String topicName, String username,
   String password) throws Exception {
  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();

  // Look up a JMS connection factory
  TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi
    .lookup("topicConnectionFactry");

  // Create a JMS connection
  connection = conFactory.createTopicConnection(username, password);

  // Create JMS session objects for publisher and subscriber
  pubSession = connection.createTopicSession(false,
    Session.AUTO_ACKNOWLEDGE);
  TopicSession subSession = connection.createTopicSession(false,
    Session.AUTO_ACKNOWLEDGE);

  // Look up a JMS topic
  Topic chatTopic = (Topic) jndi.lookup(topicName);

  // Create a JMS publisher and subscriber
  publisher = pubSession.createPublisher(chatTopic);
  TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);

  // Set a JMS message listener
  subscriber.setMessageListener(this);

  // Start the JMS connection; allows messages to be delivered
  connection.start();

  // Create and send message using topic publisher
  TextMessage message = pubSession.createTextMessage();
  message.setText(username + ": Howdy Friends!");
  publisher.publish(message);

 }

 /*
  * A client can register a message listener with a consumer. A message
  * listener is similar to an event listener. Whenever a message arrives at
  * the destination, the JMS provider delivers the message by calling the
  * listener's onMessage method, which acts on the contents of the message.
  */
 public void onMessage(Message message) {
  try {
   TextMessage textMessage = (TextMessage) message;
   String text = textMessage.getText();
   System.out.println(text);
  } catch (JMSException jmse) {
   jmse.printStackTrace();
  }
 }

 public static void main(String[] args) {
  BasicConfigurator.configure();
  try {
   if (args.length != 3) {
    System.out
      .println("Please provide the topic name, username and password!");
    return;
   }

   DemoPublisherSubscriberModel demo = new DemoPublisherSubscriberModel(
     args[0], args[1], args[2]);

   BufferedReader commandLine = new java.io.BufferedReader(
     new InputStreamReader(System.in));

   // closes the connection and exits when 'exit' is entered on
   // the command line (or when the input stream ends)
   while (true) {
    String s = commandLine.readLine();
    if (s == null || s.equalsIgnoreCase("exit")) {
     demo.connection.close();
     System.exit(0);
    }
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
JMS programming model: Image Courtesy Oracle

Wednesday, July 4, 2012

Apache Thrift with Java quickstart

Apache Thrift is an RPC framework originally developed at Facebook and is now an Apache project. Thrift lets you define data types and service interfaces in a language-neutral definition file. That definition file is used as the input for the compiler, which generates code for building RPC clients and servers that communicate across different programming languages. You can also refer to the Thrift white paper.

According to the official web site, Apache Thrift is a
software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi and other languages.
Image courtesy wikipedia

Installing Apache Thrift on Windows

Installing Thrift can be a tiresome process, but for Windows the compiler is available as a prebuilt exe. Download thrift.exe and add its location to your PATH environment variable.

Writing Thrift definition file (.thrift file)

Writing the Thrift definition file becomes really easy once you get used to it. I found this tutorial quite useful to begin with.

Example definition file (add.thrift)
namespace java com.eviac.blog.samples.thrift.server  // defines the namespace 

typedef i32 int  //typedefs to get convenient names for your types

service AdditionService {  // defines the service to add two numbers
        int add(1:int n1, 2:int n2), //defines a method
}

Compiling Thrift definition file

To compile the .thrift file use the following command.
 
thrift --gen <language> <Thrift filename>
For my example the command is,
 
thrift --gen java add.thrift
After running the command, you'll find the generated source code inside the gen-java directory. It is used for building RPC clients and servers. In my example the compiler creates a Java file called AdditionService.java.
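To give a rough idea of what ends up in that file, here is a heavily simplified stand-in. It is not the real generated code: the real AdditionService.java depends on the libthrift runtime, its add method also declares throws TException, and its Processor reads the method name and arguments from a protocol object rather than taking them as parameters. The class name AdditionServiceSketch and the process signature are assumptions made for this sketch.

```java
// Simplified stand-in for the generated AdditionService.java.
// The real file is much longer and needs the libthrift library.
final class AdditionServiceSketch {

    // Mirrors the generated AdditionService.Iface: one Java method per
    // method declared in the service block of add.thrift.
    interface Iface {
        int add(int n1, int n2);
    }

    // Stand-in for the generated Processor: it receives a request and
    // dispatches it to the handler by method name.
    static final class Processor {
        private final Iface handler;

        Processor(Iface handler) {
            this.handler = handler;
        }

        int process(String method, int n1, int n2) {
            if ("add".equals(method)) {
                return handler.add(n1, n2);
            }
            throw new IllegalArgumentException("Unknown method: " + method);
        }
    }

    public static void main(String[] args) {
        // The handler supplies the actual business logic.
        Processor processor = new Processor((n1, n2) -> n1 + n2);
        System.out.println(processor.process("add", 100, 200));
    }
}
```

The point is the division of labour: the generated code handles dispatching, and you only write the handler.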

Writing a service handler

Service handler class is required to implement the AdditionService.Iface interface.

Example service handler (AdditionServiceHandler.java)
 
package com.eviac.blog.samples.thrift.server;

import org.apache.thrift.TException;

public class AdditionServiceHandler implements AdditionService.Iface {

 @Override
 public int add(int n1, int n2) throws TException {
  return n1 + n2;
 }

}
Writing a simple server

The following example code starts a simple Thrift server. To use the multithreaded server instead, uncomment the commented parts of the example code, remove the TSimpleServer lines and import org.apache.thrift.server.TThreadPoolServer.

Example server (MyServer.java)
package com.eviac.blog.samples.thrift.server;

import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServer.Args;
import org.apache.thrift.server.TSimpleServer;

public class MyServer {

 public static void StartsimpleServer(AdditionService.Processor<AdditionServiceHandler> processor) {
  try {
   TServerTransport serverTransport = new TServerSocket(9090);
   TServer server = new TSimpleServer(
     new Args(serverTransport).processor(processor));

   // Use this for a multithreaded server
   // TServer server = new TThreadPoolServer(new
   // TThreadPoolServer.Args(serverTransport).processor(processor));

   System.out.println("Starting the simple server...");
   server.serve();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 
 public static void main(String[] args) {
  StartsimpleServer(new AdditionService.Processor<AdditionServiceHandler>(new AdditionServiceHandler()));
 }

}

Writing the client

Following is an example java client code which consumes the service provided by AdditionService.

Example client code (AdditionClient.java)
package com.eviac.blog.samples.thrift.client;

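// The generated AdditionService lives in the server package
import com.eviac.blog.samples.thrift.server.AdditionService;

import org.apache.thrift.TException;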
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class AdditionClient {

 public static void main(String[] args) {

  try {
   TTransport transport;

   transport = new TSocket("localhost", 9090);
   transport.open();

   TProtocol protocol = new TBinaryProtocol(transport);
   AdditionService.Client client = new AdditionService.Client(protocol);

   System.out.println(client.add(100, 200));

   transport.close();
  } catch (TTransportException e) {
   e.printStackTrace();
  } catch (TException x) {
   x.printStackTrace();
  }
 }

}


Run the server code (MyServer.java). It should output the following and then listen for requests.
Starting the simple server...
Then run the client code (AdditionClient.java). It should output the following.
300