Friday, August 7, 2015

Introduction to YARN

YARN/Hadoop 2.x has a completely different architecture compared to Hadoop 1.x.
In Hadoop 1.x the JobTracker serves two major functions:
  1. Resource management
  2. Job scheduling/Job monitoring
Recall that in Hadoop 1.x there is a single JobTracker per Hadoop cluster serving both of these functions, so scaling the cluster can overwhelm the JobTracker. Having a single JobTracker also makes it a single point of failure: if the JobTracker goes down, the entire cluster goes down along with all the running jobs.
YARN separates the two functions mentioned above into two daemons:
  1. Global Resource Manager
  2. Per-application Application Master
Before YARN, Hadoop was designed to support only MapReduce-type jobs. As time went by, people came across Big Data computation problems that could not be addressed by MapReduce, and hence came up with different frameworks, which work on top of HDFS, to address their problems in a better way. Some of these are Apache Spark, Apache HAMA and Apache Giraph. YARN provides a way for these new frameworks to be integrated into the Hadoop framework, sharing the same underlying HDFS. This way YARN enables Hadoop to handle jobs beyond MapReduce.

Yarn Architecture Overview


Yarn Architecture. [Image Source 'http://hadoop.apache.org/']

Yarn Components


As mentioned earlier, with this architectural refurbishment YARN introduces a whole new set of terminology which we have to be familiar with.

YARN uses the term 'application' instead of the term 'job', which is used in Hadoop 1.x. In YARN an application can be a single job or a Directed Acyclic Graph (DAG) of jobs, and an application does not necessarily have to be of the MapReduce type. An application is an instance of a certain application type, which is associated with an Application Master. For each application, an ApplicationMaster instance will be initiated.

YARN components include:
  • Container
  • Global Resource Manager
  • Per-node Node Manager
  • Per-application Application Master
Let's go through each component one by one;

Container

A Container is a place where a unit of work occurs. For instance, each MapReduce task (not the entire job) runs in one container. An application/job will run in one or more containers. A set of physical resources is allocated to each container; currently CPU cores and RAM are supported. Each node in a Hadoop cluster can run several containers.
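As an illustration, here is a minimal sketch (not taken from any YARN source) of how an ApplicationMaster might describe the resources for one container using the YARN client API; the 1024 MB / 1 core values and the absence of node/rack constraints are assumptions made for the example.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class ContainerRequestSketch {
  public static void main(String[] args) {
    // Physical resources for one container: 1024 MB of RAM and 1 CPU core (assumed values).
    Resource capability = Resource.newInstance(1024, 1);
    // Priority used by the Scheduler to order this request among others.
    Priority priority = Priority.newInstance(0);
    // No node or rack constraints (null means the container may be allocated anywhere).
    ContainerRequest request = new ContainerRequest(capability, null, null, priority);
    System.out.println("Requesting container with " + request.getCapability());
  }
}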

Global Resource Manager

Resource Manager consists of two main elements;
  1. Scheduler
  2. Application Manager
The pluggable Scheduler is responsible for allocating resources to running applications. Resources are scheduled based on the resource requirements of the applications, while ensuring optimal resource utilization. Example pluggable Schedulers include the Capacity Scheduler and the Fair Scheduler.
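As a rough illustration of this pluggability, the scheduler implementation is typically selected through the yarn.resourcemanager.scheduler.class property, which normally lives in yarn-site.xml. The snippet below only sets it on a Configuration object to show which knob is involved; it is a sketch, not a recommended way to configure a real cluster.
import org.apache.hadoop.conf.Configuration;

public class SchedulerSelectionSketch {
  public static void main(String[] args) {
    // In a real cluster this property is set in yarn-site.xml, not in application code.
    Configuration conf = new Configuration();
    conf.set("yarn.resourcemanager.scheduler.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
    System.out.println("Scheduler: " + conf.get("yarn.resourcemanager.scheduler.class"));
  }
}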

The Application Manager does several jobs;
  • Accepts job submissions from client programs.
  • Negotiates the first container for executing the per-application Application Master.
  • Provides the service to restart a failed Application Master container.
Recall that in Hadoop 1.x the JobTracker handled restarting failed tasks and monitoring the status of each task. As you can observe, in YARN the ResourceManager does not handle any of these duties; instead they have been delegated to a different component called the per-application Application Master, which we will encounter later. This separation makes the ResourceManager the ultimate authority for allocating resources, decreases the load on the ResourceManager and enables it to scale further than the JobTracker.

You may have noticed that the global ResourceManager could be a single point of failure. With the 2.4 release, Hadoop introduced the high-availability ResourceManager concept, having an Active/Standby ResourceManager pair to remove this single point of failure. You can read more about it from this link.

Per-node Node Manager

The Node Manager is the component which actually provisions resources to applications. The NodeManager daemon is a slave service which runs on each computation node in a Hadoop cluster.

The NodeManager takes resource requests from the ResourceManager and provisions Containers to applications. The NodeManager also keeps track of the health of its node and reports to the ResourceManager periodically, so that the ResourceManager can keep track of global health.

During startup, each node registers with the ResourceManager and lets the ResourceManager know the amount of resources available. This information is updated periodically.

The NodeManager manages resources and periodically reports node status to the ResourceManager; it does not know anything about application status. Applications are handled by a different component called the ApplicationMaster, which we are going to discuss next.

Per-application Application Master

The ApplicationMaster negotiates the resource containers required to execute the application from the ResourceManager, obtains the resources from the NodeManagers and executes the application. For each application, which is actually an instance of a certain application type, an ApplicationMaster instance is initiated.

In Hadoop 1.x, when a task fails the JobTracker is responsible for re-executing that task; this increases the load on the JobTracker and reduces its scalability.

In Hadoop 2.x the ResourceManager is only accountable for scheduling resources. The ApplicationMaster is responsible for negotiating resource containers from the ResourceManager; if a task fails, the ApplicationMaster negotiates resources from the ResourceManager and tries to re-execute the failed task.

Hadoop 1.x only supports MapReduce-type jobs, as its design is tightly coupled to solving MapReduce-type computations. In contrast, Hadoop 2.x has a more pluggable architecture and supports new frameworks which use HDFS underneath. A new framework can be plugged into the Hadoop framework by developing its own ApplicationMaster.

How does YARN handle a client request?


When a client program submits an application to the YARN framework, the ApplicationMaster type is decided based on the application type. The ResourceManager negotiates with a NodeManager to obtain a Container in which to execute an instance of the ApplicationMaster. After the ApplicationMaster instance is initiated, it registers with the ResourceManager, and the client communicates with the ApplicationMaster through the ResourceManager. The ApplicationMaster negotiates with the ResourceManager for resources on a specific node, and obtains the actual resources from the NodeManager of that node. Application code running in the Containers reports its status to the ApplicationMaster periodically. After job completion, the ApplicationMaster deregisters from the ResourceManager and the containers used are released.
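To make the client side of this flow concrete, here is a minimal, simplified sketch using the YarnClient API; the application name is made up, and a real client would also fill in the ApplicationMaster launch command, its resource requirements and the target queue on the submission context before submitting.
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SubmitApplicationSketch {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    // Ask the ResourceManager for a new application id.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
    context.setApplicationName("example-app"); // made-up name for the sketch

    // A complete client would also set the AM container launch context,
    // its resource requirements and the target queue here.
    ApplicationId appId = yarnClient.submitApplication(context);
    System.out.println("Submitted application " + appId);

    yarnClient.stop();
  }
}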

HDFS High Availability


In Hadoop 1.x there is a single NameNode, which makes it a single point of failure: if the NameNode fails, the entire cluster becomes inaccessible. To avoid this, Hadoop 2.x introduces a High Availability NameNode concept, with an Active/Standby NameNode pair. At a high level, while the Active NameNode serves client requests, the Standby NameNode constantly synchronizes with the Active NameNode. If the Active NameNode fails, the Standby NameNode becomes the Active NameNode and keeps serving client requests. You can read the in-depth details from here.

Tuesday, November 25, 2014

Hadoop MapReduce Features : Custom Data Types

Hadoop requires every data type to be used as a key to implement the Writable and Comparable interfaces, and every data type to be used as a value to implement the Writable interface.

Writable interface

The Writable interface provides a way to serialize and deserialize data across the network. It is located in the org.apache.hadoop.io package.
public interface Writable {
   void write(DataOutput out) throws IOException;
   void readFields(DataInput in) throws IOException;
}

Comparable interface

Hadoop uses Java's Comparable interface to facilitate the sorting process. It is located in the java.lang package.
public interface Comparable<T> {
   public int compareTo(T o);
}
WritableComparable interface

For convenience, Hadoop provides the WritableComparable interface, which wraps both the Writable and Comparable interfaces into a single interface. It is located in the org.apache.hadoop.io package.
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Hadoop provides a set of classes (Text, IntWritable, LongWritable, FloatWritable, BooleanWritable, etc.) which implement the WritableComparable interface and can therefore be used directly as key and value types.
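To see what Writable serialization looks like in practice, here is a small self-contained example (not part of any Hadoop job) that round-trips a Text key and an IntWritable value through a byte stream, the same way the framework moves them across the network.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableRoundTrip {
  public static void main(String[] args) throws Exception {
    // Serialize a Text key and an IntWritable value into a byte buffer.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    new Text("Toyota").write(out);
    new IntWritable(10000).write(out);

    // Deserialize them back, as the receiving side of the framework would.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    Text key = new Text();
    IntWritable value = new IntWritable();
    key.readFields(in);
    value.readFields(in);
    System.out.println(key + " -> " + value.get());
  }
}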

Custom Data Types

Example

Assume you want an object representation of a vehicle to be the type of your key or value. Three properties of a vehicle have been taken into consideration:
  1. Manufacturer
  2. Vehicle Identification Number(VIN)
  3. Mileage
Note that the Vehicle Identification Number(VIN) is unique for each vehicle.

A tab-delimited file containing observations of these three variables holds the following sample data.

Sample Data
Toyota 1GCCS148X48370053 10000
Toyota 1FAPP64R1LH452315 40000
BMW WP1AA29P58L263510 10000
BMW JM3ER293470820653 60000
Nissan 3GTEC14V57G579789 10000
Nissan 1GNEK13T6YJ290558 25000
Honda 1GC4KVBG6AF244219 10000
Honda 1FMCU5K39AK063750 30000
Custom Value Types

Hadoop provides the freedom to create custom value types by implementing the Writable interface. To implement the Writable interface, one must implement its two abstract methods, write() and readFields(). In addition, in the following example Java code I have overridden the toString() method to return the text representation of the object.
package net.eviac.blog.datatypes.value;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * @author pavithra
 * 
 * Custom data type to be used as a value in Hadoop.
 * In Hadoop every data type to be used as values must implement Writable interface.
 *
 */
public class Vehicle implements Writable {

  private String model;
  private String vin;
  private int mileage;

  public void write(DataOutput out) throws IOException {
    out.writeUTF(model);
    out.writeUTF(vin);
    out.writeInt(mileage);
  }

  public void readFields(DataInput in) throws IOException {
    model = in.readUTF();
    vin = in.readUTF();
    mileage = in.readInt();
  }

  @Override
  public String toString() {
    return model + ", " + vin + ", "
        + Integer.toString(mileage);
  }

  public String getModel() {
    return model;
  }
  public void setModel(String model) {
    this.model = model;
  }
  public String getVin() {
    return vin;
  }
  public void setVin(String vin) {
    this.vin = vin;
  }
  public int getMileage() {
    return mileage;
  }
  public void setMileage(int mileage) {
    this.mileage = mileage;
  }
  
}
The following MapReduce job outputs the total mileage per manufacturer, using Vehicle as a custom value type.
package net.eviac.blog.datatypes.jobs;

import java.io.IOException;

import net.eviac.blog.datatypes.value.Vehicle;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * @author pavithra
 *
 */
public class ModelTotalMileage {
  
  static final Logger logger = Logger.getLogger(ModelTotalMileage.class);

  public static class ModelMileageMapper
  extends Mapper<Object, Text, Text, Vehicle>{

    private Vehicle vehicle = new Vehicle();
    private Text model = new Text();

    public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {

      String[] var = value.toString().split("\t");

      if(var.length == 3){
        model.set(var[0]);
        vehicle.setModel(var[0]);
        vehicle.setVin(var[1]);
        vehicle.setMileage(Integer.parseInt(var[2]));
        context.write(model, vehicle);
      }
    }
  }

  public static class ModelTotalMileageReducer
  extends Reducer<Text,Vehicle,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<Vehicle> values,
        Context context
        ) throws IOException, InterruptedException {
      int totalMileage = 0;
      for (Vehicle vehicle : values) {
        totalMileage += vehicle.getMileage();
      }
      result.set(totalMileage);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    BasicConfigurator.configure();
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Model Total Mileage");
    job.setJarByClass(ModelTotalMileage.class);
    job.setMapperClass(ModelMileageMapper.class);
    job.setReducerClass(ModelTotalMileageReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Vehicle.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
Output
BMW 70000
Honda 40000
Nissan 35000
Toyota 50000
Custom Key Types

Hadoop provides the freedom to create custom key types by implementing the WritableComparable interface. In addition to implementing the Writable interface so that they can be transmitted over the network, keys must implement Java's Comparable interface to facilitate the sorting process. Since outputs are sorted on keys by the framework, only the data types to be used as keys must implement the Comparable interface.

To implement the Comparable interface, a class should implement its abstract method compareTo(). A Vehicle object must be able to be compared to other Vehicle objects, to facilitate the sorting process, and the sorting process uses compareTo() to determine how Vehicle objects should be sorted. For instance, since the VIN is a String and String implements Comparable, we can sort vehicles by VIN.

For the partitioning process it is important for key types to implement hashCode(), and consequently they should override equals() as well. hashCode() should use the same fields as equals(), which in our case is the VIN: if the equals() method says two Vehicle objects are equal when they have the same VIN, then Vehicle objects with the same VIN must return identical hash codes.

The following example provides the Java code for a complete custom key type.
package net.eviac.blog.datatypes.key;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author pavithra
 * 
 * Custom data type to be used as a key in Hadoop.
 * In Hadoop every data type to be used as keys must implement WritableComparable interface.
 *
 */
public class Vehicle implements WritableComparable<Vehicle> {

  private String model;
  private String vin;
  private int mileage;

  public void write(DataOutput out) throws IOException {
    out.writeUTF(model);
    out.writeUTF(vin);
    out.writeInt(mileage);
  }

  public void readFields(DataInput in) throws IOException {
    model = in.readUTF();
    vin = in.readUTF();
    mileage = in.readInt();
  }

  @Override
  public String toString() {
    return model + ", " + vin + ", "
        + Integer.toString(mileage);
  }
  
  public int compareTo(Vehicle o) {
    return vin.compareTo(o.getVin());
  } 
  
  @Override
  public boolean equals(Object obj) {
    if((obj instanceof Vehicle) && (((Vehicle)obj).getVin().equals(vin))){
      return true;
    }else {
      return false;
    }    
  }
  
  @Override
  public int hashCode() {
    int ascii = 0;
    for(int i=0;i<vin.length();i++){
      char character = vin.charAt(i);
      ascii += (int)character;
    }
    return ascii;
  }

  public String getModel() {
    return model;
  }
  public void setModel(String model) {
    this.model = model;
  }
  public String getVin() {
    return vin;
  }
  public void setVin(String vin) {
    this.vin = vin;
  }
  public int getMileage() {
    return mileage;
  }
  public void setMileage(int mileage) {
    this.mileage = mileage;
  }  
  
}
The following MapReduce job outputs the mileage per vehicle, using Vehicle as a custom key type.
package net.eviac.blog.datatypes.jobs;

import java.io.IOException;

import net.eviac.blog.datatypes.key.Vehicle;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * @author pavithra
 *
 */
public class VehicleMileage {
  
  static final Logger logger = Logger.getLogger(VehicleMileage.class);

  public static class VehicleMileageMapper
  extends Mapper<Object, Text, Vehicle, IntWritable>{

    private Vehicle vehicle = new Vehicle();
    private IntWritable mileage = new IntWritable();

    public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {

      String[] var = value.toString().split("\t");

      if(var.length == 3){
        mileage.set(Integer.parseInt(var[2]));
        vehicle.setModel(var[0]);
        vehicle.setVin(var[1]);
        vehicle.setMileage(Integer.parseInt(var[2]));
        context.write(vehicle, mileage);
      }
    }
  }

  public static class VehicleMileageReducer
  extends Reducer<Vehicle,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    private Text vin = new Text();

    public void reduce(Vehicle key, Iterable<IntWritable> values,
        Context context
        ) throws IOException, InterruptedException {
      int totalMileage = 0;
      for (IntWritable mileage : values) {
        totalMileage += mileage.get();
      }
      result.set(totalMileage);
      vin.set(key.getVin());
      context.write(vin, result);
    }
  }

  public static void main(String[] args) throws Exception {
    BasicConfigurator.configure();
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Model Total Mileage");
    job.setJarByClass(VehicleMileage.class);
    job.setMapperClass(VehicleMileageMapper.class);
    job.setReducerClass(VehicleMileageReducer.class);
    job.setMapOutputKeyClass(Vehicle.class);
    job.setOutputKeyClass(Text.class);    
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
Output
1FAPP64R1LH452315 40000
1FMCU5K39AK063750 30000
1GC4KVBG6AF244219 10000
1GCCS148X48370053 10000
1GNEK13T6YJ290558 25000
3GTEC14V57G579789 10000
JM3ER293470820653 60000
WP1AA29P58L263510 10000

Saturday, November 1, 2014

Getting started with Hadoop MapReduce

The Hadoop MapReduce framework provides a way to process large data sets, in parallel, on large clusters of commodity hardware.

[Processing a large file serially from top to bottom could be a very time consuming task; instead, in brief, MapReduce breaks that large file into chunks and processes them in parallel.]

A little note on HDFS

HDFS, the Hadoop Distributed File System, is fault-tolerant, distributed storage which is responsible for storing large data sets on large clusters of commodity hardware.

HDFS splits data into chunks called blocks and stores them across multiple nodes within the cluster. HDFS distributes these blocks across different nodes, if possible. A typical block size in HDFS is 64 MB (you can configure this value using the dfs.blocksize property in the hdfs-default.xml file). Note that changing this setting will not affect the block size of any files already in HDFS; it will only affect the block size of files placed into HDFS after the setting has taken effect. All blocks in a file except the last block are of the same size. Each block is given a unique name of the form blk_<large number>. Each block is replicated, by default 3 times (you can configure this value using the dfs.replication property in the hdfs-default.xml file), across different nodes to ensure availability.

Suppose we have a file of size 160 MB. As the file is loaded into HDFS, it splits into 3 blocks: the first block and the second block are 64 MB each, and the third one is 32 MB, making up the 160 MB file.

NameNode

The NameNode keeps the metadata of HDFS files; it does not store the data itself. The NameNode daemon runs on a master node and there is only one NameNode per Hadoop cluster. The NameNode runs in a separate JVM process; in a typical production cluster there is a separate node which runs the NameNode process.

The NameNode does not persist the location of each block; block locations are acquired from the DataNodes at cluster startup and kept in memory. The file system metadata is kept in memory and persisted to a file in its namespace called 'fsimage'. Changes made during operation are kept in memory and logged into a file called 'edits', which is also in the NameNode's namespace. The SecondaryNameNode is a daemon process which performs housekeeping functions for the NameNode, periodically merging 'fsimage' with 'edits'.

Since there is a single NameNode per Hadoop cluster, it is a single point of failure. If the NameNode becomes unavailable, the entire cluster becomes inaccessible. You may have noticed that DataNodes do not contain metadata about the data blocks, only the actual data blocks; therefore losing the NameNode makes the entire cluster inaccessible and useless.

If the 'fsimage' and 'edits' files get corrupted, all the data in HDFS becomes inaccessible. So even though a bunch of commodity machines (JBOD) can be used for DataNodes, more reliable RAID-based storage should be used for the NameNode to assure reliability. The 'fsimage' and 'edits' files must also be backed up regularly.

Hadoop prefers large files over a large number of small files. Since only the data of one file can be stored in a data block, a large number of small files has to be stored in separate data blocks, which in turn results in a huge number of data blocks. During cluster operations, the NameNode pulls all the metadata of these data blocks and stores it in memory, which can simply overwhelm the NameNode.

DataNode

DataNode daemons, which run on slave nodes, store the HDFS blocks. Only one DataNode process runs per slave node. DataNodes also run in separate JVM processes. DataNodes periodically send heartbeats to the NameNode to indicate that they are alive. DataNodes can also talk to each other; for instance, they talk to each other during data replication.

When a client wants to perform an operation on a file, it first contacts the NameNode to locate that file. The NameNode then sends back the locations of the nodes on which the file is stored as HDFS blocks. The client can then talk directly to the DataNodes and perform the operation on the file.
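As a rough sketch of this interaction from the client's point of view, the FileSystem API below opens a hypothetical file path; the open() call is what triggers the NameNode lookup, and the returned stream then reads the blocks from the DataNodes.
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical path used only for illustration.
    InputStream in = fs.open(new Path("/user/pavithra/input/file01"));
    try {
      // Copy the file contents to stdout; the stream talks to DataNodes under the hood.
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}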

Daemons of MapReduce

There are two daemon processes we have to look into; JobTracker daemon and TaskTracker daemon.

JobTracker

The JobTracker daemon, which runs on a master node, tracks MapReduce jobs. There is only one JobTracker daemon per Hadoop cluster. The JobTracker runs in a separate JVM process; in a typical production cluster there is a separate node which runs the JobTracker process.

TaskTracker

TaskTracker daemons, which run on slave nodes, handle the tasks (map, reduce) received from the JobTracker. There is only one TaskTracker per slave node. Every TaskTracker is set up with a set of slots which specifies the number of tasks it can handle (a TaskTracker can be configured to handle multiple map and reduce tasks). The TaskTracker starts a separate JVM process for each task, to isolate itself from problems caused by the tasks. TaskTrackers periodically send heartbeats to the JobTracker to indicate that they are alive and to report the number of available slots.

To run a MapReduce job, a client application submits the job to the JobTracker. The JobTracker then talks to the NameNode to locate the necessary data blocks. The JobTracker then chooses TaskTrackers with free slots which run on the same nodes that contain the data, or within the same rack as the data. The TaskTrackers start separate JVM processes for each task (they can also use JVM reuse) and monitor them, while the JobTracker monitors the TaskTrackers for failures. When a task is done, the TaskTracker informs the JobTracker.

Both HDFS and the MapReduce framework run on the same set of nodes; in other words, the storage nodes (DataNodes in HDFS) and the compute nodes (the nodes which TaskTrackers run on) are the same. In Hadoop, computations are moved to the data, not the other way around.

Input Split

Input split is a chunk of an input that is processed by a single map. Each map is responsible for processing a single Input Split. An Input Split has a length in bytes and a set of storage locations. Input Split doesn't contain the input data, but a reference to the data. In other words, an Input Split is logical and has a reference to the input data which are physically stored in HDFS as HDFS blocks.

Hadoop uses InputSplit Java interface, which is in the org.apache.hadoop.mapred package, to represent an Input Split.


public interface InputSplit extends Writable {

  long getLength() throws IOException;
          
  String[] getLocations() throws IOException;

}

Records

Within a map task, each Input Split is divided into Records. Records are key/value pairs and are logical. The map task processes each Record, one after the other.

InputFormat

InputSplits are generated using an interface, InputFormat, which is in the org.apache.hadoop.mapred package.

public interface InputFormat<K, V> {
          
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
          
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

An InputFormat does two things;
  1. Generating Input Splits.
  2. Dividing Input Splits into Records.
The getSplits() method is invoked by the client to generate the input splits, and the generated splits are submitted to the JobTracker. The JobTracker uses the locations of the splits to choose TaskTrackers and assigns splits to them. The TaskTrackers schedule map tasks to process the Input Splits (one map task for one split). A map task calls the getRecordReader() method on its Input Split to acquire a RecordReader. The map task then uses the RecordReader to generate record key/value pairs, which it passes to the map() function.

FileInputFormat

FileInputFormat is the base class for all file-based InputFormats and implements the InputFormat interface. It does two things;
  1. Defining input paths for a MapReduce job.
  2. Giving an implementation for generating splits for the input files.
Note that dividing splits into records does not happen here; that is performed by the subclasses. By default, the Input Split size for FileInputFormat is the size of an HDFS block, therefore by default FileInputFormat splits files larger than an HDFS block. Note that it is possible to change these default configurations using the properties listed below;
  1. mapred.min.split.size
  2. mapred.max.split.size
  3. dfs.block.size
Note that a FileInputFormat subclass can override the isSplitable(FileSystem, Path) method to return false, to ensure input files are not split and are processed as a whole.
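For illustration, a minimal sketch of such a non-splittable input format could look like the following, here by extending the old-API TextInputFormat and overriding isSplitable() to return false; the class name is made up for the example.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    // Never split: each input file becomes exactly one Input Split,
    // processed as a whole by a single map task.
    return false;
  }
}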

Inputs and Outputs

MapReduce framework operates on a series of Key/Value transformations, where input to a MapReduce job is a set of {key, value} pairs and output is also a set of {key, value} pairs. Note that types of input {key, value} pairs possibly could be different from types of output {key, value} pairs.
(input) {k1, v1} -> map -> {k2, v2} -> shuffle & sort -> {k2, List(v2)} -> reduce -> {k3, v3} (output)
Every data type to be used as keys must implement Writable and Comparable interfaces and every data type to be used as values must implement Writable interface. Writable interface provides a way to serialize and deserialize data, across network. Since outputs are sorted on keys by the framework, to facilitate the sorting process, only the data type to be used as keys must implement Comparable interface.

WordCount - Example MapReduce Program

Following example MapReduce program is the exact same one that you will find in Hadoop MapReduce Tutorial. This code works with all three modes; Standalone mode, Pseudo-distributed mode and Fully-distributed mode.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

 public static class TokenizerMapper
 extends Mapper<Object, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
   }
  }
 }
 
 public static class IntSumReducer
 extends Reducer<Text,IntWritable,Text,IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,
    Context context
    ) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   context.write(key, result);
  }
 }

 public static void main(String[] args) throws Exception {
// Creating a configuration object
  Configuration conf = new Configuration();
// Creating an instance of Job class
  Job job = Job.getInstance(conf, "word count");
// Setting the name of the main class within the jar file
  job.setJarByClass(WordCount.class);
// Setting the mapper class
  job.setMapperClass(TokenizerMapper.class);
 // Setting the combiner class
  job.setCombinerClass(IntSumReducer.class);
// Setting the reducer class
  job.setReducerClass(IntSumReducer.class);
// Setting the data type of the output(final) key
  job.setOutputKeyClass(Text.class);
// Setting the data type of the output(final) value
  job.setOutputValueClass(IntWritable.class);
// Setting input file path, the 1st argument passed in to the main method is used.
  FileInputFormat.addInputPath(job, new Path(args[0]));
// Setting output file path,the 2nd argument passed in to the main method is used.
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Running the job and wait for it to get completed
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
 
}

To perform a MapReduce job, we need a mapper implementation and a reducer implementation. As you can see in the above code, the mapper and reducer classes are defined as inner classes. The MapReduce job configuration happens within the main method, mainly through an instance of the Job class. You can go through the WordCount.java code and refer to the comments I've added there to understand these configurations.

Mapper

In the WordCount example, you can see the mapper implementation is named 'TokenizerMapper'; it extends the base Mapper class provided by Hadoop and overrides its map method. The map method has three parameters;
  1. Input key
  2. Input value
  3. An instance of the Context class, which is used to emit the results.
The mapper is executed once for each line of text; each time, that line of text is broken into words and the mapper emits a series of new key/value pairs of the form {word, 1} using the 'context' object.

Partitioning, Shuffle and Sort

Partitioning

Hadoop ensures that all intermediate records with the same key end up in the same reducer. The default partitioner used by MapReduce framework is HashPartitioner.
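The HashPartitioner essentially computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks to pick a reducer. If you need different routing, you can plug in your own partitioner; the following is a hypothetical sketch that sends words starting with 'a' to 'm' to one partition and everything else to another, and it would be wired into a job with job.setPartitionerClass(...).
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AlphabetPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String word = key.toString();
    // Guard against empty keys; send them to partition 0.
    if (word.isEmpty()) {
      return 0;
    }
    char first = Character.toLowerCase(word.charAt(0));
    int bucket = (first >= 'a' && first <= 'm') ? 0 : 1;
    // Stay within the number of partitions actually configured for the job.
    return bucket % numPartitions;
  }
}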

Shuffle and Sort

MapReduce ensures that the input to a reducer is sorted by key. The shuffle and sort phases occur simultaneously; this is the process of sorting and transferring the intermediate mapper outputs to the reducers as inputs. As the outputs are fetched by the reducer, they get merged.

Combiner

Hadoop allows the use of an optional Combiner class to run on mapper outputs. When a Combiner class is specified, each mapper output passes through a local Combiner, which sorts on keys and performs local aggregation on them. The Combiner output forms the input to the reducer. The Combiner is an optimization, so there is no guarantee of how many times it will run on a mapper output; it could be zero, one or more times. Therefore we must be absolutely sure, when specifying a Combiner, that the job will produce the same reducer output regardless of how many times the Combiner runs on a mapper output. The WordCount example specifies a combiner which is the same as the reducer.

Reducer

In the WordCount example, you can see the reducer implementation is named 'IntSumReducer'; it extends the base Reducer class provided by Hadoop and overrides its reduce method. The reduce method has three parameters;
  1. Input key
  2. Input list of values as an Iterable object
  3. An instance of the Context class, which is used to emit the results.
Note that each mapper emits a series of key/value pairs, and in the intermediate shuffle and sort phase these individual key/value pairs are combined into a series of key/List(value) pairs, which are input to the reducers. The reducer is executed once for each key (word). In the WordCount example the reducer computes the sum of the values in the Iterable object and emits the result for each word, in the form {word, sum}.

Let's take an example. Assume we have two input files, one containing the text 'Hello World Bye World' and the other containing the text 'Bye World Bye'. In this particular case:

W/O Combiner

1st Mapper emits;
{Hello, 1}
{World, 1}
{Bye, 1} 
{World, 1}
2nd Mapper emits;
{Bye, 1}
{World, 1}
{Bye, 1} 
After shuffle and sort phase, inputs to the reducer;
{Bye, (1,1,1)} 
{Hello,1}
{World, (1,1,1)}
Reducer emits;
{Bye, 3} 
{Hello,1}
{World, 3}
With Combiner

1st Mapper emits;
{Hello, 1} 
{World, 1}
{Bye, 1} 
{World, 1}
2nd Mapper emits;
{Bye, 1}
{World, 1}
{Bye, 1} 
Combiner does a local aggregation and mapper outputs get sorted on keys;

For the 1st Mapper;
{Bye, 1} 
{Hello, 1}
{World, 2}
For the 2nd Mapper;
{Bye, 2}
{World, 1}
Reducer emits;
{Bye, 3} 
{Hello, 1}
{World, 3}

Running a MapReduce job

  1. Add hadoop classpath to your classpath using the following command.
    $export CLASSPATH=`hadoop classpath`:$CLASSPATH
    
  2. Now compile WordCount.java using the following command.
    $javac WordCount.java
    
  3. Create the job jar file.
    $jar cf wc.jar WordCount*.class
    
  4. If you are using Standalone mode to run the job, you can simply use the following command.
    $hadoop jar wc.jar WordCount input output
    
    As you can see there are four arguments to this command,
    1. Name of the jar file.
    2. Name of the main class within the jar file.
    3. The input file location in your local machine.

      Viewing the inputs
      $ls input
      ## file01 file02 
      
      $cat input/file01
      ## Hello World Bye World
      
      $cat input/file02
      ## Bye World Bye
      
    4. The output file location.
    To view the output, use the following command,
    $cat output/part-r-00000
    
    In a successful execution of the job, following should be the output.
    Bye 3
    Hello 1
    World 3
    
  5. If you are using Pseudo-distributed mode to run the job, First place the job jar into a desired location(I have used my HDFS home) on HDFS, using the following command.
    $hdfs dfs -put wc.jar /user/pavithra
    
    you can use the following command to run the job.
    $hadoop jar wc.jar WordCount input output
    
    As you can see there are four arguments to this command,
    1. Name of the jar file.
    2. Name of the main class within the jar file.
    3. The input file location in HDFS. This is relative to your home directory in HDFS. In my case, the full path to home directory would be /user/pavithra/input.

      Viewing the inputs
      $hdfs dfs -ls input
      ## file01 file02 
      
      $hdfs dfs -cat input/file01
      ## Hello World Bye World
      
      $hdfs dfs -cat input/file02
      ## Bye World Bye
      
    4. The output file location. This is also relative to your home directory in HDFS. In my case the full path would be /user/pavithra/output.
    To view the output, use the following command,
    hdfs dfs -cat output/part-r-00000
    
    In a successful execution of the job following should be the output.
    Bye 3
    Hello 1
    World 3
    

Saturday, October 25, 2014

Getting Hadoop Up and Running on Ubuntu

Hadoop is a framework written in Java, that allows processing of large datasets in a distributed manner across large clusters of commodity hardware.

Hadoop was created by Doug Cutting, who got the inspiration for Hadoop after reading the Google papers "The Google File System" and "MapReduce: Simplified Data Processing on Large Clusters", published in 2003 and 2004 respectively.

As per the project main page, Hadoop includes four modules: Hadoop Common, Hadoop Distributed File System (HDFS), Hadoop YARN and Hadoop MapReduce. There is a whole ecosystem built around Hadoop, including projects like Apache Hive, Apache Pig, Apache ZooKeeper, etc.

There are three modes to start a Hadoop cluster.
  1. Local (Standalone) Mode
  2. Pseudo-Distributed Mode
  3. Fully-Distributed Mode
In this post my aim is to get Hadoop up and running on an Ubuntu host in Local (Standalone) Mode and in Pseudo-Distributed Mode.

Linux is supported as a development and production platform by Hadoop. Since Hadoop runs on any Linux distribution, my choice of Ubuntu as the platform will not have any effect for readers who are using different Linux distributions. Note that environment configurations can vary across distributions.

For this post I'll be using Ubuntu 14.04 LTS and Apache Hadoop 2.5.1.

Prerequisites

Since Hadoop is written in Java, Java should be installed on your Ubuntu host. Refer to this link for recommended Java versions. Run the following commands to check whether you have already installed Java on your machine.
 
$ javac
$ java -version
This link provides a good resource in case you have not already installed Java.

Once Java is installed, you should add JAVA_HOME/bin to your PATH, to ensure java is available from the command line. To save the JAVA_HOME environment variable persistently, open up the ~/.profile file using the following command.
 
$ gedit ~/.profile
Append following lines to it and save.

export JAVA_HOME=/usr/lib/jvm/java-7-oracle
export PATH=$JAVA_HOME/bin:$PATH

Note that after editing, you should log in again in order to initialize the variables, but you can use the following command to load the variable without logging in again. Also, there are many different ways to save an environment variable in Ubuntu; refer to this link to find out what they are.
 
$ source ~/.profile
Downloading Hadoop
  1. Download the latest stable Hadoop release from this link. You'll most likely download a file named something like hadoop-2.5.1.tar.gz (the latest version of Hadoop at the time of this writing).
  2. I prefer Hadoop to be installed in the /usr/local directory. Decompress the downloaded file into it using the following command.
     
    $ tar -xf hadoop-2.5.1.tar.gz -C /usr/local/
    
  3. Add the $HADOOP_PREFIX/bin directory to your PATH, to ensure Hadoop is available from the command line. Follow the same steps we followed for adding the JAVA_HOME variable, except that you should append the following to the .profile file.

    export HADOOP_PREFIX=/usr/local/hadoop-2.5.1
    export PATH=$HADOOP_PREFIX/bin:$PATH
  4. Define the following parameters in the etc/hadoop/hadoop-env.sh file.
     
    export JAVA_HOME=/usr/lib/jvm/java-7-oracle
    export HADOOP_PREFIX=/usr/local/hadoop-2.5.1
    
  5. If the execution of the following command displays the usage documentation for the Hadoop script, you are good to go and can start your Hadoop cluster in one of the three modes mentioned above.
     
    $ hadoop
    

Standalone Mode

By default, Hadoop is configured to run as a single Java process, in a non-distributed mode. Standalone mode is usually useful in the development phase since it is easy to test and debug. Also, Hadoop daemons are not started in this mode. Since Hadoop's default properties are set for standalone mode and there are no Hadoop daemons to run, there are no additional steps to carry out here.

Pseudo-Distributed Mode

This mode simulates a small-scale cluster, with the Hadoop daemons running on a local machine. Each Hadoop daemon runs in a separate Java process. Pseudo-Distributed Mode is a special case of Fully-Distributed Mode.

To enable Pseudo-Distributed Mode, you should edit following two XML files. These XML files contain multiple property elements within a single configuration element. Property elements contain name and value elements.
  1. etc/hadoop/core-site.xml
  2. etc/hadoop/hdfs-site.xml
Edit core-site.xml and modify the following property. The fs.defaultFS property holds the location of the NameNode.
 
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
Edit hdfs-site.xml and modify the following property. The dfs.replication property holds the number of times each HDFS block should be replicated.
 
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
Setting up SSH

In a typical cluster, the Hadoop user should be able to execute commands on the machines in the cluster without having to enter a password for every single log in. If we had to use a password to log in to the machines in the cluster, we would have to go to each individual machine and start all the processes there by hand.

As I mentioned earlier, in Pseudo-Distributed Mode we need to start the Hadoop daemons. The (single) host being localhost, we need a way to log in to localhost without entering a password and start the Hadoop daemons there. To do that we need ssh, and we need to make sure we can ssh to localhost without giving a password. To allow password-less log in, we need to create an ssh key pair that has an empty password.

[ssh provides a way to securely log onto remote systems without using a password, using key-based authentication. Key-based authentication creates a pair of keys: a private key and a public key. The private key is kept as a secret on the client machine, and the public key can be placed on any server you wish to access. Briefly, when a client tries to connect to a server, the server generates a challenge for the client using the client's public key, and only the client can read it using its private key. Based on the response the server gets from the client, the server can decide whether the client is authorized or not.]
  1. The ssh client is pre-packaged with Ubuntu, but we need to install the ssh server in order to have sshd running. Use the following command to install ssh and sshd.
     
    $ sudo apt-get install ssh
    
    Verify installation using following commands.
     
    $ which ssh
    ## Should print '/usr/bin/ssh'
    
    $ which sshd
    ## Should print '/usr/bin/sshd'
    
  2. Check if you can ssh to the localhost without a password.
     
    $ ssh localhost
    
    Note that if you try to ssh to the localhost without installing ssh first, an error message will be printed saying 'ssh: connect to host localhost port 22: Connection refused'. So be sure to install ssh first.
  3. If you cannot SSH to the localhost without a password create a ssh key pair using the following command.
     
    $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
    
  4. Now that the key pair has been created, note that id_dsa is the private key and id_dsa.pub is the public key, both in the .ssh directory. We need to add the new public key to the list of authorized keys using the following command.
     
    $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
    
  5. Now connect to the localhost and check if you can ssh to the localhost without a password.
     
    $ ssh localhost
    
Configuring the base HDFS directory

The hadoop.tmp.dir property within the core-site.xml file holds the location of the base HDFS directory. Note that this property configuration doesn't depend on the mode Hadoop runs in. The default value of the hadoop.tmp.dir property is /tmp/hadoop-${user.name}, and there is a risk that some Linux distributions discard the contents of the /tmp directory in the local file system on each reboot, which would lead to data loss; hence, to be on the safe side, it makes sense to change the location of the base directory to a more reliable one.

Carry out following steps to change the location of the base HDFS directory.
  1. Create a directory for Hadoop to store its data locally and change its permissions to be writable by any user.
     
    $ sudo mkdir -p /app/hadoop/tmp
    $ sudo chmod 777 /app/hadoop/tmp
    
  2. Edit the core-site.xml and modify the following property.
     
    <configuration>
        <property>
            <name>hadoop.tmp.dir</name>
            <value>/app/hadoop/tmp</value>
        </property>
    </configuration>
    
Formatting the HDFS filesystem

We need to format the HDFS file system before starting the Hadoop cluster in Pseudo-Distributed Mode for the first time. Note that formatting the file system multiple times will delete the existing file system data.

Execute the following command on command line to format the HDFS file system.
 
$ hdfs namenode -format
Starting NameNode daemon and DataNode daemon
 
$ $HADOOP_PREFIX/sbin/start-dfs.sh












Now you can access the NameNode web interface at http://localhost:50070/.

You can use the following command to see the Java Processes which are currently running.
 
$ jps








Creating the home directory

In Hadoop the home directories for each user are stored under /user directory.

If there is no /user directory, create it using the following command. Note that even if you skip this step, the /user directory will be created automatically by Hadoop later, if necessary.
 
$ hdfs dfs -mkdir /user
Then create your home directory using the following command.
 
$ hdfs dfs -mkdir /user/hadoop
Note that this is an explicit step; even if you don't carry out this step, a home directory will be created automatically later by Hadoop, named after the user you are logged in as on your local machine. For example, if I am logged into my local machine as a user called pavithra, my home directory in HDFS will be /user/pavithra. So in order to make use of the previous step, you should log in to your local machine as a user called hadoop.

Starting a MapReduce job

  1. Use the following command to create an input directory in HDFS.
     
    $ hdfs dfs -mkdir input
    
  2. Use the following command to copy the input files into HDFS.
     
    $ hdfs dfs -put $HADOOP_PREFIX/etc/hadoop/*.xml input
    
  3. Use the following command to run the example MapReduce job provided.
     
    $ hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.1.jar grep input output 'dfs[a-z.]+'
    
    Hadoop itself creates the output directory you have specified in this command. If by any chance you specify a directory which already exists in HDFS, Hadoop will throw an exception saying "Output directory already exists". With this, Hadoop ensures that data from previous jobs does not get replaced by the data from the current job.
  4. Use the following commands to copy the output files from HDFS to the local file system and view them.
     
    $ hdfs dfs -get output output
    $ cat output/*
    
    Or you can use the following command to view the output files on HDFS itself.
     
    $ bin/hdfs dfs -cat output/*
    
    Note that the result files inside the output directory follow a naming convention of part-nnnnn.
  5. Use the following command to stop the daemons.
     
    $ $HADOOP_PREFIX/sbin/stop-dfs.sh
    

Friday, November 8, 2013

RESTful Web Services with Java

REST stands for REpresentational State Transfer and was first introduced by Roy Fielding in his thesis "Architectural Styles and the Design of Network-based Software Architectures" in the year 2000.

REST is an architectural style. HTTP is a protocol which contains the set of REST architectural constraints.

REST fundamentals
  • Everything in REST is considered as a resource.
  • Every resource is identified by an URI.
  • Uses uniform interfaces. Resources are handled using POST, GET, PUT, DELETE operations which are similar to Create, Read, update and Delete(CRUD) operations.
  • Be stateless. Every request is an independent request. Each request from client to server must contain all the information necessary to understand the request.
  • Communications are done via representations. E.g. XML, JSON

RESTful Web Services

RESTful Web Services have been embraced by large service providers across the web as an alternative to SOAP-based Web Services due to their simplicity. This post will demonstrate how to create a RESTful Web Service and client using the Jersey framework, which extends the JAX-RS API. The examples are done using the Eclipse IDE and Java SE 6.

Creating RESTful Web Service
  • In Eclipse, create a new dynamic web project called "RESTfulWS"
  • Download the Jersey zip bundle from here. The Jersey version used in these examples is 1.17.1. Once you unzip it you'll have a directory called "jersey-archive-1.17.1"; inside it, find the lib directory. Copy the following jars from there and paste them inside the WEB-INF -> lib folder in your project. Once you've done that, add those jars to your project build path as well.
    1. asm-3.1.jar
    2. jersey-client-1.17.1.jar
    3. jersey-core-1.17.1.jar
    4. jersey-server-1.17.1.jar
    5. jersey-servlet-1.17.1.jar
    6. jsr311-api-1.1.1.jar
  • In your project, inside Java Resources -> src create a new package called "com.eviac.blog.restws". Inside it create a new java class called "UserInfo". Also include the given web.xml file inside WEB-INF folder.
  • UserInfo.java
     
    package com.eviac.blog.restws;
    
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    
    /**
     * 
     * @author pavithra
     * 
     */
    
    // @Path here defines class level path. Identifies the URI path that 
    // a resource class will serve requests for.
    @Path("UserInfoService")
    public class UserInfo {
    
     // @GET here defines that this method will process HTTP GET
     // requests.
     @GET
     // @Path here defines method level path. Identifies the URI path that a
     // resource class method will serve requests for.
     @Path("/name/{i}")
     // @Produces here defines the media type(s) that the methods
     // of a resource class can produce.
     @Produces(MediaType.TEXT_XML)
     // @PathParam injects the value of URI parameter that defined in @Path
     // expression, into the method.
     public String userName(@PathParam("i") String i) {
    
      String name = i;
      return "<User>" + "<Name>" + name + "</Name>" + "</User>";
     }
     
     @GET 
     @Path("/age/{j}") 
     @Produces(MediaType.TEXT_XML)
     public String userAge(@PathParam("j") int j) {
    
      int age = j;
      return "<User>" + "<Age>" + age + "</Age>" + "</User>";
     }
    }
    
    web.xml
    <?xml version="1.0" encoding="UTF-8"?>  
    <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">  
      <display-name>RESTfulWS</display-name>  
      <servlet>  
        <servlet-name>Jersey REST Service</servlet-name>  
        <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>  
        <init-param>  
          <param-name>com.sun.jersey.config.property.packages</param-name>  
          <param-value>com.eviac.blog.restws</param-value>  
        </init-param>  
        <load-on-startup>1</load-on-startup>  
      </servlet>  
      <servlet-mapping>  
        <servlet-name>Jersey REST Service</servlet-name>  
        <url-pattern>/rest/*</url-pattern>  
      </servlet-mapping>  
    </web-app>
    
  • To run the project, right click on it and click on run as ->run on server.
  • Execute the following URL in your browser and you'll see the output.
    http://localhost:8080/RESTfulWS/rest/UserInfoService/name/Pavithra
    
  • output
Creating Client
  • Create a package called "com.eviac.blog.restclient". Inside it create a java class called "UserInfoClient".
  • UserInfoClient.java
    package com.eviac.blog.restclient;
    
    import javax.ws.rs.core.MediaType;
    
    import com.sun.jersey.api.client.Client;
    import com.sun.jersey.api.client.ClientResponse;
    import com.sun.jersey.api.client.WebResource;
    import com.sun.jersey.api.client.config.ClientConfig;
    import com.sun.jersey.api.client.config.DefaultClientConfig;
    
    /**
     * 
     * @author pavithra
     * 
     */
    public class UserInfoClient {
    
     public static final String BASE_URI = "http://localhost:8080/RESTfulWS";
     public static final String PATH_NAME = "/UserInfoService/name/";
     public static final String PATH_AGE = "/UserInfoService/age/";
    
     public static void main(String[] args) {
    
      String name = "Pavithra";
      int age = 25;
    
      ClientConfig config = new DefaultClientConfig();
      Client client = Client.create(config);
      WebResource resource = client.resource(BASE_URI);
    
      WebResource nameResource = resource.path("rest").path(PATH_NAME + name);
      System.out.println("Client Response \n"
        + getClientResponse(nameResource));
      System.out.println("Response \n" + getResponse(nameResource) + "\n\n");
    
      WebResource ageResource = resource.path("rest").path(PATH_AGE + age);
      System.out.println("Client Response \n"
        + getClientResponse(ageResource));
      System.out.println("Response \n" + getResponse(ageResource));
     }
    
     /**
      * Returns client response.
      * e.g : 
      * GET http://localhost:8080/RESTfulWS/rest/UserInfoService/name/Pavithra 
      * returned a response status of 200 OK
      *
      * @param service
      * @return
      */
     private static String getClientResponse(WebResource resource) {
      return resource.accept(MediaType.TEXT_XML).get(ClientResponse.class)
        .toString();
     }
    
     /**
      * Returns the response as XML
      * e.g : <User><Name>Pavithra</Name></User> 
      * 
      * @param service
      * @return
      */
     private static String getResponse(WebResource resource) {
      return resource.accept(MediaType.TEXT_XML).get(String.class);
     }
    }
    
  • Once you run the client program, you'll get following output.
  • Client Response 
    GET http://localhost:8080/RESTfulWS/rest/UserInfoService/name/Pavithra returned a response status of 200 OK
    Response 
    <User><Name>Pavithra</Name></User>
    
    
    Client Response 
    GET http://localhost:8080/RESTfulWS/rest/UserInfoService/age/25 returned a response status of 200 OK
    Response 
    <User><Age>25</Age></User>
    
Enjoy!

Wednesday, November 7, 2012

Web Server in C

I implemented a web server in the C language using only the standard libraries and thought it would be useful to share the code.

The server runs on Linux and includes features like handling HTTP GET requests, handling content types (txt, html, jpg, zip, rar, pdf, php, etc.), sending proper HTTP error codes, serving files from a web root, changing the web root via a config file, zero-copy optimization using the sendfile method, and PHP file handling. A port number should be provided as a command line argument.

After the server is up and running you can request files using a web browser such as Firefox.

For example, if the port number is "9000" and you want to request a file called "test.php" located in the web root, use
http://localhost:9000/test.php 
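
To build and start the server, something along these lines should work, assuming gcc is available (the output name "webserver" below is just an example):
gcc WebServer.c -o webserver
./webserver 9000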

WebServer.c
/*
 * WebServer.c
 *
 *  Created on: Nov 3, 2012
 *      Author: pavithra
 *
 * A web server in C language using only the standard libraries.
 * The port number is passed as an argument.
 *
 */

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/sendfile.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>

#define EOL "\r\n"
#define EOL_SIZE 2

typedef struct {
 char *ext;
 char *mediatype;
} extn;

//Supported media types
extn extensions[] ={
 {"gif", "image/gif" },
 {"txt", "text/plain" },
 {"jpg", "image/jpeg" },
 {"jpeg","image/jpeg"},
 {"png", "image/png" },
 {"ico", "image/x-icon" },
 {"gz",  "application/gzip"  },
 {"tar", "application/x-tar" },
 {"htm", "text/html" },
 {"html","text/html" },
 {"php", "text/html" },
 {"pdf","application/pdf"},
 {"zip","application/zip"},
 {"rar","application/octet-stream"},
 {0,0} };

/*
 A helper function
 */
void error(const char *msg) {
 perror(msg);
 exit(1);
}

/*
 A helper function
 */
int get_file_size(int fd) {
 struct stat stat_struct;
 if (fstat(fd, &stat_struct) == -1)
  return (-1);
 return (int) stat_struct.st_size;
}

/*
 A helper function
 */
void send_new(int fd, char *msg) {
 int len = strlen(msg);
 if (send(fd, msg, len, 0) == -1) {
  printf("Error in send\n");
 }
}

/*
 This function receives into the buffer
 until an "End of line (EOL)" sequence is received
 */
int recv_new(int fd, char *buffer) {
 char *p = buffer; // Use a pointer into the buffer rather than indexing it directly
 int eol_matched = 0; // Counts how many EOL bytes have been matched so far
 while (recv(fd, p, 1, 0) != 0) // Start receiving 1 byte at a time
 {
  if (*p == EOL[eol_matched]) // if the byte matches with the first eol byte that is '\r'
    {
   ++eol_matched;
   if (eol_matched == EOL_SIZE) // if both the bytes matches with the EOL
   {
    *(p + 1 - EOL_SIZE) = '\0'; // End the string
     return (strlen(buffer)); // Return the number of bytes received
   }
  } else {
   eol_matched = 0;
  }
  p++; // Increment the pointer to receive next byte
 }
 return (0);
}

/*
 A helper function: Returns the
 web root location.
 */
char* webroot() {
 // open the file "conf" for reading
 FILE *in = fopen("conf", "rt");
 // read the first line from the file
 char buff[1000];
 fgets(buff, 1000, in);
 // close the stream
 fclose(in);
 char* nl_ptr = strrchr(buff, '\n');
 if (nl_ptr != NULL)
  *nl_ptr = '\0';
 return strdup(buff);
}

/*
 Handles php requests
 */
void php_cgi(char* script_path, int fd) {
 send_new(fd, "HTTP/1.1 200 OK\r\nServer: Web Server in C\r\nConnection: close\r\n");
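 // Redirect this process's standard output to the client socket, so that php-cgi's output below goes straight to the browser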
 dup2(fd, STDOUT_FILENO);
 char script[500];
 strcpy(script, "SCRIPT_FILENAME=");
 strcat(script, script_path);
 putenv("GATEWAY_INTERFACE=CGI/1.1");
 putenv(script);
 putenv("QUERY_STRING=");
 putenv("REQUEST_METHOD=GET");
 putenv("REDIRECT_STATUS=true");
 putenv("SERVER_PROTOCOL=HTTP/1.1");
 putenv("REMOTE_HOST=127.0.0.1");
 execl("/usr/bin/php-cgi", "php-cgi", NULL);
}

/*
 This function parses the HTTP requests,
 arrange resource locations,
 check for supported media types,
 serves files in a web root,
 sends the HTTP error codes.
 */
int connection(int fd) {
 char request[500], resource[500], *ptr;
 int fd1, length;
 if (recv_new(fd, request) == 0) {
  printf("Receive failed\n");
 }
 printf("%s\n", request);
 // Check for a valid browser request
 ptr = strstr(request, " HTTP/");
 if (ptr == NULL) {
  printf("NOT HTTP !\n");
 } else {
  *ptr = 0;
  ptr = NULL;

  if (strncmp(request, "GET ", 4) == 0) {
   ptr = request + 4;
  }
  if (ptr == NULL) {
   printf("Unknown Request ! \n");
  } else {
   if (ptr[strlen(ptr) - 1] == '/') {
    strcat(ptr, "index.html");
   }
   strcpy(resource, webroot());
   strcat(resource, ptr);
   char* s = strchr(ptr, '.');
   int i;
   for (i = 0; extensions[i].ext != NULL; i++) {
    if (strcmp(s + 1, extensions[i].ext) == 0) {
     fd1 = open(resource, O_RDONLY, 0);
     printf("Opening \"%s\"\n", resource);
     if (fd1 == -1) {
      printf("404 File not found Error\n");
      send_new(fd, "HTTP/1.1 404 Not Found\r\n");
      send_new(fd, "Server : Web Server in C\r\n\r\n");
      send_new(fd, "<html><head><title>404 Not Found</head></title>");
      send_new(fd, "<body><p>404 Not Found: The requested resource could not be found!</p></body></html>");
      //Handling php requests
     } else if (strcmp(extensions[i].ext, "php") == 0) {
      php_cgi(resource, fd);
      sleep(1);
      close(fd);
      exit(1);
     } else {
      printf("200 OK, Content-Type: %s\n\n",
        extensions[i].mediatype);
      send_new(fd, "HTTP/1.1 200 OK\r\n");
      send_new(fd, "Server : Web Server in C\r\n\r\n");
      if (ptr == request + 4) // if it is a GET request
        {
       if ((length = get_file_size(fd1)) == -1)
        printf("Error in getting size !\n");
       size_t total_bytes_sent = 0;
       ssize_t bytes_sent;
       while (total_bytes_sent < length) {
        //Zero copy optimization
        if ((bytes_sent = sendfile(fd, fd1, 0,
          length - total_bytes_sent)) <= 0) {
         if (errno == EINTR || errno == EAGAIN) {
          continue;
         }
         perror("sendfile");
         return -1;
        }
        total_bytes_sent += bytes_sent;
       }

      }
     }
     break;
    }
    int size = sizeof(extensions) / sizeof(extensions[0]);
    if (i == size - 2) {
     printf("415 Unsupported Media Type\n");
     send_new(fd, "HTTP/1.1 415 Unsupported Media Type\r\n");
     send_new(fd, "Server : Web Server in C\r\n\r\n");
     send_new(fd, "<html><head><title>415 Unsupported Media Type</head></title>");
     send_new(fd, "<body><p>415 Unsupported Media Type!</p></body></html>");
    }
   }

   close(fd);
  }
 }
 shutdown(fd, SHUT_RDWR);
 return 0;
}

int main(int argc, char *argv[]) {
 int sockfd, newsockfd, portno, pid;
 socklen_t clilen;
 struct sockaddr_in serv_addr, cli_addr;

 if (argc < 2) {
  fprintf(stderr, "ERROR, no port provided\n");
  exit(1);
 }
 sockfd = socket(AF_INET, SOCK_STREAM, 0);
 if (sockfd < 0)
  error("ERROR opening socket");
 bzero((char *) &serv_addr, sizeof(serv_addr));
 portno = atoi(argv[1]);
 serv_addr.sin_family = AF_INET;
 serv_addr.sin_addr.s_addr = INADDR_ANY;
 serv_addr.sin_port = htons(portno);
 if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
  error("ERROR on binding");
 listen(sockfd, 5);
 clilen = sizeof(cli_addr);
 /*
  Server runs forever, forking off a separate
  process for each connection.
  */
 while (1) {
  newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
  if (newsockfd < 0)
   error("ERROR on accept");
  pid = fork();
  if (pid < 0)
   error("ERROR on fork");
  if (pid == 0) {
   close(sockfd);
   connection(newsockfd);
   exit(0);
  } else
   close(newsockfd);
 } /* end of while */
 close(sockfd);
 return 0; /* we never get here */
}
Make sure PHP (php-cgi) is installed and that a "conf" file containing the web root path exists in the server's working directory before running this.
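The webroot() function reads only the first line of that file, so "conf" just needs to hold the web root path on a single line, for example (the path below is only a placeholder, use your own directory):
/home/pavithra/www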

Enjoy!

Monday, November 5, 2012

NS-2 Simulation

Installing NS2 on Ubuntu 11.04
sudo apt-get install ns2 nam
Here I have used NS-2 to simulate a network with the topology as shown below.


I have written tcl scripts to create the above topology and to show that,
  1. when two TCP flows compete for the bandwidth, they share it fairly
  2. when a TCP flow and a UDP flow compete for the bandwidth, there is no fair sharing
I have also plotted the throughput at the receivers against time using xgraph.

Installing xgraph on Ubuntu 11.04
sudo apt-get install xgraph
To start the simulation
ns XXX.tcl
How two TCP flows compete for the bandwidth

tcp_tcp.tcl

This script simulates two TCP flows: one between n0 and n4 and one between n1 and n5. The bandwidth of the n2-n3 link (L2) has been chosen so that it becomes the bottleneck, and the "record" procedure is used to measure the throughput at the receivers.
#Create a simulator object
set ns [new Simulator]

#Define different colors for data flows (for NAM)
$ns color 1 Blue
$ns color 2 Red

#Open the trace files outX.tr for Xgraph and out.nam for nam
set f0 [open out_tcp0.tr w]
set f1 [open out_tcp1.tr w]

#Open the NAM trace file
set nf [open out.nam w]
$ns namtrace-all $nf

#Define a 'finish' procedure
proc finish {} {
 global ns nf f0 f1
 $ns flush-trace
#Close the NAM trace file
 close $nf
#Close the output files
 close $f0
 close $f1
#Execute xgraph to display the results
 exec xgraph out_tcp0.tr out_tcp1.tr -geometry 600x400 &
#Execute NAM on the trace file
 exec nam out.nam &
 exit 0
}

#Create six nodes
set n0 [$ns node]
set n1 [$ns node]
set n2 [$ns node]
set n3 [$ns node]
set n4 [$ns node]
set n5 [$ns node]

#Create links between the nodes
$ns duplex-link $n0 $n2 2Mb 10ms DropTail
$ns duplex-link $n1 $n2 2Mb 10ms DropTail
$ns duplex-link $n2 $n3 1.7Mb 20ms DropTail
$ns duplex-link $n3 $n4 2Mb 10ms DropTail
$ns duplex-link $n3 $n5 2Mb 10ms DropTail

#Set Queue Size of link (n2-n3) to 20
$ns queue-limit $n2 $n3 20

#Give node position (for NAM)
$ns duplex-link-op $n0 $n2 orient right-down
$ns duplex-link-op $n1 $n2 orient right-up
$ns duplex-link-op $n2 $n3 orient right
$ns duplex-link-op $n3 $n4 orient right-up
$ns duplex-link-op $n3 $n5 orient right-down

#record procedure
proc record {} {
 global sink sink1 f0 f1
#Get an instance of the simulator
 set ns [Simulator instance]

#Set the time after which the procedure should be called again
 set time 0.5

#How many bytes have been received by the traffic sinks?
 set bw0 [$sink set bytes_]
 set bw1 [$sink1 set bytes_]

#Get the current time
 set now [$ns now]

#Calculate the bandwidth (in MBit/s) and write it to the files
 puts $f0 "$now [expr $bw0/$time*8/1000000]"
 puts $f1 "$now [expr $bw1/$time*8/1000000]"

#Reset the bytes_ values on the traffic sinks
 $sink set bytes_ 0
 $sink1 set bytes_ 0

#Re-schedule the procedure
 $ns at [expr $now+$time] "record"
}

#Setup a TCP connection
set tcp [new Agent/TCP]
$tcp set class_ 2
$ns attach-agent $n0 $tcp
set sink [new Agent/TCPSink]
$ns attach-agent $n4 $sink
$ns connect $tcp $sink
$tcp set fid_ 1

#Setup a FTP over TCP connection
set ftp [new Application/FTP]
$ftp attach-agent $tcp
$ftp set type_ FTP

#Setup a TCP connection
set tcp1 [new Agent/TCP]
$tcp1 set class_ 2
$ns attach-agent $n1 $tcp1
set sink1 [new Agent/TCPSink]
$ns attach-agent $n5 $sink1
$ns connect $tcp1 $sink1
$tcp1 set fid_ 2

#Setup a FTP over TCP connection
set ftp1 [new Application/FTP]
$ftp1 attach-agent $tcp1
$ftp1 set type_ FTP

#Start logging the received bandwidth
$ns at 0.0 "record"

#Schedule events for the FTP agents
$ns at 0.1 "$ftp start"
$ns at 0.8 "$ftp1 start"
$ns at 4.0 "$ftp1 stop"
$ns at 4.8 "$ftp stop"

#Call the finish procedure after 5 seconds of simulation time
$ns at 5.0 "finish"

#Run the simulation
$ns run
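
As a quick sanity check on the record procedure: it converts the bytes received in each 0.5 s interval into Mbit/s, so if a sink reported, say, 106250 bytes in one interval, the value written would be 106250/0.5*8/1000000 = 1.7 Mbit/s, exactly the capacity of the bottleneck link.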

Graph


The above graph clearly shows how the two TCP flows share the bandwidth fairly.

How a TCP flow and a UDP flow compete for the bandwidth

tcp_udp.tcl

This script simulates one TCP flow (between n0 and n4) and one UDP flow (between n1 and n5). Again the bandwidth of the n2-n3 link (L2) has been chosen so that it becomes the bottleneck, and the "record" procedure is used to measure the throughput at the receivers.
#Create a simulator object
set ns [new Simulator]

#Define different colors for data flows (for NAM)
$ns color 1 Blue
$ns color 2 Red

#Open the trace files outX.tr for Xgraph and out.nam for nam
set f0 [open out_tcp.tr w]
set f1 [open out_udp.tr w]

#Open the NAM trace file
set nf [open out_udptcp.nam w]
$ns namtrace-all $nf

#Define a 'finish' procedure
proc finish {} {
 global ns nf f0 f1
 $ns flush-trace
#Close the NAM trace file
 close $nf
#Close the output files
 close $f0
 close $f1
#Execute xgraph to display the results
 exec xgraph out_tcp.tr out_udp.tr -geometry 600x400 &
#Execute NAM on the trace file
 exec nam out_udptcp.nam &
 exit 0
}

#Create six nodes
set n0 [$ns node]
set n1 [$ns node]
set n2 [$ns node]
set n3 [$ns node]
set n4 [$ns node]
set n5 [$ns node]

#Create links between the nodes
$ns duplex-link $n0 $n2 2Mb 10ms DropTail
$ns duplex-link $n1 $n2 2Mb 10ms DropTail
$ns duplex-link $n2 $n3 1.7Mb 20ms DropTail
$ns duplex-link $n3 $n4 2Mb 10ms DropTail
$ns duplex-link $n3 $n5 2Mb 10ms DropTail

#Set Queue Size of link (n2-n3) to 20
$ns queue-limit $n2 $n3 20

#Give node position (for NAM)
$ns duplex-link-op $n0 $n2 orient right-down
$ns duplex-link-op $n1 $n2 orient right-up
$ns duplex-link-op $n2 $n3 orient right
$ns duplex-link-op $n3 $n4 orient right-up
$ns duplex-link-op $n3 $n5 orient right-down

#record procedure
proc record {} {
 global sink sink1 f0 f1
#Get an instance of the simulator
 set ns [Simulator instance]

#Set the time after which the procedure should be called again
 set time 0.5

#How many bytes have been received by the traffic sinks?
 set bw0 [$sink set bytes_]
 set bw1 [$sink1 set bytes_]

#Get the current time
 set now [$ns now]

#Calculate the bandwidth (in MBit/s) and write it to the files
 puts $f0 "$now [expr $bw0/$time*8/1000000]"
 puts $f1 "$now [expr $bw1/$time*8/1000000]"

#Reset the bytes_ values on the traffic sinks
 $sink set bytes_ 0
 $sink1 set bytes_ 0

#Re-schedule the procedure
 $ns at [expr $now+$time] "record"
}

#Setup a TCP connection
set tcp [new Agent/TCP]
$tcp set class_ 2
$ns attach-agent $n0 $tcp
set sink [new Agent/TCPSink]
$ns attach-agent $n4 $sink
$ns connect $tcp $sink
$tcp set fid_ 1

#Setup a FTP over TCP connection
set ftp [new Application/FTP]
$ftp attach-agent $tcp
$ftp set type_ FTP

#Setup a UDP connection
set udp [new Agent/UDP]
$ns attach-agent $n1 $udp
set sink1 [new Agent/LossMonitor]
$ns attach-agent $n5 $sink1
$ns connect $udp $sink1
$udp set fid_ 2

#Setup a CBR over UDP connection
set cbr [new Application/Traffic/CBR]
$cbr attach-agent $udp
$cbr set type_ CBR
$cbr set packet_size_ 1000
$cbr set rate_ 2mb
$cbr set random_ false

#Start logging the received bandwidth
$ns at 0.0 "record"

#Schedule events for the CBR and FTP agents
$ns at 0.1 "$cbr start"
$ns at 0.8 "$ftp start"
$ns at 4.0 "$ftp stop"
$ns at 4.8 "$cbr stop"

#Call the finish procedure after 5 seconds of simulation time
$ns at 5.0 "finish"

#Run the simulation
$ns run
Graph


The above graph clearly shows that when a TCP flow and a UDP flow compete, there is no fair sharing of the bandwidth; UDP takes most of it. Since the CBR source pushes 2 Mb through the 1.7 Mb bottleneck and UDP has no congestion control, it keeps the queue full while the TCP flow backs off.