Minimal HBase MapReduce Example for 0.19 and 0.20

HBase includes an example for populating a table from a Hadoop MapReduce job, but it seemed overly complicated to me. I'm getting started with HBase, and this was my starting point. This first version uses the old Hadoop API, with everything in the mapred package rather than mapreduce, along with the corresponding HBase API, which is now deprecated.

public class PopulateArticlesTable extends Configured
  implements Tool {
  public static class Map extends MapReduceBase
    implements
    Mapper<LongWritable, Text, ImmutableBytesWritable, BatchUpdate> {
    private ImmutableBytesWritable outKey = new ImmutableBytesWritable();

    @Override
    public void map(
      LongWritable offset,
      Text input,
      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
      Reporter report) throws IOException {
      // Parse whatever format your data is in
      RichArticle art = new RichArticle(input.toString());
      // RichArticle builds a good HBase row key: a timestamp plus a unique
      // identifier to prevent collisions. All keys are byte arrays.
      byte[] rowId = art.getRowId();
      outKey.set(rowId);
      // We emit one BatchUpdate per object; the update may be composed of
      // multiple operations, in this case two puts.
      BatchUpdate update = new BatchUpdate(rowId);
      if (art.getTitle() != null)
        update.put("content:title", Bytes.toBytes(art.getTitle()));
      if (art.getBody() != null)
        update.put("content:body", Bytes.toBytes(art.getBody()));
      output.collect(outKey, update);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    // Standard boilerplate for creating and running a hadoop job
    JobConf job = new JobConf(getConf(), this.getClass());
    String input = args[0];
    job.setJobName("Populate articles table from " + input);
    // Input is just text files in HDFS
    TextInputFormat.setInputPaths(job, new Path(input));
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    // Output is to the table output format, and we set the table we want
    job.setOutputFormat(TableOutputFormat.class);
    job.set(TableOutputFormat.OUTPUT_TABLE, "articles");
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new Configuration(),
      new PopulateArticlesTable(), args);
    System.exit(res);
  }
}

Next up, I’ve converted everything to use the 0.20 APIs. You’ll see that the code got shorter too, since I use GenericOptionsParser directly instead of implementing Tool.

For the Hadoop changes: OutputCollector is no more; it has been replaced by Context. The Mapper interface and MapReduceBase have been merged into a single Mapper class, which is intended to be extended (left unextended, Mapper acts as the identity mapper). For job control, you no longer call JobClient.runJob; instead you call waitForCompletion on the Job itself. Configuration handling has been cleaned up as well: you’ll notice I create a Configuration before constructing the Job. One big item is that you need to call setJarByClass manually, which was previously taken care of by constructing the JobConf with the class as a parameter. Finally, setOutputFormat has been renamed setOutputFormatClass on the new Job class.
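
The driver-side changes are easiest to see by comparing the run and main methods in the two full listings. The mapper-side change is condensed below in a pair of trivial pass-through mappers; these classes are purely illustrative (the names are mine, not part of the job in this post) and just show the old interface-plus-OutputCollector signature next to the new class-plus-Context one.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// 0.19 style: implement the old Mapper interface, extend MapReduceBase for
// the empty configure()/close() methods, and emit through an OutputCollector.
public class OldStyleMapper extends MapReduceBase implements
  org.apache.hadoop.mapred.Mapper<LongWritable, Text, LongWritable, Text> {
  public void map(LongWritable key, Text value,
    OutputCollector<LongWritable, Text> output, Reporter reporter)
    throws IOException {
    output.collect(key, value); // pass-through, just to show the signature
  }
}

// 0.20 style: extend the new Mapper class and emit through the Context,
// which replaces both OutputCollector and Reporter.
class NewStyleMapper extends
  org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    context.write(key, value); // same pass-through with the new API
  }
}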

I’m new to HBase, so I’m less sure whether I’ve done things in the recommended way. The important things to note are that you need to set the table name in the configuration before creating the job, and that Puts and Deletes are Hadoop Writables, so the mapper can emit them directly (see the Delete sketch after the listing).

public class PopulateArticlesTable {
  public static class Map extends
    Mapper<LongWritable, Text, NullWritable, Writable> {

    @Override
    protected void map(LongWritable offset, Text input, Context context)
      throws IOException, InterruptedException {
      // My input is in JSON format; in other applications you might be
      // splitting a line of text or reading any of Hadoop's Writable formats.
      RichArticle art = new RichArticle(input.toString());
      // RichArticle builds a good HBase row key: a timestamp plus a unique
      // identifier to prevent collisions. All keys in HBase are byte arrays.
      byte[] rowId = art.getRowId();
      // We output multiple operations for each row, one Put per populated field
      if (art.getTitle() != null) {
        Put put = new Put(rowId);
        put.add(Bytes.toBytes("content"), Bytes.toBytes("title"), Bytes.toBytes(art.getTitle()));
        context.write(NullWritable.get(), put);
      }
      if (art.getBody() != null) {
        Put put = new Put(rowId);
        put.add(Bytes.toBytes("content"), Bytes.toBytes("body"), Bytes.toBytes(art.getBody()));
        context.write(NullWritable.get(), put);
      }
    }
  }

  public static void main(String args[]) throws Exception {
    Configuration conf = new Configuration();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    String input = otherArgs[0];
    Job job = new Job(conf, "Populate Articles Table with " + input);
    // Input is just text files in HDFS
    FileInputFormat.addInputPath(job, new Path(input));
    job.setJarByClass(PopulateArticlesTable.class);
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    // Output is to the table output format, and we set the table we want
    job.setOutputFormatClass(TableOutputFormat.class);
    job.waitForCompletion(true);
  }
}
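
Since Deletes are Writables too, removing data works the same way from a mapper. Here is a hypothetical sketch (the class name and the one-row-key-per-line input convention are my assumptions, not part of the job above) that mirrors the Put usage in the listing:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper that treats each input line as a row key to remove.
// It would plug into the same job setup as above, just with this mapper class.
public class DeleteArticlesMapper extends
  Mapper<LongWritable, Text, NullWritable, Writable> {

  @Override
  protected void map(LongWritable offset, Text input, Context context)
    throws IOException, InterruptedException {
    // A Delete with only a row key removes the whole row; you could also
    // narrow it with deleteColumn for individual cells.
    Delete delete = new Delete(Bytes.toBytes(input.toString().trim()));
    context.write(NullWritable.get(), delete);
  }
}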

Hopefully the before and after for the APIs is helpful. These have both been tested and work on my system. You’ll need to modify them slightly, of course, unless you happen to have a data object called RichArticle that has a string serialization.
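
If you want something to stand in for RichArticle while experimenting, here is a rough sketch of what such a data object might look like. It is entirely hypothetical: the tab-separated serialization, the field layout, and the row-key construction are my assumptions (my real input is JSON), so substitute your own object and parsing.

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical stand-in for the RichArticle data object used above.
// Assumes one tab-separated line per record: timestamp, id, title, body.
public class RichArticle {
  private final long timestamp;
  private final String id;
  private final String title;
  private final String body;

  public RichArticle(String line) {
    String[] fields = line.split("\t", 4);
    this.timestamp = Long.parseLong(fields[0]);
    this.id = fields[1];
    this.title = fields.length > 2 && !fields[2].isEmpty() ? fields[2] : null;
    this.body = fields.length > 3 && !fields[3].isEmpty() ? fields[3] : null;
  }

  // Row key: the timestamp followed by a unique identifier, as described
  // in the mapper comments above.
  public byte[] getRowId() {
    return Bytes.add(Bytes.toBytes(timestamp), Bytes.toBytes(id));
  }

  public String getTitle() { return title; }

  public String getBody() { return body; }
}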

Update: I should probably point you at the official documentation as well.