Tuesday, November 25, 2014

Hadoop MapReduce Features : Custom Data Types

Hadoop requires every data type used as a key to implement the Writable and Comparable interfaces, and every data type used as a value to implement the Writable interface.

Writable interface

The Writable interface provides a way to serialize and deserialize data across the network. It is located in the org.apache.hadoop.io package.
public interface Writable {
   void write(DataOutput out) throws IOException;
   void readFields(DataInput in) throws IOException;
}

Comparable interface

Hadoop uses Java's Comparable interface to facilitate the sorting process. It is located in the java.lang package.
public interface Comparable<T> {
   public int compareTo(T o);
}
WritableComparable interface

For convenience, Hadoop provides a WritableComparable interface which combines the Writable and Comparable interfaces into a single interface. It is located in the org.apache.hadoop.io package.
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Hadoop provides a set of classes (Text, IntWritable, LongWritable, FloatWritable, BooleanWritable, etc.) which implement the WritableComparable interface and can therefore be used directly as key and value types.
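The write()/readFields() contract can be sketched without any Hadoop dependency, using only java.io. The hypothetical Point type below mirrors what a Writable does: it writes its fields to a DataOutput and reads them back in the same order.

```java
import java.io.*;

// Sketch of the Writable contract using only java.io -- no Hadoop
// dependency. The hypothetical Point type writes its fields to a
// DataOutput and reads them back in the same order.
public class WritableSketch {

    static class Point {
        int x;
        int y;

        // Mirrors Writable.write(DataOutput)
        void write(DataOutput out) throws IOException {
            out.writeInt(x);
            out.writeInt(y);
        }

        // Mirrors Writable.readFields(DataInput)
        void readFields(DataInput in) throws IOException {
            x = in.readInt();
            y = in.readInt();
        }
    }

    // Serialize a Point to bytes and deserialize into a fresh object,
    // roughly what happens when Hadoop ships keys/values over the network.
    static Point roundTrip(Point p) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        p.write(new DataOutputStream(buffer));
        Point copy = new Point();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        Point p = new Point();
        p.x = 3;
        p.y = 7;
        Point copy = roundTrip(p);
        System.out.println(copy.x + "," + copy.y); // prints 3,7
    }
}
```

Note that readFields() must consume fields in exactly the order write() produced them; the byte stream carries no field names.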

Custom Data Types

Example

Assume you want an object representation of a vehicle to be the type of your key or value. Three properties of a vehicle are taken into consideration:
  1. Manufacturer
  2. Vehicle Identification Number(VIN)
  3. Mileage
Note that the Vehicle Identification Number (VIN) is unique for each vehicle.

A tab-delimited file containing observations of these three variables holds the following sample data.

Sample Data
Toyota 1GCCS148X48370053 10000
Toyota 1FAPP64R1LH452315 40000
BMW WP1AA29P58L263510 10000
BMW JM3ER293470820653 60000
Nissan 3GTEC14V57G579789 10000
Nissan 1GNEK13T6YJ290558 25000
Honda 1GC4KVBG6AF244219 10000
Honda 1FMCU5K39AK063750 30000
Custom Value Types

Hadoop gives you the freedom to create custom value types by implementing the Writable interface. A class implementing Writable must implement its two abstract methods, write() and readFields(). In addition, in the following example Java code, I have overridden the toString() method to return a text representation of the object.
package net.eviac.blog.datatypes.value;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * @author pavithra
 * 
 * Custom data type to be used as a value in Hadoop.
 * In Hadoop every data type to be used as values must implement Writable interface.
 *
 */
public class Vehicle implements Writable {

  private String model;
  private String vin;
  private int mileage;

  public void write(DataOutput out) throws IOException {
    out.writeUTF(model);
    out.writeUTF(vin);
    out.writeInt(mileage);
  }

  public void readFields(DataInput in) throws IOException {
    model = in.readUTF();
    vin = in.readUTF();
    mileage = in.readInt();
  }

  @Override
  public String toString() {
    return model + ", " + vin + ", "
        + Integer.toString(mileage);
  }

  public String getModel() {
    return model;
  }
  public void setModel(String model) {
    this.model = model;
  }
  public String getVin() {
    return vin;
  }
  public void setVin(String vin) {
    this.vin = vin;
  }
  public int getMileage() {
    return mileage;
  }
  public void setMileage(int mileage) {
    this.mileage = mileage;
  }
  
}
The following MapReduce job outputs the total mileage per manufacturer, using Vehicle as a custom value type.
package net.eviac.blog.datatypes.jobs;

import java.io.IOException;

import net.eviac.blog.datatypes.value.Vehicle;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * @author pavithra
 *
 */
public class ModelTotalMileage {
  
  static final Logger logger = Logger.getLogger(ModelTotalMileage.class);

  public static class ModelMileageMapper
  extends Mapper<Object, Text, Text, Vehicle>{

    private Vehicle vehicle = new Vehicle();
    private Text model = new Text();

    public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {

      // split() returns a new array; no need to pre-allocate one
      String[] var = value.toString().split("\t");

      if(var.length == 3){
        model.set(var[0]);
        vehicle.setModel(var[0]);
        vehicle.setVin(var[1]);
        vehicle.setMileage(Integer.parseInt(var[2]));
        context.write(model, vehicle);
      }
    }
  }

  public static class ModelTotalMileageReducer
  extends Reducer<Text,Vehicle,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<Vehicle> values,
        Context context
        ) throws IOException, InterruptedException {
      int totalMileage = 0;
      for (Vehicle vehicle : values) {
        totalMileage += vehicle.getMileage();
      }
      result.set(totalMileage);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    BasicConfigurator.configure();
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Model Total Mileage");
    job.setJarByClass(ModelTotalMileage.class);
    job.setMapperClass(ModelMileageMapper.class);
    job.setReducerClass(ModelTotalMileageReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Vehicle.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
Output
BMW 70000
Honda 40000
Nissan 35000
Toyota 50000
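What the shuffle and reduce phases compute here can be sketched in plain Java, with no cluster: group the tab-delimited records by manufacturer and sum the mileage column. The records below are the sample data from this post.

```java
import java.util.*;

// Plain-Java sketch of the aggregation the job above performs: group
// tab-delimited records by manufacturer and sum the mileage column.
public class TotalMileageSketch {

    static SortedMap<String, Integer> totalMileage(List<String> records) {
        // TreeMap mirrors the sorted-by-key ordering of reducer output.
        SortedMap<String, Integer> totals = new TreeMap<>();
        for (String record : records) {
            String[] fields = record.split("\t");
            if (fields.length == 3) { // same guard as the mapper
                totals.merge(fields[0], Integer.parseInt(fields[2]), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "Toyota\t1GCCS148X48370053\t10000",
            "Toyota\t1FAPP64R1LH452315\t40000",
            "BMW\tWP1AA29P58L263510\t10000",
            "BMW\tJM3ER293470820653\t60000",
            "Nissan\t3GTEC14V57G579789\t10000",
            "Nissan\t1GNEK13T6YJ290558\t25000",
            "Honda\t1GC4KVBG6AF244219\t10000",
            "Honda\t1FMCU5K39AK063750\t30000");
        // Prints BMW 70000, Honda 40000, Nissan 35000, Toyota 50000
        totalMileage(sample).forEach((model, miles) ->
            System.out.println(model + "\t" + miles));
    }
}
```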
Custom Key Types

Hadoop gives you the freedom to create custom key types by implementing the WritableComparable interface. In addition to implementing the Writable interface, so they can be transmitted over the network, keys must implement Java's Comparable interface to facilitate the sorting process. Since the framework sorts outputs on keys, only data types used as keys must implement Comparable.

A class implementing the Comparable interface must implement its abstract method compareTo(). A Vehicle object must be comparable to other Vehicle objects to facilitate the sorting process, which uses compareTo() to determine how Vehicle objects should be ordered. For instance, since VIN is a String and String implements Comparable, we can sort vehicles by VIN.

For the partitioning process it is important for key types to implement hashCode(), and consequently to override equals() as well. hashCode() should use the same field as equals(), which in our case is VIN: if equals() says two Vehicle objects with the same VIN are equal, then Vehicle objects with the same VIN must return identical hash codes.
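The key contract described above can be sketched in plain Java. MiniVehicle below is a hypothetical stand-in for the Vehicle key type: equality, hashing, and ordering are all keyed on the same field (VIN).

```java
import java.util.*;

// Sketch of the key contract: equality, hashing and ordering all keyed
// on the same field (VIN). MiniVehicle is a hypothetical stand-in for
// the full Vehicle key type.
public class KeyContractSketch {

    static class MiniVehicle implements Comparable<MiniVehicle> {
        final String vin;
        MiniVehicle(String vin) { this.vin = vin; }

        // Sorting delegates to String's natural ordering of the VIN.
        public int compareTo(MiniVehicle other) {
            return vin.compareTo(other.vin);
        }

        // equals() and hashCode() use the same field, so objects that
        // compare equal are guaranteed to hash identically.
        @Override public boolean equals(Object obj) {
            return obj instanceof MiniVehicle
                && ((MiniVehicle) obj).vin.equals(vin);
        }
        @Override public int hashCode() {
            return vin.hashCode();
        }
    }

    public static void main(String[] args) {
        MiniVehicle a = new MiniVehicle("JM3ER293470820653");
        MiniVehicle b = new MiniVehicle("JM3ER293470820653");
        // Same VIN: equal objects, identical hash codes
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());

        List<MiniVehicle> keys = new ArrayList<>(Arrays.asList(
            new MiniVehicle("WP1AA29P58L263510"),
            new MiniVehicle("1FAPP64R1LH452315")));
        Collections.sort(keys); // uses compareTo -> ordered by VIN
        System.out.println(keys.get(0).vin);
    }
}
```

If equals() and hashCode() disagreed on which field they use, records with the same VIN could land in different partitions and never meet in the same reducer.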

The following example provides the Java code for a complete custom key type.
package net.eviac.blog.datatypes.key;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author pavithra
 * 
 * Custom data type to be used as a key in Hadoop.
 * In Hadoop every data type to be used as keys must implement WritableComparable interface.
 *
 */
public class Vehicle implements WritableComparable<Vehicle> {

  private String model;
  private String vin;
  private int mileage;

  public void write(DataOutput out) throws IOException {
    out.writeUTF(model);
    out.writeUTF(vin);
    out.writeInt(mileage);
  }

  public void readFields(DataInput in) throws IOException {
    model = in.readUTF();
    vin = in.readUTF();
    mileage = in.readInt();
  }

  @Override
  public String toString() {
    return model + ", " + vin + ", "
        + Integer.toString(mileage);
  }
  
  @Override
  public int compareTo(Vehicle o) {
    return vin.compareTo(o.getVin());
  } 
  
  @Override
  public boolean equals(Object obj) {
    if((obj instanceof Vehicle) && (((Vehicle)obj).getVin().equals(vin))){
      return true;
    }else {
      return false;
    }    
  }
  
  @Override
  public int hashCode() {
    // Sum of the VIN's character codes; equals() uses the same field,
    // so equal objects return identical hash codes. (Indices must run
    // from 0 to length-1, or charAt() throws out of bounds.)
    int ascii = 0;
    for(int i = 0; i < vin.length(); i++){
      ascii += vin.charAt(i);
    }
    return ascii;
  }

  public String getModel() {
    return model;
  }
  public void setModel(String model) {
    this.model = model;
  }
  public String getVin() {
    return vin;
  }
  public void setVin(String vin) {
    this.vin = vin;
  }
  public int getMileage() {
    return mileage;
  }
  public void setMileage(int mileage) {
    this.mileage = mileage;
  }  
  
}
The following MapReduce job outputs the mileage per vehicle, using Vehicle as a custom key type.
package net.eviac.blog.datatypes.jobs;

import java.io.IOException;

import net.eviac.blog.datatypes.key.Vehicle;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * @author pavithra
 *
 */
public class VehicleMileage {
  
  static final Logger logger = Logger.getLogger(VehicleMileage.class);

  public static class VehicleMileageMapper
  extends Mapper<Object, Text, Vehicle, IntWritable>{

    private Vehicle vehicle = new Vehicle();
    private IntWritable mileage = new IntWritable();

    public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {

      // split() returns a new array; no need to pre-allocate one
      String[] var = value.toString().split("\t");

      if(var.length == 3){
        mileage.set(Integer.parseInt(var[2]));
        vehicle.setModel(var[0]);
        vehicle.setVin(var[1]);
        vehicle.setMileage(Integer.parseInt(var[2]));
        context.write(vehicle, mileage);
      }
    }
  }

  public static class VehicleMileageReducer
  extends Reducer<Vehicle,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    private Text vin = new Text();

    public void reduce(Vehicle key, Iterable<IntWritable> values,
        Context context
        ) throws IOException, InterruptedException {
      int totalMileage = 0;
      for (IntWritable mileage : values) {
        totalMileage += mileage.get();
      }
      result.set(totalMileage);
      vin.set(key.getVin());
      context.write(vin, result);
    }
  }

  public static void main(String[] args) throws Exception {
    BasicConfigurator.configure();
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Vehicle Mileage");
    job.setJarByClass(VehicleMileage.class);
    job.setMapperClass(VehicleMileageMapper.class);
    job.setReducerClass(VehicleMileageReducer.class);
    job.setMapOutputKeyClass(Vehicle.class);
    job.setOutputKeyClass(Text.class);    
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
Output
1FAPP64R1LH452315 40000
1FMCU5K39AK063750 30000
1GC4KVBG6AF244219 10000
1GCCS148X48370053 10000
1GNEK13T6YJ290558 25000
3GTEC14V57G579789 10000
JM3ER293470820653 60000
WP1AA29P58L263510 10000
