September 2009 Archives

Using HBase TableIndexed from Thrift with unique keys

HBase is primarily a sorted, distributed map, but it does support secondary indexes through a contrib package called Transactional HBase. The secondary indexes are provided by a component called TableIndexed. A good general walkthrough is "Secondary indexes in HBase" on Rajeev Sharma's blog. This post specifically addresses how to use secondary indexes outside of the Java API and how to handle unique keys.

Our scenario is this: my articles table uses a row key that starts with a timestamp. This is a very common design in HBase, because time order is usually the most natural way to access the information. At least some of my articles also have a secondary key that I want to be able to look them up by. Here is a concrete example for further discussion.

row key  key:id  content:title
  1234     abc     ...

These keys are always unique; that is guaranteed at the application level. I want to be able to fetch an article by its secondary key. That's all pretty straightforward, except that I want to do it outside of Java, using just Thrift, and I would prefer to use only get(), because that makes the code easier to write and easier for anyone coming along later to read. Scanners are powerful, but nobody reading the code would expect a lookup on a unique secondary index to need one.

The first thing we need is an index specification that creates the unique key we want. By default, HBase provides a SimpleIndexKeyGenerator that creates index row keys starting with the value of the secondary key; in our example, the index row key would be abc1234. This supports multiple rows matching the same secondary key (abc5678 as well, for example), but it means you have to use a scanner to get at the information.
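To make the difference concrete, here is a small in-memory sketch, not HBase code. A Java TreeMap stands in for the sorted index table, Strings stand in for byte-array row keys (the ordering is the same for ASCII keys), and index row keys are built by appending the primary row key to the secondary value, as described above. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of an index table whose row keys are
// "secondary value + primary row key". Not HBase code.
public class PrefixIndexScan {

  // Builds an index row key by appending the primary row key to the secondary value.
  public static String simpleIndexKey(String secondaryValue, String primaryRowKey) {
    return secondaryValue + primaryRowKey;
  }

  // Mimics a scanner: start at the first key >= prefix and stop at the
  // first key that no longer starts with the prefix.
  public static List<String> scanPrefix(TreeMap<String, String> index, String prefix) {
    List<String> primaryKeys = new ArrayList<>();
    for (Map.Entry<String, String> e : index.tailMap(prefix, true).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break;
      }
      primaryKeys.add(e.getValue());
    }
    return primaryKeys;
  }

  public static void main(String[] args) {
    // index row key -> primary table row key
    TreeMap<String, String> index = new TreeMap<>();
    index.put(simpleIndexKey("abc", "1234"), "1234");
    index.put(simpleIndexKey("abc", "5678"), "5678");
    index.put(simpleIndexKey("abd", "9999"), "9999");

    // An exact get on "abc" finds nothing: no index row is keyed by "abc" alone.
    System.out.println(index.get("abc"));          // null
    // A prefix scan finds both matching rows and stops before "abd9999".
    System.out.println(scanPrefix(index, "abc"));  // [1234, 5678]
  }
}
```

This is why the default generator pushes you toward scanners: the only way to find every row for "abc" is to start at "abc" and walk forward until the prefix stops matching.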

I've written a UniqueIndexKeyGenerator that uses the exact value of the primary table's secondary key as the row key for the index table. That is, for our example, the row key in "articles-index" would be "abc". I've also added a forUniqueIndex static method to IndexSpecification to make this easier to call from the JRuby shell.

  /**Construct an index spec for a single column that has only unique values.
   * @param indexId the name of the index
   * @param indexedColumn the column to index
   * @return the IndexSpecification
   */
  public static IndexSpecification forUniqueIndex(String indexId, byte[] indexedColumn) {
    return new IndexSpecification(indexId, new byte[][] { indexedColumn },
        null, new UniqueIndexKeyGenerator(indexedColumn));
  }
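The two key-generation strategies can be compared in plain Java. The KeyGenerator interface and both classes below are hypothetical stand-ins written for illustration; they are not the TableIndexed contrib API and not the code in HBASE-1885.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface for this sketch only; not the TableIndexed API.
interface KeyGenerator {
  // Returns the index-table row key for one primary-table row.
  // Assumes the row actually has the indexed column.
  byte[] createIndexKey(byte[] rowKey, Map<String, byte[]> columns);
}

// Mirrors the SimpleIndexKeyGenerator behavior described above:
// secondary value followed by the primary row key.
class ConcatKeyGenerator implements KeyGenerator {
  private final String column;

  ConcatKeyGenerator(String column) {
    this.column = column;
  }

  @Override
  public byte[] createIndexKey(byte[] rowKey, Map<String, byte[]> columns) {
    byte[] value = columns.get(column);
    byte[] key = Arrays.copyOf(value, value.length + rowKey.length);
    System.arraycopy(rowKey, 0, key, value.length, rowKey.length);
    return key;
  }
}

// Mirrors the UniqueIndexKeyGenerator behavior described above:
// the secondary value alone becomes the index row key.
class UniqueKeyGenerator implements KeyGenerator {
  private final String column;

  UniqueKeyGenerator(String column) {
    this.column = column;
  }

  @Override
  public byte[] createIndexKey(byte[] rowKey, Map<String, byte[]> columns) {
    // Copy so the caller cannot mutate the stored column value through the key.
    return columns.get(column).clone();
  }
}

public class KeyGenerators {
  public static void main(String[] args) {
    byte[] rowKey = "1234".getBytes(StandardCharsets.UTF_8);
    Map<String, byte[]> columns = new HashMap<>();
    columns.put("key:id", "abc".getBytes(StandardCharsets.UTF_8));

    byte[] simple = new ConcatKeyGenerator("key:id").createIndexKey(rowKey, columns);
    byte[] unique = new UniqueKeyGenerator("key:id").createIndexKey(rowKey, columns);
    System.out.println(new String(simple, StandardCharsets.UTF_8)); // abc1234
    System.out.println(new String(unique, StandardCharsets.UTF_8)); // abc
  }
}
```

The trade-off is the one described above: because the unique generator drops the primary row key, two primary rows with the same key:id would write to the same index row, and a get would return only the most recent one. That is why it relies on the application guaranteeing uniqueness.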

I then added a create_index method directly in the shell. The shell is flexible enough that you have access to things like @configuration, which all the other shell methods use. I was also unsure whether I could do an import inside a method, but it worked just fine. The imports are there with the idea that the method could eventually be added to the shell itself: if you called it without the transactional jar on your classpath, it would fail with a class-not-found error. Note that JRuby lets you call the Java method forUniqueIndex by its Ruby-style name, for_unique_index.

def create_index(table_name, index_name, column)
  import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin
  import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification
  @iadmin ||= IndexedTableAdmin.new(@configuration)
  spec = IndexSpecification.for_unique_index(index_name, column.to_java_bytes)
  @iadmin.addIndex(table_name.to_java_bytes, spec)
end

Using this, I can create my tables in the shell easily.

create 'a', 'key', 'content'
create_index 'a', 'index', 'key:id'

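This creates a table "a" with two column families, "key" and "content", and an index called "index" on the "id" column in the "key" column family. Internally, TableIndexed stores that index in a separate table, "a-index", whose row keys are the key:id values and whose __INDEX__:ROW column holds the matching row key from "a".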

Accessing this over Thrift is now very simple. The full script is query.rb; these are the important sections. First, we need to make sure we have Thrift hooked up.

require 'rubygems'
require 'hbase'

transport = Thrift::BufferedTransport.new(Thrift::Socket.new('127.0.0.1', 9090))
protocol = Thrift::BinaryProtocol.new(transport)
client = Apache::Hadoop::Hbase::Thrift::Hbase::Client.new(protocol)
transport.open

The generated hbase Ruby code requires thrift itself, so we don't need to require it separately. I've opened a connection to my own machine on port 9090, the default port for the HBase Thrift server. I don't know the details of the transport and protocol setup; it's just voodoo I picked up on some other site. My main loop lets me query a bunch of secondary keys interactively.

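# Assumed names matching the shell example above; query.rb may set these differently.
table  = 'a'
index  = 'a-index'
column = 'content:title'

STDIN.each do |id|
  id.strip!
  # First get: the index row keyed by the secondary key holds the primary row key.
  row_keys = client.get index, id, '__INDEX__:ROW'
  row_key_cell = row_keys[0]
  if row_key_cell
    row_key = row_key_cell.value
    puts "Found row key #{row_key}"
    # Second get: fetch the wanted column from the primary table.
    value = client.get table, row_key, column
    puts value[0] ? "Found item #{value[0].value}" : "No #{column} in row #{row_key}"
  else
    puts "unable to find '#{id}' in index"
  end
end
transport.close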

This is the important part. First, we read an id: the value we are trying to match against key:id in the table. We then get the contents of "__INDEX__:ROW" from the index table, at the row whose key matches the id. In my case there's only ever one, but the API doesn't know that, so it returns a list of cells. Each cell carries a timestamp and, what we care about, a value. That value is the row key in the primary table. If we found anything in the secondary index, we then do a get on the primary table.
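The same two-step lookup can be modeled in memory to show the data flow. In this hypothetical Java sketch, one HashMap plays the index table (index row key to the __INDEX__:ROW value) and another plays the primary table. The putArticle helper writes both by hand purely for illustration, whereas TableIndexed maintains the index for you; the article title is made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// In-memory model of the Thrift loop: one get on the index table,
// then one get on the primary table. Not HBase code.
public class TwoStepLookup {
  // Index table: index row key (the unique key:id value) -> __INDEX__:ROW value.
  private final Map<String, String> indexTable = new HashMap<>();
  // Primary table: row key -> (column -> value).
  private final Map<String, Map<String, String>> primaryTable = new HashMap<>();

  // Hypothetical helper: writes the article and its index entry by hand.
  public void putArticle(String rowKey, String id, String title) {
    Map<String, String> row = new HashMap<>();
    row.put("key:id", id);
    row.put("content:title", title);
    primaryTable.put(rowKey, row);
    // With a unique key generator, the index row key is the id itself.
    indexTable.put(id, rowKey);
  }

  // Resolves a secondary key to a column value using only exact-key lookups.
  public Optional<String> lookup(String id, String column) {
    String rowKey = indexTable.get(id);                 // first get: the index table
    if (rowKey == null) {
      return Optional.empty();                          // not in the index
    }
    Map<String, String> row = primaryTable.get(rowKey); // second get: the primary table
    return row == null ? Optional.empty() : Optional.ofNullable(row.get(column));
  }

  public static void main(String[] args) {
    TwoStepLookup db = new TwoStepLookup();
    db.putArticle("1234", "abc", "Hello");
    System.out.println(db.lookup("abc", "content:title").orElse("not found")); // Hello
    System.out.println(db.lookup("xyz", "content:title").orElse("not found")); // not found
  }
}
```

Both steps are exact-key lookups, which is the point of the unique generator: no scanner is involved anywhere.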

This post describes a work in progress. Once I have everything running in production under load, I'll create some patches to submit to HBase. I think adding UniqueIndexKeyGenerator greatly simplifies one common use of TableIndexed, and adding TableIndexed support to the shell makes it easy to manage the schema.

The code is now available in HBASE-1885.

Minimal HBase MapReduce Example for 0.19 and 0.20

HBase includes an example for populating a table from Hadoop MapReduce, but it seemed overly complicated. I'm just getting started with HBase, and this was my starting point. This first version uses the old Hadoop API, with everything in the mapred package rather than mapreduce. It also uses the corresponding HBase API, which is now deprecated.

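import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// RichArticle is your own data class; it is not part of Hadoop or HBase.
public class PopulateArticlesTable extends Configured
  implements Tool {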
  public static class Map extends MapReduceBase
    implements
    Mapper<LongWritable, Text, ImmutableBytesWritable, BatchUpdate> {
    private ImmutableBytesWritable outKey = new ImmutableBytesWritable();

    @Override
    public void map(
      LongWritable offset,
      Text input,
      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
      Reporter report) throws IOException {
      // whatever format your data is in
      RichArticle art = new RichArticle(input.toString());
      // a good HBase row key, consisting of a timestamp and a unique identifier to prevent collisions. All keys are byte arrays.
      byte[] rowId = art.getRowId();
      outKey.set(rowId);
      // We execute one update for each object we encounter, that update may be composed of multiple operations, in this case, two puts
      BatchUpdate update = new BatchUpdate(rowId);
      if (art.getTitle() != null)
        update.put("content:title", Bytes
          .toBytes(art.getTitle()));
      if (art.getBody() != null)
        update.put("content:body", Bytes
          .toBytes(art.getBody()));
      output.collect(outKey, update);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    // Standard boilerplate for creating and running a hadoop job
    JobConf job = new JobConf(getConf(), this.getClass());
    String input = args[0];
    job.setJobName("Populate articles table from " + input);
    // Input is just text files in HDFS
    TextInputFormat.setInputPaths(job, new Path(input));
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    // Output is to the table output format, and we set the table we want
    job.setOutputFormat(TableOutputFormat.class);
    job.set(TableOutputFormat.OUTPUT_TABLE, "articles");
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new Configuration(),
      new PopulateArticlesTable(), args);
    System.exit(res);
  }
}

Next up I've converted everything to use the 0.20 APIs. You'll see that it got shortened too, as I use GenericOptionsParser directly instead of implementing Tool.

For the Hadoop changes: OutputCollector is gone, replaced by Context. The Mapper interface and MapReduceBase have been merged into the Mapper class, which is intended to be extended (if you don't override map, Mapper behaves like the old IdentityMapper). For job control, you no longer use JobClient.runJob; instead you call waitForCompletion on the Job. The configuration has been cleaned up, as you'll notice I have to create a Configuration before the Job. One big item is that you need to call setJarByClass manually, which was previously taken care of by creating the JobConf with the class as a parameter. Finally, JobConf.setOutputFormat has become Job.setOutputFormatClass.

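I'm new to HBase, so I'm not sure I've done things in the recommended way. The important things to note are that you need to set the table name in the conf before creating the job, because the Job takes its own copy of the configuration when it is constructed, and that Puts and Deletes are Hadoop Writables, so the mapper can hand them straight to TableOutputFormat.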

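import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PopulateArticlesTable {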
  public static class Map extends
    Mapper<LongWritable, Text, NullWritable, Writable> {

    @Override
    protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException {
      // my input is in JSON format, in other applications, you might be splitting a line of text or any of Hadoop's writable formats
      RichArticle art = new RichArticle(input.toString());
      // RichArticles are able to output a good HBase row key, consisting of a timestamp and a unique identifier to prevent collisions. All keys in HBase are byte arrays.
      byte[] rowId = art.getRowId();
      // We output multiple operations for each row
      if (art.getTitle() != null) {
        Put put = new Put(rowId);
        put.add(Bytes.toBytes("content"), Bytes.toBytes("title"), Bytes.toBytes(art.getTitle()));
        context.write(NullWritable.get(), put);
      }
      if (art.getBody() != null) {
        Put put = new Put(rowId);
        put.add(Bytes.toBytes("content"), Bytes.toBytes("body"), Bytes.toBytes(art.getBody()));
        context.write(NullWritable.get(), put);
      }
    }
  }

  public static void main(String args[]) throws Exception {
    Configuration conf = new Configuration();
    // Set the output table before creating the Job: the Job copies conf when constructed
    conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    String input = otherArgs[0];
    Job job = new Job(conf, "Populate Articles Table with " + input);
    // Input is just text files in HDFS
    FileInputFormat.addInputPath(job, new Path(input));
    job.setJarByClass(PopulateArticlesTable.class);
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    // Output goes straight to the table output format; the table was set in conf above
    job.setOutputFormatClass(TableOutputFormat.class);
    // Report success or failure through the process exit code
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Hopefully the before and after for the APIs is helpful. These have both been tested and work on my system. You'll need to modify them slightly, of course, unless you happen to have a data object called RichArticle that has a string serialization.
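If you don't have a RichArticle, here is one hypothetical version, a sketch only. It assumes each input line holds a tab-separated timestamp, id, title and body, rather than the JSON my real class parses, and it builds the row key as an 8-byte big-endian timestamp followed by the UTF-8 bytes of the id, matching the "timestamp plus unique identifier" description in the code comments.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for RichArticle; the real one parses JSON.
public class RichArticle {
  private final long timestamp;
  private final String id;
  private final String title;
  private final String body;

  // Assumed input format: "timestamp<TAB>id<TAB>title<TAB>body".
  // Empty title or body fields become null, matching the null checks in the mappers.
  public RichArticle(String line) {
    String[] f = line.split("\t", -1); // -1 keeps trailing empty fields
    if (f.length != 4) {
      throw new IllegalArgumentException("expected 4 tab-separated fields: " + line);
    }
    this.timestamp = Long.parseLong(f[0]);
    this.id = f[1];
    this.title = f[2].isEmpty() ? null : f[2];
    this.body = f[3].isEmpty() ? null : f[3];
  }

  // Row key: 8-byte big-endian timestamp followed by the UTF-8 id.
  public byte[] getRowId() {
    byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Long.BYTES + idBytes.length)
        .putLong(timestamp) // ByteBuffer is big-endian by default
        .put(idBytes)
        .array();
  }

  public String getTitle() {
    return title;
  }

  public String getBody() {
    return body;
  }

  public static void main(String[] args) {
    RichArticle a = new RichArticle("1254355200000\tabc\tHello\t");
    System.out.println(a.getRowId().length); // 11
    System.out.println(a.getTitle());        // Hello
    System.out.println(a.getBody());         // null
  }
}
```

HBase's Bytes.toBytes(long) is also big-endian, so non-negative timestamps written this way sort chronologically under HBase's byte-wise row ordering, and the id suffix keeps two articles with the same timestamp from colliding.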

Update: I should probably point you at the official documentation as well.

About this Archive

This page is an archive of entries from September 2009 listed from newest to oldest.
