Excel InputFormat for Hadoop MapReduce

Excel Spreadsheet Input Format for Hadoop Map Reduce
I wanted to read a Microsoft Excel spreadsheet using MapReduce and found that Hadoop's TextInputFormat could not do the job. Hadoop does not understand the Excel file format, so I ended up writing a custom InputFormat.
Hadoop works with many kinds of data, from flat text files to databases. An InputSplit is a chunk of the input, typically one or more blocks, and each split is processed by a single map task. It is rare for a block boundary to fall exactly on an end of line (EOL), so some records located around block boundaries end up spread across 2 different blocks. This raises a few questions:

  1. How can Hadoop guarantee that all lines from the input files are read completely?
  2. How can Hadoop reassemble a line that starts in block B and ends in block B+1?
  3. How can Hadoop guarantee that no line is missed?
  4. Is there a limit on line size? Can a line be larger than a block (i.e. span more than 2 blocks)? If so, is there a performance penalty?

Let’s take an example to make the concept clearer. Suppose my data set is a single 300 MB file spanning 3 blocks (of 128 MB each), and suppose that I get 1 InputSplit per block. Consider the following scenario.

[Figure: Hadoop MapReduce input split]

The file is composed of 6 lines of 50 MB each

• The first reader starts reading bytes from block B1 at position 0. The first two EOLs are met at 50 MB and 100 MB respectively, so 2 lines (L1 and L2) are read and sent as key/value pairs to the Mapper 1 instance. Then, starting from byte 100 MB, we reach the end of our split (128 MB) before finding the third EOL. This incomplete line is completed by reading bytes from block B2 up to position 150 MB. The first part of line L3 is read locally from block B1, the second part is read remotely from block B2 (by means of FSDataInputStream), and the complete record is finally sent as a key/value pair to Mapper 1.

• The second reader starts on block B2 at position 128 MB. Because 128 MB is not the start of the file, there is a strong chance that our pointer sits somewhere inside a record that has already been processed by the previous reader. We need to skip this record by jumping to the next available EOL, found at position 150 MB. The actual start of RecordReader 2 is therefore 150 MB instead of 128 MB.

We may wonder what happens when a block starts exactly at the beginning of a record, immediately after an EOL. By jumping to the next available record (through the readLine method), we would miss 1 record. Before jumping to the next EOL, we therefore decrement the initial "start" value to "start - 1". Being located at least 1 byte before the start of the record, we ensure that no record is skipped.
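The boundary rules above can be tried out on a small in-memory byte array. The following is a minimal sketch that mirrors the logic described in this post, not Hadoop's actual LineRecordReader code; the class name SplitBoundaryDemo and its methods are made up for illustration, and bytes stand in for megabytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Hadoop.
final class SplitBoundaryDemo {

    /** Index of the next '\n' at or after 'from', or data.length if there is none. */
    static int findEol(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return i;
    }

    /** Reads the lines that belong to the split [start, end). */
    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Step back one byte, then jump past the next EOL. If the split
            // begins right after an EOL, we land exactly on 'start' and keep
            // that record; otherwise we skip the tail of a record that the
            // previous reader has already consumed.
            pos = findEol(data, start - 1) + 1;
        }
        List<String> lines = new ArrayList<>();
        // A line is ours if it STARTS before 'end'; reading may run past 'end'.
        while (pos < end && pos < data.length) {
            int eol = findEol(data, pos);
            lines.add(new String(data, pos, eol - pos, StandardCharsets.UTF_8));
            pos = eol + 1;
        }
        return lines;
    }

    /** Cuts the data into fixed-size splits and concatenates what each reader returns. */
    static List<String> readAll(byte[] data, int splitSize) {
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            all.addAll(readSplit(data, start, end));
        }
        return all;
    }

    public static void main(String[] args) {
        // Scaled-down version of the example: 6 lines of 50 bytes, splits of 128 bytes.
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= 6; i++) {
            String label = "L" + i;
            sb.append(label);
            for (int j = label.length(); j < 49; j++) {
                sb.append('.');
            }
            sb.append('\n');
        }
        byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);
        for (int start = 0; start < data.length; start += 128) {
            int end = Math.min(start + 128, data.length);
            System.out.println("split [" + start + ", " + end + ") -> "
                    + readSplit(data, start, end).size() + " lines");
        }
    }
}
```

With these numbers the first reader returns 3 lines (L1 to L3, the last one completed from the next block), the second reader starts at byte 150 and returns L4 to L6, and the third reader finds nothing left to read. Every line is returned exactly once whatever the split size.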

Maximum size for a single record
There is a maximum size allowed for a single record to be processed. This value can be set using the mapred.linerecordreader.maxlength parameter (named mapreduce.input.linerecordreader.line.maxlength in Hadoop 2.x and later).

You can also find the whole project on GitHub.

ExcelParser.java


package com.sreejithpillai.excel.parser;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

public class ExcelParser {

    private static final Log LOG = LogFactory.getLog(ExcelParser.class);

    private StringBuilder currentString = null;
    // Despite its name, this counts the cells read, not bytes.
    private long bytesRead = 0;

    /**
     * Reads the first sheet of an .xls workbook and returns it as text:
     * rows are separated by "\n" and cells by "\t".
     */
    public String parseExcelData(InputStream is) {
        // Created before the try block so that a failed read returns an
        // empty string instead of throwing a NullPointerException.
        currentString = new StringBuilder();
        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            // Taking first sheet from the workbook
            HSSFSheet sheet = workbook.getSheetAt(0);
            // Iterate through each row of the first sheet
            Iterator<Row> rowIterator = sheet.iterator();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                // For each row, iterate through each column.
                // Note: cellIterator() skips cells that were never defined,
                // so blank cells shift the following values to the left.
                Iterator<Cell> cellIterator = row.cellIterator();
                while (cellIterator.hasNext()) {
                    Cell cell = cellIterator.next();
                    switch (cell.getCellType()) {
                    case Cell.CELL_TYPE_BOOLEAN:
                        bytesRead++;
                        currentString.append(cell.getBooleanCellValue()).append("\t");
                        break;
                    case Cell.CELL_TYPE_NUMERIC:
                        bytesRead++;
                        currentString.append(cell.getNumericCellValue()).append("\t");
                        break;
                    case Cell.CELL_TYPE_STRING:
                        bytesRead++;
                        currentString.append(cell.getStringCellValue()).append("\t");
                        break;
                    }
                }
                currentString.append("\n");
            }
            is.close();
        } catch (IOException e) {
            LOG.error("IO Exception while reading the workbook", e);
        }
        return currentString.toString();
    }

    public long getBytesRead() {
        return bytesRead;
    }
}

ExcelInputFormat.java


package com.sreejithpillai.excel.mapreduce;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * <p>
 * An {@link org.apache.hadoop.mapreduce.InputFormat} for Excel spreadsheet files.
 * Only the first sheet of each workbook is read.
 * </p>
 * Keys are the row index within the sheet, and values are the tab-separated
 * cells of that row.
 */
public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }

    /**
     * A workbook is a binary file that has to be parsed as a whole, so it is
     * never split. Otherwise every split would parse the entire file again
     * and emit duplicate rows.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

ExcelRecordReader.java


package com.sreejithpillai.excel.mapreduce;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import com.sreejithpillai.excel.parser.ExcelParser;

/**
 * Reads an Excel spreadsheet, where the key is the row index and the value is
 * the row with all its columns as a tab-separated string.
 */
public class ExcelRecordReader extends RecordReader<LongWritable, Text> {

    private LongWritable key;
    private Text value;
    private InputStream is;
    private String[] strArrayofLines;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        is = fileIn;
        // The whole workbook is parsed up front and kept in memory,
        // one array element per row.
        String line = new ExcelParser().parseExcelData(is);
        this.strArrayofLines = line.split("\n");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            // First call: emit the first row with key 0.
            key = new LongWritable(0);
            value = new Text(strArrayofLines[0]);
        } else {
            if (key.get() < (this.strArrayofLines.length - 1)) {
                // Advance to the next row.
                long pos = key.get();
                key.set(pos + 1);
                value.set(this.strArrayofLines[(int) (pos + 1)]);
            } else {
                // No rows left.
                return false;
            }
        }
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException,
            InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (is != null) {
            is.close();
        }
    }
}
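
The way the record reader walks through the parsed rows can be tried without a cluster. The sketch below is a plain-Java model of that cursor, assuming the same "\n"-separated string the parser returns; RowCursor and its method names are hypothetical and are not Hadoop API. It also shows one possible way to report progress as the fraction of rows consumed, instead of always returning 0.

```java
// Hypothetical stand-alone model of the reader's cursor logic, not Hadoop API.
final class RowCursor {

    private final String[] rows;
    private int index = -1; // -1 means next() has not been called yet

    RowCursor(String parsed) {
        // "".split("\n") yields one empty element, so treat empty input as zero rows.
        this.rows = parsed.isEmpty() ? new String[0] : parsed.split("\n");
    }

    /** Moves to the next row; returns false once all rows have been consumed. */
    boolean next() {
        if (index + 1 >= rows.length) {
            return false;
        }
        index++;
        return true;
    }

    /** Row index of the current record (plays the role of the LongWritable key). */
    long currentKey() {
        return index;
    }

    /** Tab-separated cells of the current row (plays the role of the Text value). */
    String currentValue() {
        return rows[index];
    }

    /** Fraction of rows consumed so far, between 0 and 1. */
    float progress() {
        return rows.length == 0 ? 1.0f : (float) (index + 1) / rows.length;
    }

    public static void main(String[] args) {
        RowCursor cursor = new RowCursor("id\tname\t\n1.0\tfoo\t\n");
        while (cursor.next()) {
            System.out.println(cursor.currentKey() + " -> " + cursor.currentValue()
                    + " (progress " + cursor.progress() + ")");
        }
    }
}
```

The same arithmetic could be moved into getProgress() of the real reader by dividing the current key plus one by the number of rows.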

ExcelMapper.java


package com.sreejithpillai.excel.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExcelMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Logger LOG = LoggerFactory.getLogger(ExcelMapper.class);

    /**
     * Each row of the Excel spreadsheet is supplied to the mapper as a string.
     * We simply emit it so the result can be viewed on HDFS.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws InterruptedException, IOException {
        String line = value.toString();
        // A null value makes TextOutputFormat write only the key.
        context.write(new Text(line), null);
        LOG.info("Map processing finished");
    }
}

ExcelDriver.java


package com.sreejithpillai.excel.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExcelDriver {

    private static final Logger logger = LoggerFactory.getLogger(ExcelDriver.class);

    /**
     * Main entry point for the example.
     *
     * @param args input path (args[0]) and output path (args[1])
     * @throws Exception when something goes wrong
     */
    public static void main(String[] args) throws Exception {
        logger.info("Driver started");
        // new Job() is deprecated in Hadoop 2.x, where Job.getInstance() replaces it.
        Job job = new Job();
        job.setJarByClass(ExcelDriver.class);
        job.setJobName("Excel Record Reader");
        job.setMapperClass(ExcelMapper.class);
        // Map-only job: the mapper output is written directly to the output path.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(ExcelInputFormat.class);
        // Report success or failure of the job through the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

I have implemented only a Mapper class and no Reducer because we just need to see whether the mappers are able to read each row from the Excel sheet. Anyone who wants to process the data further can do so in a Reducer.

To use this custom InputFormat, you just need to add the following line to your driver:

job.setInputFormatClass(ExcelInputFormat.class); 

Happy Coding
Cheers !

30 comments

  1. Reblogged this on Big Data World and commented:
    EXCEL INPUTFORMAT FOR HADOOP MAPREDUCE

  2. HAI AM LOOKING OUT FOR IMPLEMENTATION OF Improving Data Transfer Rate and Throughput of HDFS using Efficient Replica Placement.Its a IEEE paper.Can any one help me out with this!!!!!

  3. Great explanation of splits & EOL!! Thanks!

  4. First of all thanks for explaining it was quite helpful. Next I did notice the below when I was playing around with the code. Can you pls comment on this ?

    1. I understand there is a limitation on the heap space when I use HSSFWorkbook for reading an XLS. When I tried with 11mb(1 sheet) data it worked like a charm but failed with 22mb(2 sheets) throws the below error : Java.lang.OutOfMemoryError: Java heap space
    So am wondering with the above constraint can I ever read a huge XLS in hadoop ?

    2. I am not quite sure the above code addresses reading of data from splits in ExcelRecordReader.java
    Lets say my block size is 64mb and I have a XLS file with 88mb stored in HDFS. Now as you see there are 2 blocks which constitutes my complete data. Sheet1 data will spill over block1 and extend into block2. Since EOL is a not delimiter here in XLS file how will the split be handled to ensure all of the data is read and set to mapper. Can you pls explain this ?

  5. Praveen

    Hi Sreejith,

    Thanks for the blog, which is very useful….

    This is reading only a single sheet in the Excel file, I am having around 30+ sheets in a single file. Can you please suggest me how to read the complete 30 sheets…..

    Thanks,
    Praveen.

    1. Harshavardhan

      In ExcelParser class, inside parseExcelData() method, the below snippet is used.
      HSSFSheet sheet = workbook.getSheetAt(0);
      If you want to read other sheets, just loop over the above snippet,
      for (int i = 0; i < workbook.getNumberOfSheets(); i++) {
          HSSFSheet sheet = workbook.getSheetAt(i);
          // process the rows of this sheet as before
      }

      Hope, it helps.

      Peace
      Harshavardhan

  6. Aditya

    Hi Sree,
    Can we have a demo on how the file can be uploaded using Hive and Pig.

    1. Hi Aditya,
      can you elaborate a little more.

  7. BalaKrishna Reddy

    When running the jar in Hadoop I am getting an org.apache.poi.hssf.usermodel.HSSFWorkbook class not found exception. What’s the cause for this?

    1. Hi Balkrishna,
      Please put the dependency for poi in your project and then re compile the jar.

  8. balakrishna

    I compiled successfully, when running i am getting by the way i am using poi-3.9.jar. Shall we add these third party libraries in hadoop

    1. Added poi 3.14 and compiled the project but getting same error. Hssfworkbook class not found exception when running on Hadoop. What is causing this?

  9. Very good example.

    I also want to use multiple output file format to generate multiple files as per condition apply.

  10. I was running the code but got an exception —

    FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    at com.sreejithpillai.excel.mapreduce.ExcelRecordReader.initialize(ExcelRecordReader.java:49)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

  11. Hi, Have u solved the issue I also got same error, Can you please share the solution?

    java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    at com.ncv.hdp.excel.ExcelRecordReader

    at This line :

    Configuration job = context.getConfiguration();

  12. Did anybody solve the problem faced by Shashank Kulkarni and Nithin CV ?
    I am also facing the same issue.

    I tried resolving that by compiling the jar file against Hadoop 2.6.0 libraries, but still no luck.

    THanks in advance.

  13. Hi Guys,

    java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    at com.sreejithpillai.excel.mapreduce.ExcelRecordReader.initialize(ExcelRecordReader.java:49)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

    Above error is finally resolved.
    I have tested it on Hadoop 2.5.1

    We need to change entries in our pom.xml file as follows.

    Remove hadoop-core dependency that you get from github after downloading this project and replace it with following four dependencies for compiling it on Hadoop 2.5.1

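    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.5.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.5.1</version>
    </dependency>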

    Please revert back if you face any difficulty.

    Thanks,
    Milind

  14. thanks Milind Sir finlly it run, when we change pom.xml,

  15. Thanks @abhi11621 for verifying

  16. […] Entire source code has been taken from this link. […]

  17. krishna

    hello..thank you for this program ..i successfully created jar file but now t don’t know which input file i have to give in hdfs and which kind of out put it generated

  18. Please clone the project from git (https://github.com/sreejithpillai/ExcelRecordReaderMapReduce). There you can find the resources which have test data.

    Thanks,
    Sreejith

  19. krishna

    sorry..for again asking .. sir i con’t understand this code and which kind of output or input display please reply me fast it’s emrgeny

  20. Hi, Thank you very much for the Post. it is really amazing.

    I have used the code (ExcelParser.java) and first testing in local machine how it is handling the file. then i can push to HDFS.

    There is an observation, if we have blank cells then it seems it is not recognizing and then we will end up with wrong ordering of values in output.

    So, can you help on this.

  21. Gowtham

    Hi Sreejith, I have a problem while reading the Excel file(.xlsx).

    When I get the FSDataInputStream from split and assign it to Inputstream. But inputStream.available() returns which shows there are no bytes to read before blocking.

    Can you help me to convert FSDataInputStream to inputStream?

  22. Gowtham

    Hi Sreejith,

    My map method is not getting called !! Can you help me with that ?

    I have used setup and cleanup method. It is getting called but not the map method.

    Thanks,
    Gowtham G

  23. Error: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    i am getting is error can u say the steps to slove

  24. Hi Rahul,
    Please find my comment above which answers this question.

    Hope this helps.

    Thanks,
    Milind
    http://www.milindjagre.co

  25. Can anyone please, explain the workflow?
