How to write MapReduce program in Java with example

Understanding the fundamentals of MapReduce
MapReduce is a framework for writing programs that process large volumes of structured and unstructured data in parallel across a cluster, in a reliable and fault-tolerant manner. The MapReduce concept is easy to grasp for anyone already familiar with distributed processing frameworks.

MapReduce is all about key-value pairs. I will try to explain key/value pairs by pointing to a similar concept in the Java standard library: the java.util.Map interface, which is used for key-value mappings in Java.
The contents of any Java Map object are a set of mappings from a key of a specified type to a related value of a potentially different type.
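
For instance, here is a tiny (purely illustrative) example of key/value pairs using java.util.HashMap:

    import java.util.HashMap;
    import java.util.Map;

    // A java.util.Map holds a set of mappings from a key to a value.
    public class MapExample {
        public static void main(String[] args) {
            Map<String, Integer> ageByName = new HashMap<String, Integer>();
            ageByName.put("Alice", 30); // key "Alice" maps to value 30
            ageByName.put("Bob", 25);
            System.out.println(ageByName.get("Alice")); // prints 30
        }
    }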

In the context of Hadoop, we deal with keys that are associated with values. Data in MapReduce is stored in such a way that the values can be sorted and rearranged (the shuffle and sort, in MapReduce terms) across a set of keys. All data emitted in the flow of a MapReduce program is in the form of key/value pairs. Once we start thinking about data as key/value pairs, a few questions naturally pop up:

  1. How do we decide what the key and the value should be?
  2. Can every problem statement be solved with MapReduce?
  3. What are the values associated with a given key?

Some important characteristics of key/value data become apparent:

  1. Keys must be unique.
  2. Each value must be associated with a key.
  3. A key may also have no values at all.

Why key/value data?
Key-value data is the foundation of the MapReduce paradigm: much of the data we deal with is naturally key-value in nature, or can be represented that way (for example, a web server log can be seen as mappings from an hour of the day to the hits recorded in that hour). In short, the data model to apply when designing a MapReduce program is the key-value pair.

To benefit from Hadoop's parallel, divide-and-conquer execution, we do not need to worry about how Hadoop achieves that parallelism. All we need to do is hand our problem to the MapReduce framework; the framework takes on the pain of dividing the work into small chunks, distributing those chunks in parallel across the machines of the cluster, and, once all tasks are complete, merging the results into the final output.

As a MapReduce programmer you do not handle any of the above steps in your program; the framework takes care of them. As a developer you only need to focus on expressing the problem statement in MapReduce's data model. Through the key-value interface, MapReduce abstracts away from the programmer how Hadoop handles distributed and parallel processing.

MapReduce’s benefits are:

  • Simplicity: Programmers can write applications in the language of their choice, such as Java, C++ or Python; Java is the most widely used.
  • On-the-fly scalability: We can add servers to increase processing power as requirements grow, and our MapReduce code remains untouched.
  • Speed: Parallel processing means that Hadoop can take problems that used to take days to solve and solve them in hours or minutes.
  • Minimal data motion: MapReduce moves compute processes to the data on HDFS and not the other way around. Processing tasks can occur on the physical node where the data resides. This significantly reduces the network I/O patterns and keeps most of the I/O on the local disk or within the same rack.
  • Focus only on business logic: Because MapReduce takes care of the pain of deployment, resource management, monitoring and scheduling, the user is free to focus on the problem statement and on how to express its solution in the MapReduce framework.
  • Handling failover: MapReduce takes care of failures. If a machine with one copy of the data is unavailable, another machine has a copy of the same key/value pair, which can be used to solve the same sub-task. The JobTracker keeps track of it all.

How to code a MapReduce program?
To start coding MapReduce we need a problem statement, and I am fed up with seeing the word count program everywhere a beginner looks when googling how to learn MapReduce. So I have decided to use a different problem statement to demonstrate MapReduce.

Problem statement: I run a very busy website and need to take it down for an hour to apply patches and perform maintenance on the backend servers, which means the website will be completely unavailable for that hour. The primary concern is that the outage should affect as few users as possible. The game starts here: we need to identify the hour of the day at which web traffic is lowest, so that the maintenance activity can be scheduled for that time.
There is an Apache web server log for each day which records the activity on the website, but these are huge files of up to 5 GB each.

Excerpt from Log file:
64.242.88.10 - - [07/Mar/2014:22:12:28 -0800] "GET /twiki/bin/attach/TWiki/WebSearch HTTP/1.1" 401 12846
64.242.88.10 - - [07/Mar/2014:22:15:57 -0800] "GET /mailman/listinfo/hs_rcafaculty HTTP/1.1" 200 6345

We are interested only in the date field, i.e. [07/Mar/2014:22:12:28 -0800].
Solution: I need to feed one month's log files to my MapReduce code, which calculates the total number of hits for each hour of the day. The hour with the fewest hits is the perfect slot for the downtime. It is as simple as that!
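
To make the key/value thinking concrete before we distribute anything, here is the same per-hour counting done in plain Java on a handful of hard-coded timestamps (illustrative values only); the MapReduce job below performs exactly this computation, but spread across the cluster:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: count hits per hour of the day for a few
    // hard-coded Apache-log timestamps, using an in-memory map.
    public class HourCountSketch {
        public static void main(String[] args) {
            String[] timestamps = {
                    "07/Mar/2014:22:12:28 -0800",
                    "07/Mar/2014:22:15:57 -0800",
                    "07/Mar/2014:03:42:01 -0800" };

            Map<Integer, Integer> hitsPerHour = new HashMap<Integer, Integer>();
            for (String ts : timestamps) {
                // The hour of the day is the field after the first ':'.
                int hour = Integer.parseInt(ts.split(":")[1]);
                Integer count = hitsPerHour.get(hour);
                hitsPerHour.put(hour, count == null ? 1 : count + 1);
            }
            System.out.println(hitsPerHour); // e.g. {3=1, 22=2}
        }
    }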

A MapReduce program usually consists of the following 3 parts:

    1. Mapper
    2. Reducer
    3. Driver

As the name itself suggests, the code is basically divided into two phases: Map and Reduce. Both phases take key-value pairs as input and produce key-value pairs as output. The programmer is free to choose the data model for the input and output of both Map and Reduce; depending on the business problem, we pick the appropriate one.
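
In general terms (this is the standard MapReduce notation, not code from this example), the two phases have the following shape, where K and V stand for key and value types and the numbers indicate that the types may differ from phase to phase:

    map:    (K1, V1)        -> list(K2, V2)
    reduce: (K2, list(V2))  -> list(K3, V3)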

What does the Mapper do?

  • The Map function reads the input files as key/value pairs, processes each, and generates zero or
    more output key/value pairs.
  • Our map class extends the Mapper class, which lives in the org.apache.hadoop.mapreduce package.
  • Class hierarchy: java.lang.Object → org.apache.hadoop.mapreduce.Mapper
  • The input and output types of the map can be (and often are) different from each other.
  • If the application is doing a word count, the map function would break the line into words and
    output a key/value pair for each word. Each output pair would contain the word as the key and
    the number of instances of that word in the line as the value.
  • The Map function is also a good place to filter out any unwanted fields or records from the input file; by keeping only the data we are interested in, we avoid unnecessary downstream workload.
  • I have used the Hadoop 1.2.1 API and Java 1.7 to write this program.

    package com.sreejith.loganalyzer;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.sreejith.loganalyzer.ParseLog;
    
    public class LogMapper extends
            Mapper<LongWritable, Text, IntWritable, IntWritable> {

        private static Logger logger = LoggerFactory.getLogger(LogMapper.class);
        private IntWritable hour = new IntWritable();
        private final static IntWritable one = new IntWritable(1);

        // Matches the Apache access log format; group 4 is the bracketed timestamp.
        private static Pattern logPattern = Pattern
                .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\]"
                        + " \"([^\"]*)\""
                        + " ([^ ]*) ([^ ]*).*");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws InterruptedException, IOException {
            logger.info("Mapper started");
            String line = value.toString();
            Matcher matcher = logPattern.matcher(line);
            if (matcher.matches()) {
                String timestamp = matcher.group(4);
                try {
                    // Extract the hour of the day from the timestamp.
                    hour.set(ParseLog.getHour(timestamp));
                } catch (ParseException e) {
                    logger.warn("Exception", e);
                }
                // Emit (hour of the day, 1) for this log line.
                context.write(hour, one);
            }
            logger.info("Mapper Completed");
        }
    }
    

The Mapper code above is written, from the programmer's point of view, to process a single record. We never write logic in MapReduce that deals with the entire data set; the framework is responsible for applying our code to the whole data set, feeding it record by record as key/value pairs.

The Mapper class takes four type parameters that specify the input key, input value, output key, and output value types of the map function.

    Mapper<LongWritable, Text, IntWritable, IntWritable>
    Mapper<Input key, Input value, Output key, Output value>
    Mapper<Offset within the input file, Single line of the file, Hour of the day, The integer one>

Hadoop provides its own set of basic types, optimized for network serialization, which can be found in the org.apache.hadoop.io package.
In this program I have used LongWritable, which corresponds to a Java Long, Text (like a Java String) and IntWritable (like a Java Integer). The Mapper writes its output through an instance of the Context class, which is how a task communicates with the rest of the Hadoop framework.
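
The map() method delegates timestamp parsing to a small ParseLog helper class that is not shown in this post; the following is only a hypothetical sketch of what such a helper might look like (the real implementation may differ):

    package com.sreejith.loganalyzer;

    import java.text.ParseException;

    // Hypothetical sketch of the ParseLog helper used by LogMapper;
    // the original implementation is not shown in this post.
    public class ParseLog {

        // Extracts the hour of the day (0-23) from an Apache log timestamp
        // such as "07/Mar/2014:22:12:28 -0800": the hour is the field that
        // follows the first ':'.
        public static int getHour(String timestamp) throws ParseException {
            String[] parts = timestamp.split(":");
            if (parts.length < 2) {
                throw new ParseException("Unexpected timestamp: " + timestamp, 0);
            }
            try {
                return Integer.parseInt(parts[1]);
            } catch (NumberFormatException e) {
                throw new ParseException("Unexpected timestamp: " + timestamp, 0);
            }
        }
    }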

What does the Reducer do?

  1. The Reducer reads the outputs generated by the different mappers as key/value pairs and emits key/value pairs of its own.
  2. The Reducer reduces a set of intermediate values which share a key to a smaller set of values.
  3. Class hierarchy: java.lang.Object → org.apache.hadoop.mapreduce.Reducer
  4. The reduce side of the job has three primary phases: shuffle, sort and reduce.
  5. Each reduce call processes the intermediate values for a particular key generated by the map function; all of the values for a given key go to the same reducer.
  6. Multiple reducers run in parallel, as they are independent of one another. The number of reducers for a job is decided by the programmer; by default, it is 1.
  7. The output of the reduce task is typically written to the FileSystem by calling context.write(key, value).

    package com.sreejith.loganalyzer;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class LogReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static Logger logger = LoggerFactory.getLogger(LogReducer.class);

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            logger.info("Reducer started");
            // Sum the ones emitted by the mappers for this hour of the day.
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            // Emit (hour of the day, total number of hits in that hour).
            context.write(key, new IntWritable(sum));
            logger.info("Reducer completed");
        }
    }
    

The Reducer also takes four type parameters, which define the types of its input and output key/value pairs. The output of the map task becomes the input of the reduce task, so the first two parameters are the key and value types emitted by the map task; in our example, IntWritable and IntWritable.

    Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
    Reducer<Input key, Input value, Output key, Output value>
    Reducer<Hour of the day, List of counts, Hour of the day, Total count for the hour>
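
For example (with made-up numbers): if the month's logs contain three hits during hour 22, then after the shuffle and sort the reducer receives the key 22 together with the values [1, 1, 1], and emits the pair (22, 3).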

What does the Driver do?

The Driver class is responsible for setting up and submitting the MapReduce job. The Job object lets you configure the Mapper, the Reducer, the input and output formats, the input and output paths, and so on.

    package com.sreejith.loganalyzer;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class LogDriver {

        private static Logger logger = LoggerFactory.getLogger(LogDriver.class);

        public static void main(String[] args) throws Exception {
            logger.info("Code started");

            Job job = new Job();
            job.setJarByClass(LogDriver.class);
            job.setJobName("Log Analyzer");

            job.setMapperClass(LogMapper.class);
            job.setReducerClass(LogReducer.class);

            // Both the map and the reduce output are (IntWritable, IntWritable) pairs.
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);

            // Input and output paths are taken from the command line arguments.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
            logger.info("Code ended");
        }

    }
    

Job control is performed through the Job class in the new API, rather than through the old JobClient, which no longer exists in the new API.
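
Once the three classes are compiled and packaged into a jar (the jar name below is just an example), the job can be submitted to the cluster with something like:

    hadoop jar loganalyzer.jar com.sreejith.loganalyzer.LogDriver <input path in HDFS> <output path in HDFS>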

Output: [screenshot of the MapReduce job's output, showing the total hit count for each hour of the day]
