Bulk loading data into an HBase table using MapReduce

My previous post gave a high-level view of the different components used in HBase and how they function. In this post I will discuss how to load source data directly into an HBase table using the HBase bulk loading feature.

Apache HBase gives you random, real-time, read/write access to your Big Data, but getting that data loaded into HBase efficiently is just as important. Intuitively, a newcomer will try to do this using the client APIs or a MapReduce job with TableOutputFormat, but depending on your use case those approaches can be inefficient. The HBase bulk loading feature is easier to use and can insert the same amount of data much more quickly.

Bulk Loading
If you come across any of these issues, bulk loading is the right choice for you:

  1. You need to tweak the HBase MemStores.
  2. You need to keep increasing the WAL file size.
  3. Your compaction and flush queues are in the hundreds.
  4. Your inserts are megabytes in size, and hence GC is out of control.

Bulk loading in HBase is the process of preparing HFiles and loading them directly into the region servers. By doing this we bypass the HBase write path, i.e. the WAL doesn't get written here.
The entire process of bulk loading can be broken down into three steps; I will walk you through the details of each.

1. Extract the data from the source and load it into HDFS.
If the data lives in Oracle or MySQL, you need to fetch it using Sqoop or a similar tool that provides a mechanism to import data directly from a database into HDFS. If your raw files, such as .txt, .pst or .xml, are located on other servers, simply pull them and load them into HDFS. HBase doesn't prepare HFiles by reading data directly from the source.
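If the raw files are already sitting on a local disk of an edge node, a minimal sketch of pushing one file into HDFS with the FileSystem API could look like the code below. This is not part of the original project, and the local and HDFS paths are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		// Copy the local raw file into the HDFS directory the MapReduce job will read from
		fs.copyFromLocalFile(new Path("/local/data/books.xml"),
				new Path("/user/hbase/input/books.xml"));
		fs.close();
	}
}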

2. Transform the data into HFiles via a MapReduce job.
Here we write a MapReduce job which will process your data and create the HFiles. We write only a Mapper and no Reducer; the Mapper emits the row key as the key and a KeyValue as the value. We call HFileOutputFormat.configureIncrementalLoad() in our code, and HBase plugs in its own Reducer. At this stage, one HFile will be created per region in the output folder.
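To make the Mapper's contract concrete, here is a small illustrative sketch of the kind of (key, value) pair one cell produces; the row key, family and qualifier are made up, and the real mapper for this post appears further below.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

public class EmitExample {
	public static void main(String[] args) {
		byte[] rowKey = "1".getBytes();
		ImmutableBytesWritable hKey = new ImmutableBytesWritable(rowKey);
		// One cell: row key, column family, column qualifier, cell value
		KeyValue kv = new KeyValue(rowKey, "bookFamily".getBytes(),
				"title".getBytes(), "Some Title".getBytes());
		// Inside the real mapper this pair is handed to context.write(hKey, kv)
		System.out.println(hKey + " -> " + kv);
	}
}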

3. Load the files into HBase by telling the RegionServers where to find them.
We are almost done; the last small step is to use LoadIncrementalHFiles, giving it the HDFS path where the HFiles are located, and HBase will load each HFile into its region via the RegionServer.

Points to consider while using HBase bulk loading

  1. We bypass the write path of HBase, i.e. no WAL entries get written during a bulk load.
  2. Replication won't work, because HBase replication works by reading the WAL files. So to achieve replication while bulk loading, we need to copy the HFiles created by the MapReduce job to the other instances and process them manually.
  3. It is recommended to manually split your HBase table before a bulk load operation (see the sketch after this list).
  4. You need double the space on your cluster: say you want to process a 10GB XML file, then HFiles of roughly 10GB will be created. So in total about 20GB is used until you load the HFiles into HBase; once the data is loaded into the table you can release the space by deleting the source (raw) data.
  5. When you do an incremental bulk load, you might come across issues with major compactions. HBase 0.96.0 has a proper fix for this. To read more on this issue, see HBASE-8521.
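For point 3, a minimal sketch of pre-splitting the target table before a bulk load might look like the following. The table name, column family and split keys are hypothetical, and the 0.94/0.96-era HBaseAdmin API used elsewhere in this post is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class PreSplitTable {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HBaseAdmin admin = new HBaseAdmin(conf);

		HTableDescriptor desc = new HTableDescriptor("bookTable"); // hypothetical table name
		desc.addFamily(new HColumnDescriptor("bookFamily"));

		// Region boundaries chosen to roughly match the expected row key distribution
		byte[][] splitKeys = new byte[][] { "100".getBytes(), "200".getBytes(), "300".getBytes() };
		admin.createTable(desc, splitKeys);
		admin.close();
	}
}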

I have written an application which reads XML files from HDFS and persists the data into an HBase table using bulk load. The whole project is available on Github.

To read the XML files I have used Mahout's XmlInputFormat class; the source code is available at MahoutXmlInputFormat. You can download the Mahout library and add it to your project.

As I am using a distributed HBase cluster, I have set the ZooKeeper quorum in the HBaseConfiguration to connect to HBase.

The sample XML file which I have used can be found at /test/resource/test.xml.

As explained earlier, bulk load has its own reducer. HFileOutputFormat.configureIncrementalLoad is the method which configures the reducer and its output format, depending on the mapper and table configurations. I recommend looking at its source code once to understand the complete mechanism. HBase uses its own Reducer class to ensure total ordering, replacing any reducer class that you may have previously set. The reducer class chosen depends on the Mapper's output value class: in my code I have used KeyValue, so the reducer class used by HBase is KeyValueSortReducer.

The lines below configure the incremental load and then import the generated HFiles from HDFS into HBase.

HFileOutputFormat.configureIncrementalLoad(job, htable);

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path(outputPath), htable);

Below is the code for the Mapper class.
HBaseMapper.java

 
package com.sreejithpillai.hbase.bulkload;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
 
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class HBaseMapper extends
		Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
	final static byte[] COL_FAMILY = "bookFamily".getBytes();
 
	List<String> columnList = new ArrayList<String>();
	ParseXml parseXml = new ParseXml();
	ImmutableBytesWritable hKey = new ImmutableBytesWritable();
	KeyValue kv;
 
	protected void setup(Context context) throws IOException,
			InterruptedException {
		columnList.add("id");
		columnList.add("author");
		columnList.add("title");
		columnList.add("genre");
		columnList.add("price");
		columnList.add("publish_date");
		columnList.add("description");
	}
 
	/**
	 * The map method receives the XML data of one record, from <book> to </book>. To read the XML
	 * content, the record is passed to the getXmlTags method, which parses the XML using a StAX
	 * parser and returns a String array of the tag contents. The array is iterated and each
	 * element is stored in a KeyValue.
	 */
	public void map(LongWritable key, Text value, Context context)
			throws InterruptedException, IOException {
		String line = value.toString();
 
		String fields[] = parseXml.getXmlTags(line, columnList);
 
		hKey.set(fields[0].getBytes());
 
		if (!fields[1].equals("")) {
			kv = new KeyValue(hKey.get(), COL_FAMILY,
					HColumnEnum.COL_AUTHOR.getColumnName(),
					fields[1].getBytes());
			context.write(hKey, kv);
		}
 
		if (!fields[2].equals("")) {
			kv = new KeyValue(hKey.get(), COL_FAMILY,
					HColumnEnum.COL_TITLE.getColumnName(), fields[2].getBytes());
			context.write(hKey, kv);
		}
 
		if (!fields[3].equals("")) {
			kv = new KeyValue(hKey.get(), COL_FAMILY,
					HColumnEnum.COL_GENRE.getColumnName(), fields[3].getBytes());
			context.write(hKey, kv);
		}
		if (!fields[4].equals("")) {
			kv = new KeyValue(hKey.get(), COL_FAMILY,
					HColumnEnum.COL_PRICE.getColumnName(), fields[4].getBytes());
			context.write(hKey, kv);
		}
		if (!fields[5].equals("")) {
			kv = new KeyValue(hKey.get(), COL_FAMILY,
					HColumnEnum.COL_PUBLISHDATE.getColumnName(),
					fields[5].getBytes());
			context.write(hKey, kv);
		}
		if (!fields[6].equals("")) {
			kv = new KeyValue(hKey.get(), COL_FAMILY,
					HColumnEnum.COL_DESCRIPTION.getColumnName(),
					fields[6].getBytes());
 
			context.write(hKey, kv);
		}
	}
}
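
The mapper references an HColumnEnum class that is not listed in this post; it lives in the Github project. For readers following along, a sketch of the shape the mapper expects would be roughly the following, with the column qualifiers guessed from the XML tags.

package com.sreejithpillai.hbase.bulkload;

// Sketch only: one constant per column qualifier, each exposing its qualifier bytes via
// getColumnName(), which is what HBaseMapper calls. Check the Github project for the real enum.
public enum HColumnEnum {
	COL_AUTHOR("author".getBytes()),
	COL_TITLE("title".getBytes()),
	COL_GENRE("genre".getBytes()),
	COL_PRICE("price".getBytes()),
	COL_PUBLISHDATE("publish_date".getBytes()),
	COL_DESCRIPTION("description".getBytes());

	private final byte[] columnName;

	HColumnEnum(byte[] columnName) {
		this.columnName = columnName;
	}

	public byte[] getColumnName() {
		return columnName;
	}
}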

I have used a StAX parser to parse the XML records fetched by Mahout's XmlInputFormat class.

ParseXml.java

package com.sreejithpillai.hbase.bulkload;

import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.List;

import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.StreamFilter;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class ParseXml {
 
	public String[] getXmlTags(String line, List<String> columnList) {
		
		String[] fields = new String[7];
		// Pre-fill with empty strings so a missing tag doesn't leave a null behind
		Arrays.fill(fields, "");
		
		try {
						
			XMLInputFactory factory = XMLInputFactory.newFactory();
			
			XMLStreamReader rawReader = factory.createXMLStreamReader(new ByteArrayInputStream(line.getBytes()));
			
			
			XMLStreamReader filteredReader = factory.createFilteredReader(rawReader,
					  new StreamFilter() {
					    public boolean accept(XMLStreamReader r) {
					      return !r.isWhiteSpace();
					    }
					  });
			String currentElement = "";
 
			while (filteredReader.hasNext()) {
				System.out.println("reader");
				int code = filteredReader.next();
				
				switch (code) {
				
				case XMLStreamReader.START_ELEMENT:
					currentElement = filteredReader.getLocalName();
					System.out.println("current element "+ currentElement);
					break;
					
				
				case XMLStreamReader.CHARACTERS:
					
					int k = 0;
					for (String xmlTag : columnList) {
						
						if (currentElement.equalsIgnoreCase(xmlTag)) {
							fields[k] = filteredReader.getText().trim();
						}
						k++;
					}
 
				}
			}
		} catch (XMLStreamException e) {
			e.printStackTrace();
		} catch (FactoryConfigurationError e) {
			e.printStackTrace();
		}
		return fields;	
	}
 
}
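
Outside MapReduce, a quick way to sanity-check ParseXml is to feed it a single <book> record by hand. The class and record below are made up purely for illustration and assume ParseXml sits in the same package.

package com.sreejithpillai.hbase.bulkload;

import java.util.Arrays;
import java.util.List;

// Hypothetical standalone check of ParseXml, using the same column list as the mapper.
public class ParseXmlTest {
	public static void main(String[] args) {
		List<String> columnList = Arrays.asList("id", "author", "title", "genre",
				"price", "publish_date", "description");
		String line = "<book><id>1</id><author>John Doe</author><title>Sample Title</title>"
				+ "<genre>Computer</genre><price>44.95</price>"
				+ "<publish_date>2000-10-01</publish_date>"
				+ "<description>A made-up record for testing.</description></book>";

		String[] fields = new ParseXml().getXmlTags(line, columnList);
		System.out.println(fields[0]); // row key, prints "1"
		System.out.println(fields[2]); // title, prints "Sample Title"
	}
}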

Below is the Driver class.
HBaseDriver.java

package com.sreejithpillai.hbase.bulkload;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class HBaseDriver {
 
	private static Logger LOG = LoggerFactory.getLogger(HBaseDriver.class);
	static Configuration hbaseconfiguration = null;
	static Configuration conf = new Configuration();
	static HBaseAdmin hbaseAdmin;
 
	public static void connectHBase() {
		LOG.info("Initializing Connection with Hbase");
		final String HBASE_CONFIG_ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
		final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
		final String HBASE_CONFIG_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
		final String HBASE_ZOOKEEPER_SERVER = "1.2.3.4,172.56.12.35"; // Comma-separated IPs of the
										// machines where ZooKeeper is installed
		conf.set(HBASE_CONFIG_ZOOKEEPER_CLIENT_PORT,
				HBASE_ZOOKEEPER_CLIENT_PORT);
 
		conf.set(HBASE_CONFIG_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_SERVER);
 
		hbaseconfiguration = HBaseConfiguration.create(conf);
 
		try {
			hbaseAdmin = new HBaseAdmin(hbaseconfiguration);
			LOG.info("HBase connection successfull");
		} catch (MasterNotRunningException e) {
			LOG.error("HBase Master Exception " + e);
		} catch (ZooKeeperConnectionException e) {
			LOG.error("Zookeeper Exception " + e);
		}
	}
 
	/**
	 * Main entry point for the example.
	 * 
	 * @param args
	 *            arguments
	 * @throws Exception
	 *             when something goes wrong
	 */
	public static void main(String[] args) throws Exception {
		LOG.info("Code started");
		HBaseDriver.connectHBase(); // Initializing connection with HBase
		
		String inputPath=args[0];
		String outputPath=args[1];
		String tableName=args[2];
 
		conf.set("hbase.table.name", tableName);
		conf.set("xmlinput.start", "<book>");
		conf.set("xmlinput.end", "</book>");
 
		Job job = new Job(conf);
		job.setJarByClass(HBaseDriver.class);
		job.setJobName("Bulk Load XML into HBase");
 
		job.setInputFormatClass(XmlInputFormat.class);
 
		job.setMapperClass(HBaseMapper.class);
 
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(KeyValue.class);
 
		job.setNumReduceTasks(0);
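		// Note: HFileOutputFormat.configureIncrementalLoad below overrides this and sets
		// the number of reduce tasks to match the target table's region count.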
 
		HTable htable = new HTable(conf, tableName);
		HFileOutputFormat.configureIncrementalLoad(job, htable);
 
		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));
 
		job.waitForCompletion(true);
 
		// Importing the generated HFiles into a HBase table
		LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
		loader.doBulkLoad(new Path(outputPath), htable);
		LOG.info("Code ended");
	}
 
}

The whole project is available on Github.

Happy Coding!

13 comments

  1. Hi Sreejith

    I actually need this working ASAP, but the problem is that at execution time I am getting
    java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HColumnDescriptor.getCompression()

    My pom dependency is groupId org.apache.hbase, artifactId hbase, version 0.94.2.

    I am unable to find 0.94.6, which you mentioned in your pom file.

    I guess this is some kind of version issue. But I need to get on with it.

    1. The API for HColumnDescriptor has changed in newer versions of HBase. The new API for defining a table is HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName)).
      I tested this on HBase 0.98.

      Thanks

  2. Thanks for replying.

    Can you tell me what dependencies you included in your pom file to make this code work with HBase 0.98?

    1. I am using the MapR distro, and for that I am using the hbase client, hbase server and hbase core dependencies. It will vary if you are using CDH or HDP.

    2. Put in a little effort yourself as well.

  3. Hi Sreejith,

    I am new to Hadoop and have to run this type of code. My question is: why is a ZooKeeper instance required in the driver class?

  4. narayana:

    Hi Sreejith,

    thanks for giving the code. I am learning Hadoop. I got an error at HColumnEnum, can you please tell me the reason?

    1. Please paste the error stack trace you got.

  5. Hi Sreejith,

    I have built the bulk load code, but it is taking a lot of time: 5 minutes for 1 MB.
    Can you please suggest how to optimize it?
    My code is an exact copy of your code.

  6. shashika:

    Hi Sreejith,
    Can we do a bulk delete on an HBase table like this?

  7. Hi Sreejith,
    I am new to HBase and found your articles very informative. Just a small doubt: there is a command, importtsv, to generate HFiles. Can't we use that instead of a MapReduce program? What are the pros and cons? Is there any situation where we should prefer one approach over the other?
    Thanks,
    Vinitha

  8. Hi Sreejith,
    I found your article. Since I just started learning to use HBase, I'm trying your code. I got this error:
    Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/MasterNotRunningException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:214)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.MasterNotRunningException
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

    Can you tell me what's wrong? I used an exact copy of the code.
