It’s been a while since I last blogged. In this post I will give you an idea of how to convert a Hive query that joins multiple tables into a MapReduce job.
You might be wondering why you should ever think of writing a MapReduce job when Hive does it for you. You are correct – any query you fire in Hive is converted into MapReduce internally by Hive, hiding the complexity of the MapReduce job for the user's comfort. But there might come a requirement where Hive query performance is not up to the mark, or where some extra data needs to be computed internally and included in the output; in such cases, writing a MapReduce job is the best alternative.
Joins are possibly one of the most complex operations one can execute in MapReduce. By design, MapReduce is very good at processing large data sets by looking at every record or group in isolation, so joining two very large data sets together does not fit into the paradigm gracefully.
A join is an operation that combines records from two or more data sets based on a field or set of fields, known as the foreign key. The foreign key is the field in a relational table that matches the column of another table, and is used as a means to cross-reference between tables.
How a reduce-side join works:
- The mappers start the join operation by reading the different input files and emitting all records to the reducer.
- Each record is tagged to identify which source it arrived from.
- The key of the map output has to be the join key.
- The reducer receives the shuffled records from all files, grouped by their common key.
- The reducer combines the records from the sources based on the tag attribute.
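The steps above can be sketched in plain Java, outside Hadoop, to show the core idea. The field layouts (`cust_id|cust_fname`, `purchase_id|cust_id`) and the tags `C`/`P` here are illustrative assumptions, not the actual job's schema:

```java
import java.util.*;

public class ReduceSideJoinSketch {

    // Simulated map phase: key each record by the join key (the customer ID)
    // and tag it with its source ("C" = customer, "P" = purchases).
    // Simulated reduce phase: for each key, pair every customer record
    // with every purchase record that shares that key.
    static List<String> join(String[] customers, String[] purchases) {
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String line : customers) {                    // cust_id|cust_fname
            String[] f = line.split("\\|");
            shuffled.computeIfAbsent(f[0], k -> new ArrayList<>()).add("C~" + f[1]);
        }
        for (String line : purchases) {                    // purchase_id|cust_id
            String[] f = line.split("\\|");
            shuffled.computeIfAbsent(f[1], k -> new ArrayList<>()).add("P~" + f[0]);
        }
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shuffled.entrySet()) {
            List<String> custs = new ArrayList<>(), purch = new ArrayList<>();
            for (String v : e.getValue()) {
                if (v.startsWith("C~")) custs.add(v.substring(2));
                else purch.add(v.substring(2));
            }
            for (String c : custs)
                for (String p : purch)
                    joined.add(e.getKey() + "|" + c + "|" + p);
        }
        return joined;
    }

    public static void main(String[] args) {
        String[] customers = {"1|Alice", "2|Bob"};
        String[] purchases = {"p10|1", "p11|1", "p12|2"};
        join(customers, purchases).forEach(System.out::println);
    }
}
```

In the real job, the "shuffled" map is what Hadoop's shuffle-and-sort phase builds for you; the reducer only ever sees one key's value list at a time.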
Problem statement: find the total amount purchased, along with the number of transactions, for each customer.
Corresponding Hive query is :
select c.cust_fname, sum(t.purchase_amt), count(*) from customer c
inner join purchases p on c.cust_id = p.cust_id
inner join transaction t on p.purchase_id = t.purchase_id
group by c.cust_id, c.cust_fname;
The example will be dealing with three tables Customer, Purchases and Transaction.
Here I will demonstrate joining three tables and grouping by a particular column using MapReduce in Java. We will use a reduce-side join because all three of my datasets are huge and are related to each other by some keys (foreign keys).
The tables below are Hive external tables, so our MapReduce job can point directly at the location of these sources.
Customer table will have unique customer ID along with other details of Customer.
Purchases will have unique purchase Id for each purchase. There will be multiple purchases for each customer.
Transaction table will have a unique transaction ID for each transaction, along with the purchase amount for each transaction.
We will first join the Customer and Purchases tables on the customer ID, which is present in both tables, and prepare a file where the data of both is combined. Below is the output.
Next we will combine the above output file and the Transaction file using the purchase ID, which is common to both, and prepare a second output file.
The file we now have contains the customer data as well as the purchase amount of each of their transactions. Finally, we pass this output file to the last MapReduce job, which computes the total purchase amount and the total number of transactions for each customer.
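The final job's reduce logic amounts to a per-customer sum and count, mirroring the `sum`/`count(*)` with `group by` in the Hive query. A minimal sketch in plain Java, where the input layout `cust_id|cust_fname|purchase_amt` is an assumption for illustration:

```java
import java.util.*;

public class PurchaseAggregateSketch {

    // Simulated final reducer: group rows by (cust_id, cust_fname) and
    // accumulate the total purchase amount and the transaction count.
    static Map<String, String> aggregate(List<String> rows) {
        Map<String, double[]> totals = new TreeMap<>();   // key -> {sum, count}
        for (String row : rows) {                         // cust_id|cust_fname|purchase_amt
            String[] f = row.split("\\|");
            String key = f[0] + "|" + f[1];               // group by cust_id, cust_fname
            double[] t = totals.computeIfAbsent(key, k -> new double[2]);
            t[0] += Double.parseDouble(f[2]);             // running total of purchase_amt
            t[1] += 1;                                    // one transaction per row
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : totals.entrySet())
            result.put(e.getKey(), e.getValue()[0] + "|" + (long) e.getValue()[1]);
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("1|Alice|50.0", "1|Alice|25.0", "2|Bob|10.0");
        aggregate(rows).forEach((k, v) -> System.out.println(k + "|" + v));
    }
}
```

In the actual job the grouping is free: the shuffle already delivers all values for one customer key to a single reduce call, so the reducer only needs the accumulation loop.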
Summary of the Java MapReduce code:
1. I have used job chaining, which combines all three MapReduce jobs in one driver; FinalDriver is the main class.
2. There is one mapper class for each input feed.
3. The delimiter used in the input feeds is the pipe (|), and a tilde (~) is used to separate the columns within the map output values.
4. Build the JAR using Maven or Ant to produce HiveMRJoin.jar, then execute the hadoop command below.
5. The code is available on GitHub.
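One practical note on the delimiters mentioned above: the pipe is a regex metacharacter, so `String.split` needs it escaped, while the tilde needs no escaping. A quick illustration (the sample record is hypothetical):

```java
public class DelimiterSplit {
    public static void main(String[] args) {
        String inputRecord = "101|John|500";      // pipe-delimited input feed
        String[] cols = inputRecord.split("\\|"); // "|" must be escaped in the regex
        // Re-join the columns with "~" for the map output value.
        String value = String.join("~", cols);
        System.out.println(value);                // prints 101~John~500
    }
}
```

An unescaped `split("|")` would split between every character, silently corrupting every record.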
hadoop jar HiveMRJoin.jar /sreejith/hive-loc/customer/data/Customer /sreejith/hive-loc/purchases/data/Purchases /sreejith/hive-loc/transaction/data/Transaction /sreejith/prac/output1 /sreejith/prac/output2 /sreejith/prac/FinalOutput
Following are the arguments
1. First argument- Input File of Customer
2. Second argument- Input File of Purchases
3. Third argument- Input File of Transaction
4. Fourth argument- Intermediate Output directory 1
5. Fifth argument- Intermediate Output directory 2
6. Sixth argument- Final Output directory
You can also find the whole project on GitHub. Below is the code: