The Hadoop MapReduce framework provides a way to process large data sets in parallel on large clusters of commodity hardware.
Processing a large file serially from top to bottom can be very time consuming. Instead, in brief, MapReduce breaks the large file into chunks and processes them in parallel.
A little note on HDFS
HDFS, the Hadoop Distributed File System, is a fault-tolerant, distributed storage system responsible for storing large data sets on large clusters of commodity hardware. HDFS splits data into chunks called blocks and distributes them across different nodes in the cluster where possible. A typical block size in HDFS is 64 MB, which is the default in Hadoop 1.x (later releases default to 128 MB). You can configure this value with the dfs.blocksize property; the default is listed in hdfs-default.xml and is normally overridden in hdfs-site.xml. Note that changing this setting does not affect the block size of files already in HDFS; it only affects files placed into HDFS after the setting has taken effect. All blocks in a file except the last are the same size. Each block is given a unique name of the form blk_<large number>. Each block is replicated, 3 times by default (configurable with the dfs.replication property), across different nodes to ensure availability.
Suppose we have a 160 MB file. As the file is loaded into HDFS, it is split into 3 blocks: the first and second blocks are 64 MB each and the third is 32 MB, making up the 160 MB file.
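The arithmetic in the example above can be sketched in a few lines of plain Java. This is only an illustration of the splitting rule, not HDFS code; the class and method names (BlockSplitSketch, blockSizes) are made up for this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Hadoop API.
final class BlockSplitSketch {

    // Returns the size in bytes of each block that a file of fileSize bytes
    // would occupy, given a fixed blockSize. Only the last block may be smaller.
    static List<Long> blockSizes(long fileSize, long blockSize) {
        if (fileSize < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and blockSize > 0");
        }
        List<Long> sizes = new ArrayList<>();
        long remaining = fileSize;
        while (remaining > 0) {
            long size = Math.min(blockSize, remaining);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 160 MB file with a 64 MB block size: prints 64 MB, 64 MB, 32 MB.
        for (long size : blockSizes(160 * mb, 64 * mb)) {
            System.out.println(size / mb + " MB");
        }
    }
}
```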
NameNode
The NameNode keeps the metadata of HDFS files; it does not store the data itself. The NameNode daemon runs on a master node, and there is only one NameNode per Hadoop cluster. The NameNode runs in a separate JVM process, and in a typical production cluster a dedicated node runs it.
The NameNode does not persistently store the location of each block; block locations are reported by each DataNode at cluster startup and kept in memory. The file system namespace itself is persisted to a file called 'fsimage'. Changes made during operation are held in memory and logged to a file called 'edits', which is also kept by the NameNode. The SecondaryNameNode is a daemon process that does housekeeping for the NameNode by periodically merging 'fsimage' with 'edits'.
Since there is a single NameNode per Hadoop cluster, it is a single point of failure. DataNodes hold only the actual data blocks, not the metadata that describes them, so if the NameNode becomes unavailable the entire cluster becomes inaccessible and useless.
If the 'fsimage' and 'edits' files get corrupted, all the data in HDFS becomes inaccessible. Even though commodity machines with JBOD storage can be used for DataNodes, more reliable RAID-based storage should be used for the NameNode. The 'fsimage' and 'edits' files must also be backed up regularly.
Hadoop prefers large files over a large number of small files. Since a data block can hold the data of only one file, a large number of small files must be stored in separate data blocks, which results in a huge number of data blocks. The NameNode keeps the metadata of all these blocks in memory, which can overwhelm it.
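To get a feel for the small-files problem, here is a rough sketch that only counts how many file and block entries the NameNode would have to track for the same amount of data stored in two different ways. The class name and the numbers (10,000 files of 1 MB each, 64 MB blocks) are assumptions chosen for illustration; real memory usage depends on the Hadoop version and is not modelled here.

```java
// Hypothetical helper for illustration only; not part of the Hadoop API.
final class SmallFilesSketch {

    // Number of blocks needed for one file, given that a block
    // holds the data of one file only (ceiling division).
    static long blocksFor(long fileSize, long blockSize) {
        if (fileSize < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and blockSize > 0");
        }
        return (fileSize + blockSize - 1) / blockSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 64 * mb;

        // The same 10,000 MB of data stored as one file or as 10,000 small files.
        long blocksOneLargeFile = blocksFor(10_000 * mb, blockSize);
        long blocksManySmallFiles = 10_000 * blocksFor(1 * mb, blockSize);

        System.out.println("1 large file      -> files: 1, blocks: " + blocksOneLargeFile);
        System.out.println("10000 small files -> files: 10000, blocks: " + blocksManySmallFiles);
    }
}
```

With these assumed numbers the single large file needs 157 blocks, while the small files need 10,000 blocks plus 10,000 file entries, all of which the NameNode has to keep in memory.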
DataNode
DataNode daemons run on slave nodes and store HDFS blocks. Only one DataNode process runs per slave node, in its own JVM process. DataNodes periodically send heartbeats to the NameNode to indicate that they are alive. DataNodes can also talk to each other, for instance during data replication.
When a client wants to perform an operation on a file, it first contacts the NameNode to locate the file. The NameNode then returns the locations of the nodes on which the file's HDFS blocks are stored. The client can then talk directly to the DataNodes and perform the operation on the file.
Daemons of MapReduce
There are two daemon processes we have to look into: the JobTracker daemon and the TaskTracker daemon.
JobTracker
The JobTracker daemon runs on a master node and tracks MapReduce jobs. There is only one JobTracker daemon per Hadoop cluster. The JobTracker runs in a separate JVM process, and in a typical production cluster a dedicated node runs it.
TaskTracker
TaskTracker daemons run on slave nodes and handle the tasks (map, reduce) received from the JobTracker. There is only one TaskTracker per slave node. Every TaskTracker is set up with a set of slots, which specifies the number of tasks it can handle (a TaskTracker can be configured to handle multiple map and reduce tasks). The TaskTracker starts a separate JVM process for each task to isolate itself from problems caused by tasks. TaskTrackers periodically send heartbeats to the JobTracker to indicate that they are alive and to report the number of available slots.
When running a MapReduce job, the client application submits the job to the JobTracker. The JobTracker then talks to the NameNode to locate the necessary data blocks. It then chooses TaskTrackers with free slots that run on the same nodes as the data, or within the same rack as the data. The TaskTrackers start a separate JVM process for each task (JVM reuse can also be enabled) and monitor them, while the JobTracker monitors the TaskTrackers for failures. When a task is done, the TaskTracker informs the JobTracker.
Both HDFS and the MapReduce framework run on the same set of nodes; in other words, the storage nodes (DataNodes in HDFS) and the compute nodes (the nodes on which TaskTrackers run) are the same. In Hadoop, computation is moved to the data, not the other way around.
Input Split
An Input Split is a chunk of the input that is processed by a single map; each map is responsible for processing a single Input Split. An Input Split has a length in bytes and a set of storage locations. It does not contain the input data, but a reference to it. In other words, an Input Split is logical and refers to input data that is physically stored in HDFS as HDFS blocks.
Hadoop uses the InputSplit Java interface, which is in the org.apache.hadoop.mapred package, to represent an Input Split.
public interface InputSplit extends Writable {
    long getLength() throws IOException;
    String[] getLocations() throws IOException;
}
Records
Each Input Split is divided into Records within a map task. Records are logical Key/Value pairs. The map task processes each Record, one after the other.
InputFormat
InputSplits are generated using an interface, InputFormat, which is in the org.apache.hadoop.mapred package.
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
An InputFormat does two things:
- Generating Input Splits.
- Dividing Input Splits into Records.
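To make the idea of Records a bit more concrete, here is a small plain-Java sketch that turns the text of one split into Key/Value records. It assumes a line-oriented input where the key is the starting offset of a line and the value is the line itself; that choice, and the names RecordSketch and records, are assumptions of this sketch and not something the InputFormat interface dictates. Offsets are counted in characters, which matches bytes only for ASCII text.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Hadoop API.
final class RecordSketch {

    // Divides the text of one split into (offset, line) records.
    static List<Map.Entry<Long, String>> records(String splitText) {
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        int start = 0;
        while (start < splitText.length()) {
            int end = splitText.indexOf('\n', start);
            if (end < 0) {
                end = splitText.length(); // last line without a trailing newline
            }
            out.add(new SimpleEntry<Long, String>((long) start, splitText.substring(start, end)));
            start = end + 1; // skip the newline character
        }
        return out;
    }

    public static void main(String[] args) {
        String split = "Hello World Bye World\nBye World Bye\n";
        // The map function would be called once per record, in order.
        for (Map.Entry<Long, String> record : records(split)) {
            System.out.println("{" + record.getKey() + ", " + record.getValue() + "}");
        }
    }
}
```

For the two lines used later in this post, the sketch produces the records {0, Hello World Bye World} and {22, Bye World Bye}.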
FileInputFormat
FileInputFormat is the base class for all file-based InputFormats, and it implements the InputFormat interface. It does two things:
- Defining input paths for a MapReduce job.
- Giving an implementation for generating splits for the input files.
The size of the generated splits is governed by the following properties:
- mapred.min.split.size
- mapred.max.split.size
- dfs.block.size
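The three properties listed above are usually described as interacting through a simple max/min rule: the split size is the block size, clamped between the minimum and the maximum split size. The sketch below shows that rule in plain Java. As far as I know this mirrors what FileInputFormat does in the newer API, but treat the class and parameter names here as my own and check the FileInputFormat source for your Hadoop version.

```java
// Hypothetical helper for illustration only; the names are not Hadoop's.
final class SplitSizeSketch {

    // Split size = block size, but never below minSize and never above maxSize
    // (minSize wins if the two limits conflict).
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 64 * mb;

        // No limits set (assumed defaults): one split per block.
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) / mb + " MB");
        // A minimum larger than the block size produces larger splits.
        System.out.println(computeSplitSize(blockSize, 128 * mb, Long.MAX_VALUE) / mb + " MB");
        // A maximum smaller than the block size produces smaller splits.
        System.out.println(computeSplitSize(blockSize, 1L, 32 * mb) / mb + " MB");
    }
}
```

With a 64 MB block size the three calls print 64 MB, 128 MB and 32 MB, which is why, when neither limit is changed, the split size normally ends up equal to the HDFS block size.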
Inputs and Outputs
The MapReduce framework operates on a series of Key/Value transformations: the input to a MapReduce job is a set of {key, value} pairs and the output is also a set of {key, value} pairs. Note that the types of the input {key, value} pairs can differ from the types of the output {key, value} pairs.
(input) {k1, v1} -> map -> {k2, v2} -> shuffle and sort -> {k2, List(v2)} -> reduce -> {k3, v3} (output)
Every data type used as a key must implement the Writable and Comparable interfaces, and every data type used as a value must implement the Writable interface. The Writable interface provides a way to serialize and deserialize data across the network. Only key types need to implement Comparable, because the framework sorts outputs by key.
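Here is a small sketch of what a key type has to offer: it can write itself to a stream, read itself back, and be compared with another key. To keep the example runnable without Hadoop on the classpath, it declares its own stand-in interface, SimpleWritable, with the same two methods that Hadoop's Writable has (write and readFields). SimpleWritable, WordKey and roundTrip are names I made up for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch; SimpleWritable is a stand-in, not Hadoop's Writable.
final class WritableSketch {

    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // A key type: serializable (so it can cross the network) and comparable (so it can be sorted).
    static final class WordKey implements SimpleWritable, Comparable<WordKey> {
        private String word = "";

        WordKey() { }
        WordKey(String word) { this.word = word; }

        String get() { return word; }

        @Override
        public void write(DataOutput out) throws IOException { out.writeUTF(word); }

        @Override
        public void readFields(DataInput in) throws IOException { word = in.readUTF(); }

        @Override
        public int compareTo(WordKey other) { return word.compareTo(other.word); }
    }

    // Serializes a key to bytes and reads it back into a fresh instance.
    static WordKey roundTrip(WordKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            WordKey copy = new WordKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordKey copy = roundTrip(new WordKey("Hello"));
        System.out.println(copy.get());
        System.out.println(new WordKey("Bye").compareTo(new WordKey("Hello")) < 0);
    }
}
```

In real jobs you would normally use the types Hadoop already ships, such as Text and IntWritable, as the WordCount example below does.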
WordCount - Example MapReduce Program
The following example MapReduce program is the exact same one that you will find in the Hadoop MapReduce Tutorial. This code works with all three modes: Standalone mode, Pseudo-distributed mode and Fully-distributed mode.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Creating a configuration object
    Configuration conf = new Configuration();
    // Creating an instance of Job class
    Job job = Job.getInstance(conf, "word count");
    // Setting the name of the main class within the jar file
    job.setJarByClass(WordCount.class);
    // Setting the mapper class
    job.setMapperClass(TokenizerMapper.class);
    // Setting the combiner class
    job.setCombinerClass(IntSumReducer.class);
    // Setting the reducer class
    job.setReducerClass(IntSumReducer.class);
    // Setting the data type of the output(final) key
    job.setOutputKeyClass(Text.class);
    // Setting the data type of the output(final) value
    job.setOutputValueClass(IntWritable.class);
    // Setting input file path, the 1st argument passed in to the main method is used.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Setting output file path, the 2nd argument passed in to the main method is used.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Running the job and wait for it to get completed
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
To perform a MapReduce job, we need a mapper implementation and a reducer implementation. As you can see in the above code, the mapper and reducer classes are defined as inner classes. The MapReduce job configuration happens within the main method, mainly through an instance of the Job class. You can go through the WordCount.java code and refer to the comments I've added in there to understand these configurations.
Mapper
In the WordCount example, you can see the mapper implementation is named 'TokenizerMapper'. It extends the base Mapper class provided by Hadoop and overrides its map method. As you can see, the map method has three parameters:
- Input key
- Input value
- An instance of the Context class, which is used to emit the results.
Partitioning, Shuffle and Sort
Partitioning
Hadoop ensures that all intermediate records with the same key end up in the same reducer. The default partitioner used by the MapReduce framework is HashPartitioner.
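Hash partitioning boils down to one line: take the key's hash code, clear the sign bit and take the remainder by the number of reducers. The sketch below applies that rule to plain Java objects. To my knowledge this is the same expression HashPartitioner uses, but the class and method names here (HashPartitionSketch, partitionFor) are mine, and it uses String keys instead of Hadoop's Text.

```java
// Hypothetical helper for illustration only; not Hadoop's HashPartitioner class.
final class HashPartitionSketch {

    // Maps a key to a reducer index in the range [0, numReduceTasks).
    // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
    // hash code can never produce a negative partition number.
    static int partitionFor(Object key, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            throw new IllegalArgumentException("numReduceTasks must be > 0");
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 2;
        // Both occurrences of "World" land on the same reducer.
        for (String word : new String[] {"Hello", "World", "Bye", "World"}) {
            System.out.println(word + " -> reducer " + partitionFor(word, reducers));
        }
    }
}
```

Because the result depends only on the key and the number of reducers, every record with the same key goes to the same reducer, no matter which mapper emitted it.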
Shuffle and Sort
MapReduce ensures that the input to a reducer is sorted by key. The shuffle and sort phases occur simultaneously: intermediate mapper outputs are sorted and transferred to the reducers as inputs, and as the reducer fetches these outputs, they are merged.
Combiner
Hadoop allows the use of an optional Combiner class that runs on mapper outputs. When a Combiner class is specified, each mapper's output is sorted by key and passed through a local Combiner, which performs local aggregation. The Combiner output then becomes the input to the reducer. The Combiner is an optimization, so there is no guarantee of how many times it will run on a mapper output: it could be zero, one or more times. Therefore, when specifying a Combiner, we must be absolutely sure that the job produces the same reducer output regardless of how many times the Combiner runs on a mapper output. The WordCount example specifies a combiner that is the same as the reducer.
Reducer
In the WordCount example, you can see the reducer implementation is named 'IntSumReducer'. It extends the base Reducer class provided by Hadoop and overrides its reduce method. As you can see, the reduce method has three parameters:
- Input key
- Input list of values as an Iterable object
- An instance of the Context class, which is used to emit the results.
Let's take an example. Assume we have two input files, one containing the text 'Hello World Bye World' and the other containing the text 'Bye World Bye'. In this particular case;
W/O Combiner
1st Mapper emits;
{Hello, 1} {World, 1} {Bye, 1} {World, 1}
2nd Mapper emits;
{Bye, 1} {World, 1} {Bye, 1}
After shuffle and sort phase, inputs to the reducer;
{Bye, (1,1,1)} {Hello, (1)} {World, (1,1,1)}
Reducer emits;
{Bye, 3} {Hello, 1} {World, 3}
With Combiner
1st Mapper emits;
{Hello, 1} {World, 1} {Bye, 1} {World, 1}
2nd Mapper emits;
{Bye, 1} {World, 1} {Bye, 1}
Combiner does a local aggregation and mapper outputs get sorted on keys;
For the 1st Mapper;
{Bye, 1} {Hello, 1} {World, 2}
For the 2nd Mapper;
{Bye, 2} {World, 1}
Reducer emits;
{Bye, 3} {Hello, 1} {World, 3}
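If you want to play with the example above without a Hadoop installation, the following plain-Java sketch simulates the same flow in memory: map, optional combine, shuffle and sort, then reduce. It is only a toy model of the data flow (no HDFS, no tasks, a single reducer), and all class and method names in it are made up for this post.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation; not Hadoop code.
final class CombinerSketch {

    // map: emits {word, 1} for every word in the line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<String, Integer>(word, 1));
            }
        }
        return out;
    }

    // combine: local aggregation of one mapper's output, sorted by key.
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> mapperOutput) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapperOutput) {
            sums.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    // shuffle and sort: groups all values by key, with keys in sorted order.
    static Map<String, List<Integer>> shuffleAndSort(List<List<Map.Entry<String, Integer>>> outputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : outputs) {
            for (Map.Entry<String, Integer> e : output) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return grouped;
    }

    // reduce: sums the list of values for each key.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            result.put(e.getKey(), sum);
        }
        return result;
    }

    // Runs the whole flow, one "mapper" per input, with or without the combiner.
    static Map<String, Integer> run(List<String> inputs, boolean useCombiner) {
        List<List<Map.Entry<String, Integer>>> outputs = new ArrayList<>();
        for (String input : inputs) {
            List<Map.Entry<String, Integer>> out = map(input);
            outputs.add(useCombiner ? combine(out) : out);
        }
        return reduce(shuffleAndSort(outputs));
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("Hello World Bye World", "Bye World Bye");
        System.out.println("W/O Combiner:  " + run(inputs, false));
        System.out.println("With Combiner: " + run(inputs, true));
    }
}
```

Both lines print {Bye=3, Hello=1, World=3}. The combiner only changes how many intermediate pairs travel from the mappers to the reducer, not the final result, which is exactly the property a Combiner must have.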
Running a MapReduce job
- Add hadoop classpath to your classpath using the following command.
$export CLASSPATH=`hadoop classpath`:$CLASSPATH
- Now compile WordCount.java using the following command.
$javac WordCount.java
- Create the job jar file.
$jar cf wc.jar WordCount*.class
- If you are using Standalone mode to run the job, you can simply use the following command.
$hadoop jar wc.jar WordCount input output
As you can see, there are four arguments to this command:
  - Name of the jar file.
  - Name of the main class within the jar file.
  - The input file location on your local machine.
  - The output file location.
Viewing the inputs
$ls input
## file01 file02
$cat input/file01
## Hello World Bye World
$cat input/file02
## Bye World Bye
Viewing the output
$cat output/part-r-00000
In a successful execution of the job, the following should be the output.
Bye 3
Hello 1
World 3
- If you are using Pseudo-distributed mode to run the job, first place the job jar into a desired location on HDFS (I have used my HDFS home), using the following command.
$hdfs dfs -put wc.jar /user/pavithra
You can then use the following command to run the job.
$hadoop jar wc.jar WordCount input output
As you can see, there are four arguments to this command:
  - Name of the jar file.
  - Name of the main class within the jar file.
  - The input file location in HDFS. This is relative to your home directory in HDFS. In my case, the full path would be /user/pavithra/input.
  - The output file location. This is also relative to your home directory in HDFS. In my case, the full path would be /user/pavithra/output.
Viewing the inputs
$hdfs dfs -ls input
## file01 file02
$hdfs dfs -cat input/file01
## Hello World Bye World
$hdfs dfs -cat input/file02
## Bye World Bye
Viewing the output
$hdfs dfs -cat output/part-r-00000
In a successful execution of the job, the following should be the output.
Bye 3
Hello 1
World 3