Murder Your Darlings


Lately I've been working on connectivity with NASDAQ. The protocols involve parsing fixed-offset messages of various types. We're not doing high-frequency trading, so we are optimizing for programmer efficiency: the API I expose to the rest of the system should make sense, so I'm representing the different types of messages, trading conditions, exchange identifiers and so on as enums. I was working on processing incoming messages, in this case implementing a handler for NASDAQ's SoupTCP protocol. Each incoming message has a one-character code that I translate into an enum value. I've seen programmers code this kind of thing using a big switch statement, but that leads to maintainability problems. When you add an enum value, did you remember to add it to the switch statement? Did that switch statement get copied and pasted elsewhere? The better solution is to embed that logic in the enum itself, using a static map and a factory method.

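import java.util.HashMap;
import java.util.Map;

enum SoupMessageType {
  LOGIN_REQUEST('L'),
  LOGIN_ACCEPT('A'),
  LOGIN_REJECT('J'),
  DATA('S'),
  LOGOUT_REQUEST('O');

  // The one-character code that identifies this message type on the wire
  private final char code;

  private SoupMessageType(char code) {
    this.code = code;
  }

  // Reverse lookup from code to constant, built once when the enum class is initialized
  private static final Map<Character, SoupMessageType> map;
  static {
    map = new HashMap<Character, SoupMessageType>(values().length);
    for (SoupMessageType v : values()) {
      map.put(v.code(), v);
    }
  }

  public char code() {
    return code;
  }

  // Factory method; static so callers don't need an instance. Returns null for an unknown code.
  public static SoupMessageType from(char code) {
    return map.get(code);
  }
}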

This is a simple pattern, and I found myself copying it from another enum. Since copy and paste is bad, I started looking for a way to turn this pattern into an abstraction. First, I moved the static initializer block into the constructor of a map-like class:

import java.util.HashMap;
import java.util.Map;

// Builds the code -> constant lookup for any enum that implements CodedEnum
public class CodedEnumer<K, E extends Enum<E> & CodedEnum<K>> {
  private final Map<K, E> map;
  public CodedEnumer(Class<E> klass) {
    E[] enumConstants = klass.getEnumConstants();
    map = new HashMap<K, E>(enumConstants.length);
    for (E v : enumConstants) {
      map.put(v.code(), v);
    }
  }

  public static <K, V extends Enum<V> & CodedEnum<K>>
    CodedEnumer<K, V> create(Class<V> klass) {
    return new CodedEnumer<K, V>(klass);
  }

  public E get(K key) {
    return map.get(key);
  }
}

The enum needs to implement a CodedEnum interface with one method.

public interface CodedEnum<K> {
  public K code();
}

My first draft of this included another type parameter E for the enum, and a public E from(K key) method. But of course this method should be static, and declaring a static method in an interface would be meaningless (aside from the other detail of being a compiler error).

Now, rather than copying and pasting the code that builds the mapping from code to value, each enum implements CodedEnum, creates an instance of CodedEnumer, and uses it to implement a one-line static method.

enum SoupMessageTypeCoded implements CodedEnum<Character> {
  LOGIN_REQUEST('L'),
  LOGIN_ACCEPT('A'),
  LOGIN_REJECT('J'),
  DATA('S'),
  LOGOUT_REQUEST('O');

  private final char code;

  private SoupMessageTypeCoded(char code) {
    this.code = code;
  }

  // Enum constants are initialized before other static fields, so the lookup sees all of them
  private static final CodedEnumer<Character, SoupMessageTypeCoded> map =
    CodedEnumer.create(SoupMessageTypeCoded.class);

  @Override
  public Character code() {
    return code;
  }

  public static SoupMessageTypeCoded from(char code) {
    return map.get(code);
  }
}

Pretty slick, huh? I was pretty pleased with myself when I actually found a use for an intersection type in a generic declaration. This is where the old writer's advice to "murder your darlings" comes into play. It's variously attributed to Fitzgerald, Hemingway and others. The meaning is that whenever you write a particularly clever turn of phrase, whatever makes you smile at how smart you are, you should get out the red pencil or the delete key and get rid of it.

On sober reflection, this code sucks. I've created two extra types with complicated generics to save two or three lines of code. Anyone who opens up the class in the future will have to open two more files to understand how it works and what it's doing. So I struck it out, and reverted to the version with those three horrible lines wastefully repeated in each and every enum I use this pattern in. I can only console myself that disk space is getting cheaper.

Inside Automated Sentiment Analysis


This post details Biz360's automated sentiment analysis system, including our goals, how the system works, how we measure success, and the ways it can be used and misused. Before getting into the how or why, I want to start with the what. For our purposes, sentiment is the opinion of the author of an article towards the subject of an article. We classify sentiment into four possible categories.

Positive

Arguing for something, saying something is a good product, talking about good things a person or company has done, enjoying something, liking something, preferring something. If a mostly positive post has a small portion that is negative, it is still positive.

Negative

Arguing against something, saying something is a bad product, describing a bad experience, talking about bad things a person or company has done, disliking or having problems with something. If a mostly negative post has a small portion that is positive, it is still negative.

Neutral

If a post doesn't express any opinion, doesn't present anyone or anything in a favorable or unfavorable way, and wouldn't lead someone to form an opinion for or against, it is neutral.

Mixed

If a post is both positive and negative, it is mixed. This covers a post that says something was good in some ways but bad in others. It also covers a post that talks about different subjects and is positive toward one subject but negative toward another.

The first question is why you need automated sentiment analysis at all. The simple answer is that there's just too much content. As conversations that used to take place over coffee and on street corners move to Twitter and forums, they become trackable. If a magazine with 100,000 readers mentions you in an article, you'll read that article and discuss what it's saying about you. If 10,000 people each tell ten of their friends what they think of Kevin Smith vs. Southwest Air, you can't hope to read more than a small sampling. It's this latter use case that we cared about, where the questions look like these:

  1. What portion of my coverage is positive, negative, etc?
  2. I got a spike in coverage on Monday. Was that spike positive or negative?
  3. What kinds of positive things are people saying? Negative things?

We knew from the start that accuracy at the level of individual articles was never going to be that good. If you want to know the sentiment of one particular article, the best thing to do is click on it, read it, and form your own opinion. With the help of Bill MacCartney, an NLP researcher from Stanford, we quickly homed in on the following design parameters:

  1. A statistical classification system using two classifiers to detect positive and negative, and another classifier to combine these results. We would start with a simple Naive Bayes classifier and a Decision Tree classifier to get everything working, and experiment with more advanced classifiers like the Linear MaxEnt classifier once we had a baseline to measure improvements.
  2. The system would be trained using lots of data from Mechanical Turk. Each item would be rated multiple times so we could throw out the results from raters who didn't understand or were not taking enough care.
  3. Our training data would be real social media content, drawn from all the types of social media we process (blogs, micro-blogs, etc).

At a very high level, text classification systems fall into two groups: those based on statistical learning from data, and those based on hand-coded rules. Our system is solidly in the statistical camp. We were skeptical that a rule-based system could cope with the wide variety of topics and writing styles. We also doubted it could handle the frequency of ungrammatical or misspelled content on the less formal parts of the Internet.

Our sentiment engine turns each post into a set of features, like ("good", "deal") -> 2, meaning the word "good" followed by the word "deal" occurs twice. These features are fed into a two-stage system. First, every post is scored for how positive it is (regardless of whether it is also negative) and for how negative it is (regardless of how positive it is). Next, these two scores are combined into the four categories that are displayed. So high positive sentiment and low negative sentiment would be positive, and high positive and high negative would be mixed.
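The feature extraction and the second-stage combination can be sketched in Java. This is a minimal sketch, not Biz360's engine, and it rests on three assumptions. First, it uses a naive tokenizer that lowercases the text and splits on anything that isn't a letter, digit or apostrophe. Second, it skips the first-stage classifiers entirely and takes their positive and negative scores as inputs. Third, it replaces the trained combining classifier with a hypothetical fixed threshold.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of the two-stage design; not the production system.
class TwoStageSentimentSketch {
    enum Sentiment { POSITIVE, NEGATIVE, NEUTRAL, MIXED }

    // Bigram features: ("good", "deal") -> 2 means "good deal" occurs twice.
    static Map<String, Integer> bigramCounts(String text) {
        List<String> words = new ArrayList<>();
        for (String w : text.toLowerCase(Locale.ROOT).split("[^a-z0-9']+")) {
            if (!w.isEmpty()) words.add(w); // skip empty tokens from leading punctuation
        }
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i + 1 < words.size(); i++) {
            counts.merge(words.get(i) + " " + words.get(i + 1), 1, Integer::sum);
        }
        return counts;
    }

    // Second stage: combine the independent positive and negative scores.
    // In the post this step is a trained classifier; a fixed threshold stands in for it here.
    static Sentiment combine(double positive, double negative, double threshold) {
        boolean pos = positive >= threshold;
        boolean neg = negative >= threshold;
        if (pos && neg) return Sentiment.MIXED;
        if (pos) return Sentiment.POSITIVE;
        if (neg) return Sentiment.NEGATIVE;
        return Sentiment.NEUTRAL;
    }
}
```

For example, `bigramCounts("A good deal, a good deal!")` maps `"good deal"` to 2. With a threshold of 0.5, `combine(0.9, 0.8, 0.5)` returns `MIXED` rather than letting the two scores cancel out.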

We really wanted a mixed category, because in terms of whether it's a post worth reading, someone who is saying both good and bad things about you is even more interesting than positive or negative. Consider the following three clips:

  1. I love my Kinesis Maxim keyboard, it's the best. My wrists feel great since I've been typing on it.
  2. Kinesis is stupid, the Maxim has a stupid layout. I had one for a while but I threw it out.
  3. I like my Kinesis Maxim, but the left alt key is too small and too far to the left.

Sure, the first one is what you hope everyone is saying, but reading these doesn't provide much value. The second one at least is an opportunity for damage control, but the third one is the real gold. In a system based on just a range from negative through neutral to positive, the positive and negative would cancel out and this kind of thing would get lumped into the neutral bucket.

This kind of statistical system isn't any good without good data, so we used an approach that gives us lots of good data quickly and cheaply. We sent out thousands of clips to Mechanical Turk, Amazon's "artificial artificial intelligence", where each clip was scored by ten humans. The raters were given exactly the definitions I gave above as instructions. Those definitions aren't just descriptions of what we think the system produces; they are the starting point. When the results came back, the humans didn't always agree, and some agreed more than others. We threw out raters who looked like they just didn't understand the problem at all. We also threw out raters who seemed to be clicking randomly, since payment was per item. The remaining raters still disagreed on some clips, so we took the majority. If five people said positive, three said neutral and two said mixed, we'd use that clip as training data for positive. All of our data was real social media data. We evaluated one off-the-shelf solution that was trained on newspaper data, and when it said that "Comcast sucks!" was neutral, we gave up on that idea.
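The vote-counting step can be sketched as follows. It assumes the rater filtering has already happened and that each clip's ratings arrive as a list of category names. "Majority" here means the most common label, a plurality, as in the 5/3/2 example. Dropping clips whose top labels are tied is an assumption of this sketch, not something the post specifies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class MajorityLabelSketch {
    // Returns the most common label among one clip's ratings, or empty when the
    // top labels are tied (hypothetical policy: tied clips are not used for training).
    static Optional<String> majority(List<String> ratings) {
        Map<String, Integer> counts = new HashMap<>();
        for (String r : ratings) counts.merge(r, 1, Integer::sum);
        String best = null;
        int bestCount = 0;
        boolean tied = false;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            int c = e.getValue();
            if (c > bestCount) {        // new leader clears any earlier tie
                best = e.getKey();
                bestCount = c;
                tied = false;
            } else if (c == bestCount) { // another label matches the leader
                tied = true;
            }
        }
        return (best == null || tied) ? Optional.empty() : Optional.of(best);
    }
}
```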

To evaluate our accuracy, we looked at a whole slew of numbers. We used a technique called k-fold cross validation: the human-annotated data is split into k parts, and each part in turn is held back to evaluate a system trained on the rest. A big challenge was that most of the content we got was neutral or positive, not mixed or negative. This makes it hard to use simple accuracy as the only metric. If I have 90 items that should be classified as A and 10 items that should be classified as B, I could be 90% accurate by just saying everything was A. So I looked at the accuracy rates for each of the categories separately, and tried to balance them. In my example with 90 As and 10 Bs, if I could get 90% accuracy overall, I'd much prefer to get 81 of the 90 As and 9 of the 10 Bs classified correctly.
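The 90 A / 10 B example can be checked directly by computing overall accuracy and per-category accuracy (per-class recall) side by side. The helper names below are hypothetical, and the fold assignment (item i is held out in fold i mod k) is just one simple way to do k-fold splitting, assumed for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PerClassAccuracySketch {
    static List<String> repeat(String label, int n) {
        return new ArrayList<>(Collections.nCopies(n, label));
    }

    static double overallAccuracy(List<String> actual, List<String> predicted) {
        int correct = 0;
        for (int i = 0; i < actual.size(); i++) {
            if (actual.get(i).equals(predicted.get(i))) correct++;
        }
        return (double) correct / actual.size();
    }

    // Accuracy within each true category: correct / total for items whose true label is c.
    static Map<String, Double> perClassAccuracy(List<String> actual, List<String> predicted) {
        Map<String, int[]> tally = new TreeMap<>(); // label -> {correct, total}
        for (int i = 0; i < actual.size(); i++) {
            int[] t = tally.computeIfAbsent(actual.get(i), k -> new int[2]);
            if (actual.get(i).equals(predicted.get(i))) t[0]++;
            t[1]++;
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, int[]> e : tally.entrySet()) {
            result.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    // Simple k-fold split: indices held out for testing in the given fold.
    static List<Integer> heldOut(int n, int k, int fold) {
        List<Integer> idx = new ArrayList<>();
        for (int i = fold; i < n; i += k) idx.add(i);
        return idx;
    }
}
```

Labeling everything A scores 90% overall but 0% on B. The balanced predictor also scores 90% overall, and it gets 90% within each category, which is the behavior the paragraph above argues for.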

[Chart: predicted vs. human-annotated sentiment, by category]

Of course, there's no "make mistakes evenly" button to press, but I think we found a combination that gives useful results. You can see in the attached chart of predicted vs. human-annotated sentiment that the errors are evenly spread across the categories. This illustrates what we mean when we say that sentiment is directionally accurate, even though it is only correct for about 2/3 of the individual items. If the system finds 100 articles for a topic and says 50 of them are positive, a lot of those will be wrong. Maybe you go through them and see that 10 of them were neutral, four negative and one mixed. But when you go to the other categories, you'll find that the errors mostly balance out: some of the neutral ones should have been positive, and so on. So maybe there should have been 52 positive.
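The arithmetic behind "maybe there should have been 52" works as follows. The 17 positives filed under other categories is not stated in the post. It is simply the number that the post's hypothetical total of 52 implies.

```latex
\begin{aligned}
\text{correct among the 50 predicted positive} &= 50 - (10 + 4 + 1) = 35 \\
\text{actual positive} &= 35 + \underbrace{17}_{\text{positives predicted as other categories}} = 52
\end{aligned}
```

Fifteen items leak out of the positive bucket and seventeen leak in, so the reported count of 50 stays close to the true 52.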

There's a strong temptation when building an automated sentiment system to treat neutral as "I'm not sure". Computers make different kinds of mistakes than humans, and when the computer screws up something a human would have no trouble classifying correctly, it erodes confidence. The problem with this approach is that it focuses too much on not being wrong, and not enough on being right. If uncertain posts are rated as neutral, it changes the whole distribution of content. If you look at a topic and 75% of the content is "neutral", how much is really neutral and how much is swept under the rug because it didn't cross a confidence threshold? We treat neutral as just another category. To classify something that should be positive, negative, or mixed as neutral is just as incorrect as vice-versa.
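The difference between treating neutral as "I'm not sure" and treating it as just another category can be shown with a per-category score map. In this sketch the scores and the 0.7 confidence cutoff are hypothetical. The point is that the fallback rule relabels a fairly confident positive as neutral, which is exactly how the neutral bucket gets inflated.

```java
import java.util.EnumMap;
import java.util.Map;

class NeutralFallbackSketch {
    enum Sentiment { POSITIVE, NEGATIVE, NEUTRAL, MIXED }

    // Neutral as just another category: pick the highest-scoring class.
    static Sentiment argmax(Map<Sentiment, Double> scores) {
        Sentiment best = null;
        for (Map.Entry<Sentiment, Double> e : scores.entrySet()) {
            if (best == null || e.getValue() > scores.get(best)) best = e.getKey();
        }
        return best;
    }

    // The tempting alternative: anything below a confidence cutoff is swept into NEUTRAL.
    static Sentiment withFallback(Map<Sentiment, Double> scores, double minConfidence) {
        Sentiment best = argmax(scores);
        return scores.get(best) >= minConfidence ? best : Sentiment.NEUTRAL;
    }

    static Map<Sentiment, Double> example() {
        Map<Sentiment, Double> s = new EnumMap<>(Sentiment.class);
        s.put(Sentiment.POSITIVE, 0.6);
        s.put(Sentiment.NEGATIVE, 0.2);
        s.put(Sentiment.NEUTRAL, 0.1);
        s.put(Sentiment.MIXED, 0.1);
        return s;
    }
}
```

On `example()`, `argmax` returns `POSITIVE`, while `withFallback(example(), 0.7)` returns `NEUTRAL`.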

I hope this has given you some insight into how Biz360's sentiment engine works, and lets you make better sense of the numbers you are seeing, or, if you are still comparing solutions, gives you things to look for and questions to ask. I'll be following this up in the future with another article explaining "entity" or topic-based sentiment.

This article was originally posted on Biz360's Blog.

How to roll back a committed change in SVN


A coworker asked me today how to roll back a change that has been committed to SVN. This isn't obvious, and the top Google searches return irrelevant results. To back out a change that has already been committed to the Subversion repository, you first merge that revision in reverse, and then commit the result. For example, suppose that in revision 2918 you committed some config files that should not be there. Do this:

% cd config
% svn merge -c -2918 ^/project/trunk/config
% svn ci -m 'revert checkins to config'

This is covered in more detail in the "Undoing Changes" section of the Subversion documentation.

Handy way to monitor multiple HBase logs


We've been running HBase at Biz360 for about six months, and it worked so smoothly at first that I never did much tuning. I've recently increased the volume of data we're storing by about 300%, and have started running into problems such as blocks going missing. I found a good way to monitor all my servers' logs to get an idea of what's going on. It's so simple that I wanted to share it.

for x in 01 02 03 04 05 06 07 08 09 10 ; do
    server=prod-hbase$x
    echo === $server ===
    ssh $server tail -30f '/app/hbase/logs/*region*.log'
done

It does a tail -f on each server's log in turn. To move to the next server, just hit ctrl-c.

@squarecog suggests

for server in `cat hbase_servers.txt`; do ...

You, right there, go call 911

One item that stuck in my mind from a first aid class in boot camp was how to direct someone to call 911.

Wrong:
"Somebody call 911!"

Right:
"You, in the black shirt, go call 911."

If something isn't the responsibility of a particular person, it will not get done. Everyone assumes someone else will do it.

The same thing applies to software, and especially to maintenance issues. If you say "hey, guys, looks like the app is down", it's going to stay that way. An open ticket not assigned to anyone is only useful if it is implicitly assigned to the product manager to review for the next iteration. If the app is down, the right action is to open a ticket and say, "hey, the app is down, it looks like the database; Joe, that's your area, here, it's your ticket". If Joe determines that the database is down because someone tripped over the power cord, then Joe can assign the ticket to Larry in ops. Every step of the way, one particular person is responsible for that task.

Hadoop Workflow Tools Survey


Hadoop Map Reduce and HDFS are fairly stable pieces of software. One component that doesn't have a clear winner yet is higher level job scheduling, also known as workflow scheduling.

To put this in context for someone who isn't familiar with Hadoop: a single Hadoop job is broken up into many map and reduce tasks. The scheduler runs on the job tracker and assigns tasks to open slots on the task trackers on the worker nodes. When people talk about the scheduler in Hadoop, this is usually what they mean. By default, Hadoop uses a FIFO scheduler, but two more advanced schedulers are widely used. The Capacity Scheduler focuses on guaranteeing that the various users of a cluster have access to their guaranteed number of slots. The Fair Scheduler focuses on providing good latency for small jobs while long-running large jobs share the same cluster. These schedulers closely parallel processor scheduling, with Hadoop jobs corresponding to processes and the map and reduce tasks corresponding to time slices.

The next level up is workflow scheduling -- starting jobs on a cluster in the right order and with dependencies. Sometimes a single map-reduce job is all you need. More frequently, you will have many jobs with dependencies between them. For example, you might want to identify the most important words in each document using term frequency–inverse document frequency, which requires first calculating the inverse document frequency then making use of that while examining the documents again. In this case, a shell script that runs the first job, waits for it to complete and then starts the second will work.
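To make the two-pass dependency concrete, here is an in-memory sketch of the TF-IDF computation. On Hadoop each pass would be its own MapReduce job. This sketch runs in a single JVM and makes two assumptions: it uses the common variant idf = ln(N / df) with raw term counts for tf, and its tokenizer is naive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class TfIdfSketch {
    static List<String> tokens(String doc) {
        List<String> out = new ArrayList<>();
        for (String w : doc.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!w.isEmpty()) out.add(w);
        }
        return out;
    }

    // Pass 1 (first job): count how many documents contain each term, then idf = ln(N / df).
    static Map<String, Double> inverseDocumentFrequency(List<String> docs) {
        Map<String, Integer> df = new HashMap<>();
        for (String doc : docs) {
            for (String term : new HashSet<>(tokens(doc))) df.merge(term, 1, Integer::sum);
        }
        Map<String, Double> idf = new HashMap<>();
        for (Map.Entry<String, Integer> e : df.entrySet()) {
            idf.put(e.getKey(), Math.log((double) docs.size() / e.getValue()));
        }
        return idf;
    }

    // Pass 2 (second job): re-read a document and score each term by tf * idf.
    static String mostImportantTerm(String doc, Map<String, Double> idf) {
        Map<String, Integer> tf = new HashMap<>();
        for (String t : tokens(doc)) tf.merge(t, 1, Integer::sum);
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Integer> e : tf.entrySet()) {
            double score = e.getValue() * idf.getOrDefault(e.getKey(), 0.0);
            if (score > bestScore) {
                best = e.getKey();
                bestScore = score;
            }
        }
        return best;
    }
}
```

The second pass can't start until the first has seen every document, which is exactly the kind of ordering constraint the shell script enforces.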

Once you go down this path, you start running into difficulties. Perhaps job C depends on job A and job B, but it's fine for A and B to run in parallel. If D depends on B and C, and B and C depend on A, and B fails part way through, how do you recover? It's not a particularly hard problem, but it's enough of a problem that we'd like to not reinvent the wheel. After all, while people use Hadoop for different tasks, this workflow scheduling problem is common to everyone.
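The A/B/C/D example is a small dependency graph, and the core of any workflow scheduler is running it in dependency order. This is a sketch using Kahn's topological sort, with deliberately naive recovery. All names are hypothetical. It assumes every job appears as a key in the dependency map, and it assumes the outputs of completed jobs are still valid when you rerun. It does not clean up a failed job's partial output, which a real system would need to do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

class WorkflowSketch {
    // deps maps every job to the jobs it depends on (an empty list for none).
    static List<String> runOrder(Map<String, List<String>> deps) {
        Map<String, Integer> unmet = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            unmet.put(e.getKey(), e.getValue().size());
            for (String d : e.getValue()) {
                dependents.computeIfAbsent(d, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Jobs whose dependencies are all done; PriorityQueue makes the order deterministic.
        PriorityQueue<String> ready = new PriorityQueue<>();
        for (Map.Entry<String, Integer> e : unmet.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.poll();
            order.add(job);
            for (String next : dependents.getOrDefault(job, Collections.emptyList())) {
                if (unmet.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (order.size() != deps.size()) {
            throw new IllegalArgumentException("cycle or undeclared dependency");
        }
        return order;
    }

    // Naive recovery: rerun, in dependency order, every job that has not completed.
    static List<String> toRerun(List<String> order, Set<String> completed) {
        List<String> out = new ArrayList<>();
        for (String job : order) {
            if (!completed.contains(job)) out.add(job);
        }
        return out;
    }
}
```

With A first, B and C depending on A, and D depending on B and C, the order is A, B, C, D. If B fails after A and C have finished, only B and D need to run again.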

I recently sent out a poll to the Hadoop mailing list to see how people are solving this problem.

Posner explains CYA security theater


It's obvious to any rational outside observer that US terrorism policy mostly revolves around making sure people think politicians are "doing something", regardless of whether something needs to be done, or whether what they're doing is the right thing. Explaining the work for which Williamson won the Nobel last week, Judge Posner writes:

[FBI criminal-investigation functions] lend themselves to what are called "high-powered" incentives, which are systems of compensation and promotion that are based on objective performance criteria. In the case of criminal investigation these are number of arrests weighted by convictions and sentence. Intelligence work does not lend itself to such performance criteria, because the effect of surveillance and other intelligence activities in preventing terrorism or subversion is usually very difficult to assess. Hence motivation takes the form of creating a "high commitment" environment in which the organization's leaders try to elicit good performance by getting staff to internalize the organization's goals. The problem is that the absence of objective criteria of performance opens the door to "influence activities" by which members of the organization jockey for advancement.

If both types of task are combined in the same organization--those that can be directed by high-powered incentives and those that require high commitment as their motivator, the best employees will tend to gravitate toward the first type of task because they will be confident that they will do well if their performance is judged according to objective criteria. They will be much less certain how well they will do in a job in which influence activities play a large role in determining success.

To summarize the summary: the best and the brightest will be drawn to organizations that have objective measures of success. Even more so, within a given organization, they will be drawn to the roles that have them. Those who aren't very good will be drawn to roles without objective measures of success, where climbing the career ladder depends on criteria other than doing the job better than the next guy. That goes especially for political hacks who shamelessly talk about how the threat level is Orange today, so put on extra sunscreen.

Check out the whole article

Capping simultaneous tasks in Hadoop


Fair Scheduler Pools Screenshot

We've run into several situations in Hadoop where we want to prevent a job from using more than a certain number of slots. Some of our jobs depend on external resources that don't scale: one task needs to talk to a MySQL database, another writes to our Solr cluster. For these jobs, we know that beyond a certain point they don't go any faster; 200 mappers running at once are no faster than 50. We moved to the Fair Scheduler partly to alleviate some of these concerns. The idea was that if multiple jobs are running at once, they aren't likely to be the same type of job.

The other day I ran into a problem again and decided to take a look around to see if anyone had done anything in this direction. The first issue was HADOOP-5170 which ended with a consensus that the functionality should be in the scheduler, not part of Map Reduce proper. MAPREDUCE-698 is to add a per-pool simultaneous tasks cap to the Fair Scheduler, which is a much better idea than to cap it on the job level.

If your jobs rely on external services like a database or web service, you can run those jobs in a particular pool. If you have two jobs in this pool, they will share the cap, and the load on your database remains constant. These pools can also be assigned a minimum share. That way you don't have the database sitting idle now, and then half your Hadoop cluster sitting idle later while you wait for these jobs to finish.
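The interplay of a per-pool cap and a per-pool minimum can be illustrated with a toy slot allocator. This is not the Fair Scheduler's actual algorithm, which also deals with weights, fair shares and deficits. Pool names, demands and limits are hypothetical. The sketch only shows that the minimum is honored first, and that a capped pool never exceeds its cap even when slots are free.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PoolCapSketch {
    static final class Pool {
        final String name;
        final int demand;   // tasks the pool's jobs could run right now
        final int minShare; // slots the pool gets before anyone receives extras
        final int maxShare; // cap on simultaneous tasks (Integer.MAX_VALUE = no cap)

        Pool(String name, int demand, int minShare, int maxShare) {
            this.name = name;
            this.demand = demand;
            this.minShare = minShare;
            this.maxShare = maxShare;
        }

        int limit() { return Math.min(demand, maxShare); }
    }

    static Map<String, Integer> allocate(List<Pool> pools, int freeSlots) {
        Map<String, Integer> given = new LinkedHashMap<>();
        int free = freeSlots;
        // Phase 1: satisfy minimum shares first.
        for (Pool p : pools) {
            int g = Math.min(Math.min(p.minShare, p.limit()), free);
            given.put(p.name, g);
            free -= g;
        }
        // Phase 2: hand out remaining slots one at a time, never past a pool's limit.
        boolean progress = true;
        while (free > 0 && progress) {
            progress = false;
            for (Pool p : pools) {
                int g = given.get(p.name);
                if (free > 0 && g < p.limit()) {
                    given.put(p.name, g + 1);
                    free--;
                    progress = true;
                }
            }
        }
        return given; // leftover slots stay idle rather than break a cap
    }
}
```

Take 100 free slots, a "db" pool with 200 runnable tasks, a minimum of 10 and a cap of 50, and a default pool with 30 runnable tasks. The db pool ends up with 50 slots and the default pool with 30, leaving 20 idle rather than letting the db pool exceed its cap.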

If your jobs have very long-running tasks, like when building a Lucene index in a reducer, you may want to avoid having these jobs grab slots during gaps when there are no jobs running. I see this frequently when one job finishes, and in the time before the dependent job starts up, all the slots have been taken by another job. Without preemption, you can end up increasing latency a lot.

Using HBase TableIndexed from Thrift with unique keys


HBase is primarily a sorted, distributed map, but it does support secondary indexes through a contrib package called Transactional HBase. The secondary indexes are provided by a component called TableIndexed. A good general walkthrough is Secondary indexes in HBase on Rajeev Sharma's blog. This post specifically addresses how to use secondary indexes outside of the Java API and how to handle unique keys.

Our scenario is this: my articles table uses a row key that starts with a timestamp. This is a very common scenario in HBase, because it is usually the most natural way to access the information. At least some articles also have a secondary key that I want to be able to look them up by. Here is a concrete example for further discussion:

row key  key:id  content:title
  1234     abc     ...

These keys are always unique; this is guaranteed at the application level. I would like to be able to fetch an article using the secondary key. This is all pretty straightforward, except that I want to do it outside of Java using just Thrift. I would also prefer to use only get(), because that makes the code easier to write and easier for anyone coming along later to read. Scanners may be powerful, but it's not intuitive that a get using a unique secondary index would need one.

The first thing we need is the index specification that can create the unique key that we want. By default, HBase provides a SimpleIndexKeyGenerator that creates row keys that start with the value of the secondary key, in our example, they would be abc1234. This supports having multiple rows that match the secondary key, for example, abc5678, but means that you have to use a scanner to get at the information.

I've written a UniqueIndexKeyGenerator that would use the exact value from the primary table's secondary key as the row key for the index table. That is, for our example, the row key in "articles-index" would be "abc". I've also added a forUniqueIndex static method to IndexSpecification to make it easier to call this from the jruby shell.

  /**Construct an index spec for a single column that has only unique values.
   * @param indexId the name of the index
   * @param indexedColumn the column to index
   * @return the IndexSpecification
   */
  public static IndexSpecification forUniqueIndex(String indexId, byte[] indexedColumn) {
    return new IndexSpecification(indexId, new byte[][] { indexedColumn },
        null, new UniqueIndexKeyGenerator(indexedColumn));
  }
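Why the key layout decides between a scanner and a plain get can be modeled with a sorted map standing in for the index table. This toy model uses Strings instead of byte[] row keys and is not the TableIndexed code. The simple layout uses secondary value plus primary row key, like "abc1234", and needs a prefix scan. That scan also has to reject neighbors like "abcd999", whose key starts with "abc" but whose indexed value is different. The unique layout uses the secondary value alone, so one exact lookup is enough.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class IndexLayoutSketch {
    // What an index row records: the indexed value and the primary row key it points to.
    static final class IndexRow {
        final String indexedValue;
        final String primaryRow;

        IndexRow(String indexedValue, String primaryRow) {
            this.indexedValue = indexedValue;
            this.primaryRow = primaryRow;
        }
    }

    // Simple layout: key = secondary value + primary row key. Several rows may match,
    // so we scan forward from the prefix in sorted order.
    static List<String> lookupSimple(TreeMap<String, IndexRow> index, String secondary) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, IndexRow> e : index.tailMap(secondary, true).entrySet()) {
            if (!e.getKey().startsWith(secondary)) break; // past the prefix range
            // "abcd" + "999" also starts with "abc", so compare the indexed value itself
            if (e.getValue().indexedValue.equals(secondary)) rows.add(e.getValue().primaryRow);
        }
        return rows;
    }

    // Unique layout: key = secondary value, so a single get finds the primary row key.
    static String lookupUnique(Map<String, IndexRow> index, String secondary) {
        IndexRow row = index.get(secondary);
        return row == null ? null : row.primaryRow;
    }
}
```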

I then added a create_index method directly in the shell. The shell is flexible enough that you have access to things like @configuration, which all the other methods use. I was unsure whether I could do an import inside a method, but it worked just fine. The imports live inside the method with the idea that it could eventually be added to the shell. If you then called it without the transactional jar on your classpath, it would fail with a class-not-found error.

def create_index(table_name, index_name, column)
  import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin
  import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification
  @iadmin ||= IndexedTableAdmin.new(@configuration)
  spec = IndexSpecification.for_unique_index(index_name, column.to_java_bytes)
  @iadmin.addIndex(table_name.to_java_bytes, spec)
end

Using this, I can create my tables in the shell easily.

create 'a', 'key', 'content'
create_index 'a', 'index', 'key:id'

This creates a table "a" with two column families, "key" and "content". It creates an index called "index" on the "id" column in the "key" column family. Internally,

To access this over Thrift, it's now very simple. You can look at query.rb but these are the important sections. First, we need to make sure we have Thrift hooked up.

require 'rubygems'
require 'hbase'

transport = Thrift::BufferedTransport.new(Thrift::Socket.new('127.0.0.1', 9090))
protocol = Thrift::BinaryProtocol.new(transport)
client = Apache::Hadoop::Hbase::Thrift::Hbase::Client.new(protocol)
transport.open

The hbase Thrift bindings require thrift themselves, so we don't need to require it separately. I've opened a connection to my own machine on port 9090, the default for the Thrift server. I don't know the details of the above; it's just voodoo I picked up on some other site. My main loop lets me query a bunch of secondary keys interactively.

STDIN.each do |id|
  id.strip!
  row_keys = client.get index, id, '__INDEX__:ROW'
  row_key_cell = row_keys[0]
  if row_key_cell
    row_key = row_key_cell.value
    puts "Found row key #{row_key}"
    value = client.get table, row_key, column
    puts "Found item #{value[0].value}"
  else
    puts "unable to find '#{id}' in index"
  end
end

This is the important part. First, we read an id; this is the value we are trying to match against key:id in the table. We then get the contents of __INDEX__:ROW from the index table, for the row whose key matches the id. In my case there's only ever one, but the API doesn't know that, so it returns a list of cells. Each cell has a column specifier, a timestamp and, what we care about, a value. This value is the row key in the primary table. If we found anything in the secondary index, we then do a get on the primary table.

This is currently describing my work-in-progress. Once I've actually got everything up and running in production under load, I'll create some patches to submit to HBase. I think adding UniqueIndexKeyGenerator greatly simplifies one common use of TableIndexed, and adding support for TableIndexed to the shell makes it easy to manage schema.

The code is now available in HBASE-1885.

Minimal HBase MapReduce Example for 0.19 and 0.20


HBase includes an example for populating a table from Hadoop map reduce, but it seemed overly complicated. I'm getting started with HBase and this was my starting point. This first one uses the old Hadoop API with everything in the mapred package, not mapreduce. It also uses the corresponding API from HBase which is now deprecated.

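import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// RichArticle is my own data class; import it from wherever yours lives

public class PopulateArticlesTable extends Configured
  implements Tool {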
  public static class Map extends MapReduceBase
    implements
    Mapper<LongWritable, Text, ImmutableBytesWritable, BatchUpdate> {
    private ImmutableBytesWritable outKey = new ImmutableBytesWritable();

    @Override
    public void map(
      LongWritable offset,
      Text input,
      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
      Reporter report) throws IOException {
      // whatever format your data is in
      RichArticle art = new RichArticle(input.toString());
      // a good HBase row key, consisting of a timestamp and a unique identifier to prevent collisions. All keys are byte arrays.
      byte[] rowId = art.getRowId();
      outKey.set(rowId);
      // We execute one update for each object we encounter, that update may be composed of multiple operations, in this case, two puts
      BatchUpdate update = new BatchUpdate(rowId);
      if (art.getTitle() != null)
        update.put("content:title", Bytes
          .toBytes(art.getTitle()));
      if (art.getBody() != null)
        update.put("content:body", Bytes
          .toBytes(art.getBody()));
      output.collect(outKey, update);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    // Standard boilerplate for creating and running a hadoop job
    JobConf job = new JobConf(getConf(), this.getClass());
    String input = args[0];
    job.setJobName("Populate articles table from " + input);
    // Input is just text files in HDFS
    TextInputFormat.setInputPaths(job, new Path(input));
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    // Output is to the table output format, and we set the table we want
    job.setOutputFormat(TableOutputFormat.class);
    job.set(TableOutputFormat.OUTPUT_TABLE, "articles");
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new Configuration(),
      new PopulateArticlesTable(), args);
    System.exit(res);
  }
}

Next up I've converted everything to use the 0.20 APIs. You'll see that it got shortened too, as I use GenericOptionsParser directly instead of implementing Tool.

For the Hadoop changes, OutputCollector is gone; it has been replaced by Context. The Mapper interface and MapReduceBase have been merged into the Mapper class, which is intended to be extended (used as-is, Mapper behaves like the old IdentityMapper). For job control, you no longer use JobClient.runJob; instead you call waitForCompletion on the Job. The configuration has been cleaned up, so you'll notice I have to create a Configuration before the Job. One big item is that you need to call setJarByClass manually. Previously this was taken care of by creating the JobConf with the class as a parameter. JobConf.setOutputFormat has been renamed to Job.setOutputFormatClass.

I'm new to HBase, so I'm not as sure about whether I've done things in the recommended way. The important things to note are that you need to set the table name in the conf before creating the job, and Puts and Deletes are Hadoop Writables.

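import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
// RichArticle is my own data class; import it from wherever yours lives

public class PopulateArticlesTable {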
  public static class Map extends
    Mapper<LongWritable, Text, NullWritable, Writable> {

    @Override
    protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException {
      // my input is in JSON format, in other applications, you might be splitting a line of text or any of Hadoop's writable formats
      RichArticle art = new RichArticle(input.toString());
      // RichArticles are able to output a good HBase row key, consisting of a timestamp and a unique identifier to prevent collisions. All keys in HBase are byte arrays.
      byte[] rowId = art.getRowId();
      // We output multiple operations for each row
      if (art.getTitle() != null) {
        Put put = new Put(rowId);
        put.add(Bytes.toBytes("content"), Bytes.toBytes("title"), Bytes.toBytes(art.getTitle()));
        context.write(NullWritable.get(), put);
      }
      if (art.getBody() != null) {
        Put put = new Put(rowId);
        put.add(Bytes.toBytes("content"), Bytes.toBytes("body"), Bytes.toBytes(art.getBody()));
        context.write(NullWritable.get(), put);
      }
    }
  }

  public static void main(String args[]) throws Exception {
    Configuration conf = new Configuration();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    String input = otherArgs[0];
    Job job = new Job(conf, "Populate Articles Table with " + input);
    // Input is just text files in HDFS
    FileInputFormat.addInputPath(job, new Path(input));
    job.setJarByClass(PopulateArticlesTable.class);
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    // Output is to the table output format, and we set the table we want
    job.setOutputFormatClass(TableOutputFormat.class);
    job.waitForCompletion(true);
  }
}

Hopefully the before and after for the APIs is helpful. These have both been tested and work on my system. You'll need to modify them slightly, of course, unless you happen to have a data object called RichArticle that has a string serialization.

Update: I should probably point you at the official documentation as well.
