Converting a Hive query (joining multiple tables) into MapReduce using job chaining

It’s been a while since I last blogged. In this post I’ll show you how to convert a Hive query that joins multiple tables into a MapReduce job.
You might be wondering why you should ever write a MapReduce job when Hive does it for you. You are correct: any query you fire in Hive is converted into MapReduce internally by Hive, hiding the complexity of the MapReduce job from the user. But there may come a requirement where the Hive query's performance is not up to the mark, or where you need some extra data to be calculated internally as part of the output; in those cases, writing a MapReduce job is the best alternative.

Joins are possibly one of the most complex operations one can execute in MapReduce. By design, MapReduce is very good at processing large data sets by looking at every record or group in isolation, so joining two very large data sets together does not fit into the paradigm gracefully.

A join is an operation that combines records from two or more data sets based on a field or set of fields, known as the foreign key. The foreign key is the field in a relational table that matches a column of another table and is used as a means to cross-reference between tables.

What a reduce-side join does (see the mapper sketch after this list):
Map

  1. Each mapper starts the join operation by reading one of the input files and emitting every record to the reducers.
  2. Each record is tagged to identify which source it came from.
  3. The key of the map output has to be the join key.

Reducer

  1. The reducer receives the shuffled records from all files, grouped by the common key.
  2. It combines the records from the different sources based on the tag attribute.
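
To make this concrete, here is a minimal sketch of a tagged mapper for one input feed, assuming a pipe-delimited customer record with the join key in the first column and a hypothetical CUST tag (the full code for this post is linked on GitHub below):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Tags every customer record with "CUST" so the reducer can tell which
// source it came from; the join key (cust_id) becomes the map output key.
public class CustomerMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text record, Context context)
            throws IOException, InterruptedException {
        String[] cols = record.toString().split("\\|");
        // cols[0] = cust_id (the join key), cols[1] = cust_fname
        context.write(new Text(cols[0]), new Text("CUST~" + cols[1]));
    }
}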

Problem statement: find the total amount purchased, along with the number of transactions, for each customer.

The corresponding Hive query is:
select c.cust_fname, sum(t.purchase_amt), count(*) from customer c
inner join purchases p
on c.cust_id = p.cid
inner join transaction t
on p.purchase_id = t.purchase_id
group by c.cust_id, c.cust_fname;

The example deals with three tables: Customer, Purchases, and Transaction.

Here I will demonstrate joining three tables and grouping by a particular column using MapReduce in Java. We will use a reduce-side join because all three of my datasets are huge and are related to each other by keys (foreign keys).

The tables below are Hive external tables, so in our MapReduce code we can point directly to the locations of these sources.

The Customer table has a unique customer ID along with other details of the customer.
***********Customer table***********
cust_id|cust_fname|cust_lname|location
867230|William|smith|New York
973239|Alex|bard|Canada
124847|Michael|george|Washington

The Purchases table has a unique purchase ID for each purchase. There will be multiple purchases for each customer.
***********Purchases table***********
purchase_id|cid|store
23|973239|Wallmart
99|234958|DStore
25|973239|Oasis
66|973239|Wallmart
33|124847|Pearson
83|973239|Trad
72|124847|Wallmart
54|038403|Suz

The Transaction table has one record per transaction, identified by its purchase ID, along with the purchase amount for each transaction.
***********Transaction table***********
purchase_id|transa_date|purchase_amt
23|2015-01-23|23434
99|2015-01-12|89734
25|2014-03-28|36495
66|2015-05-20|76577
33|2015-03-17|9736
83|2015-01-10|32873
72|2015-01-04|453822
54|2014-02-13|3290843

Solution:
We will first do a reduce-side join on the Customer and Purchases tables using the customer ID from both tables, and prepare a file where the data of both is combined. Below is the output:

124847|michael|33
124847|michael|72
973239|alex|23
973239|alex|25
973239|alex|66
973239|alex|83
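
As a rough sketch of the reducer for this first join (class and tag names are illustrative; the actual implementation is in the GitHub project), assuming the Purchases mapper emits records tagged PUR with the purchase ID as the value:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// All records sharing a cust_id arrive together; the tag tells us which
// side each record came from. Unmatched records are dropped (inner join).
public class CustPurchaseJoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text custId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String custName = null;
        List<String> purchaseIds = new ArrayList<>();
        for (Text v : values) {
            String[] parts = v.toString().split("~", 2);
            if (parts[0].equals("CUST")) {
                custName = parts[1];        // from the Customer feed
            } else {
                purchaseIds.add(parts[1]);  // purchase_id from the Purchases feed
            }
        }
        if (custName == null) {
            return;                          // no matching customer record
        }
        for (String pid : purchaseIds) {
            context.write(NullWritable.get(),
                    new Text(custId + "|" + custName + "|" + pid));
        }
    }
}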

Now we will combine the above output file and the Transaction file on the common key, purchase ID, and prepare a second output file.

973239|alex|23|23434
973239|alex|25|36495
124847|michael|33|9736
973239|alex|66|76577
124847|michael|72|453822
973239|alex|83|32873
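
The second job follows the same tagging pattern, this time keyed on purchase ID. A sketch of its two mappers, again with hypothetical class and tag names:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Re-keys the first job's output (cust_id|cust_fname|purchase_id) on purchase_id.
class JoinedOutputMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text record, Context context)
            throws IOException, InterruptedException {
        String[] cols = record.toString().split("\\|");
        context.write(new Text(cols[2]), new Text("JOIN~" + cols[0] + "|" + cols[1]));
    }
}

// Keys the Transaction feed (purchase_id|transa_date|purchase_amt) on
// purchase_id and carries the amount forward.
class TransactionMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text record, Context context)
            throws IOException, InterruptedException {
        String[] cols = record.toString().split("\\|");
        context.write(new Text(cols[0]), new Text("TXN~" + cols[2]));
    }
}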

The file we now have contains the customer data as well as the purchase amount of each transaction. Finally, we pass this output file to the last MapReduce job, which computes the total purchase amount and the total number of transactions for each customer.

alex|169379|4
michael|463558|2
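
This last job mirrors the sum(purchase_amt) and count(*) of the Hive query. A sketch of its reducer, assuming an upstream mapper keys each record on the customer name and emits the purchase amount as the value (names are illustrative):

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the purchase amounts and counts the transactions per customer,
// producing lines like: alex|169379|4
public class AggregateReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text custName, Iterable<Text> amounts, Context context)
            throws IOException, InterruptedException {
        long totalAmount = 0;
        int txnCount = 0;
        for (Text amt : amounts) {
            totalAmount += Long.parseLong(amt.toString());
            txnCount++;
        }
        context.write(NullWritable.get(),
                new Text(custName + "|" + totalAmount + "|" + txnCount));
    }
}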

Summary of the Java MapReduce code:
1. I have used job chaining, which combines all three MapReduce jobs in one driver. FinalDriver is the main class.
2. There are three mapper classes, one for each input feed.
3. The delimiter used in the input feeds is a pipe (|), and a tilde (~) is used to separate the columns within the map output values.
4. Build the JAR using Maven or Ant to produce HiveMRJoin.jar, then execute the hadoop command below.
5. The code is available on GitHub.

 
hadoop jar HiveMRJoin.jar /sreejith/hive-loc/customer/data/Customer /sreejith/hive-loc/purchases/data/Purchases /sreejith/hive-loc/transaction/data/Transaction /sreejith/prac/output1 /sreejith/prac/output2 /sreejith/prac/FinalOutput

The arguments are as follows:
1. First argument: input file of Customer
2. Second argument: input file of Purchases
3. Third argument: input file of Transaction
4. Fourth argument: intermediate output directory 1
5. Fifth argument: intermediate output directory 2
6. Sixth argument: final output directory

You can also find the whole project on GitHub.

Below is the code:
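
The complete, working code is in the GitHub project linked above. For reference, here is a minimal sketch of what the chained FinalDriver could look like, reusing the mapper and reducer sketches from the earlier sections (PurchaseMapper, TransactionJoinReducer, and AggregateMapper are assumed analogues of the classes sketched above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FinalDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: Customer JOIN Purchases on cust_id -> intermediate output 1
        Job job1 = Job.getInstance(conf, "customer-purchases-join");
        job1.setJarByClass(FinalDriver.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(NullWritable.class);
        job1.setOutputValueClass(Text.class);
        job1.setReducerClass(CustPurchaseJoinReducer.class);
        MultipleInputs.addInputPath(job1, new Path(args[0]), TextInputFormat.class, CustomerMapper.class);
        MultipleInputs.addInputPath(job1, new Path(args[1]), TextInputFormat.class, PurchaseMapper.class);
        FileOutputFormat.setOutputPath(job1, new Path(args[3]));
        if (!job1.waitForCompletion(true)) System.exit(1);

        // Job 2: intermediate output 1 JOIN Transaction on purchase_id -> intermediate output 2
        Job job2 = Job.getInstance(conf, "transaction-join");
        job2.setJarByClass(FinalDriver.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(NullWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setReducerClass(TransactionJoinReducer.class);
        MultipleInputs.addInputPath(job2, new Path(args[3]), TextInputFormat.class, JoinedOutputMapper.class);
        MultipleInputs.addInputPath(job2, new Path(args[2]), TextInputFormat.class, TransactionMapper.class);
        FileOutputFormat.setOutputPath(job2, new Path(args[4]));
        if (!job2.waitForCompletion(true)) System.exit(1);

        // Job 3: group by customer, sum the amounts and count the transactions
        Job job3 = Job.getInstance(conf, "aggregate");
        job3.setJarByClass(FinalDriver.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setOutputKeyClass(NullWritable.class);
        job3.setOutputValueClass(Text.class);
        job3.setMapperClass(AggregateMapper.class);
        job3.setReducerClass(AggregateReducer.class);
        FileInputFormat.addInputPath(job3, new Path(args[4]));
        FileOutputFormat.setOutputPath(job3, new Path(args[5]));
        System.exit(job3.waitForCompletion(true) ? 0 : 1);
    }
}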

Happy Coding
Cheers!


2 comments

  1. Rajasekhar

    Very nice one, will try it once. Thank you in advance.

  2. Ankush Maheshwari

    Hi Sreejith,
    I have a few questions.
    Can we reduce the number of mappers in the above code?
    Does the count of mappers drive the performance of the code?
    I implemented the reducer above differently; does it process the data more slowly?

    package github.com;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MyDriver {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "join");

            job.setJarByClass(MyDriver.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(MyReducer.class);

            // MultipleInputs assigns a mapper to each input path, so
            // job.setMapperClass() is not needed (and would be ignored).
            MultipleInputs.addInputPath(job, new Path("/home/cloudera/ankush/GITHUB/custdata.txt"), TextInputFormat.class, ConsumerMapper.class);
            MultipleInputs.addInputPath(job, new Path("/home/cloudera/ankush/GITHUB/purchasedata.txt"), TextInputFormat.class, PurchaseMapper.class);

            FileOutputFormat.setOutputPath(job, new Path("/home/cloudera/ankush/GITHUB/output"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

