Hadoop Workflow Tools Survey

Hadoop Map Reduce and HDFS are fairly stable pieces of software. One area that doesn't yet have a clear winner is higher-level job scheduling, also known as workflow scheduling.

To put this in context for someone who isn't familiar with Hadoop, a single Hadoop job is broken up into many map and reduce tasks. The scheduler runs on the job tracker and assigns tasks to open slots on the task trackers on the worker nodes. When we talk about the scheduler in Hadoop, this is usually what we are talking about. By default, Hadoop uses a FIFO scheduler, but there are two more advanced schedulers which are widely used. The Capacity Scheduler focuses on guaranteeing that the various users of a cluster get access to their allotted number of slots, while the Fair Scheduler focuses on providing good latency for small jobs while long-running large jobs share the same cluster. These schedulers closely parallel processor scheduling, with Hadoop jobs corresponding to processes and the map and reduce tasks corresponding to time slices.

The next level up is workflow scheduling – starting jobs on a cluster in the right order and with dependencies. Sometimes a single map-reduce job is all you need. More frequently, you will have many jobs with dependencies between them. For example, you might want to identify the most important words in each document using term frequency–inverse document frequency, which requires first calculating the inverse document frequency and then making use of it while examining the documents again. In this case, a shell script that runs the first job, waits for it to complete, and then starts the second will work.
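To make the dependency concrete, here's a minimal sketch of that two-step pipeline as a single Java driver (the equivalent of the shell script) using the old-style JobConf/JobClient API. The paths and job names are made up for illustration, and the mapper and reducer classes that would do the actual counting are omitted.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TfIdfDriver {
  public static void main(String[] args) throws Exception {
    // First pass: count, for each term, how many documents it appears in.
    JobConf dfJob = new JobConf(TfIdfDriver.class);
    dfJob.setJobName("document-frequency");
    FileInputFormat.setInputPaths(dfJob, new Path("/corpus/docs"));
    FileOutputFormat.setOutputPath(dfJob, new Path("/corpus/df"));
    // ... setMapperClass/setReducerClass and output types would go here ...

    // runJob blocks until the job finishes and throws on failure, so the
    // second job only ever starts once the first has succeeded.
    JobClient.runJob(dfJob);

    // Second pass: re-read the documents, pulling in the document-frequency
    // output (for example via the DistributedCache) to score terms by TF-IDF.
    JobConf tfidfJob = new JobConf(TfIdfDriver.class);
    tfidfJob.setJobName("tf-idf");
    FileInputFormat.setInputPaths(tfidfJob, new Path("/corpus/docs"));
    FileOutputFormat.setOutputPath(tfidfJob, new Path("/corpus/tfidf"));
    JobClient.runJob(tfidfJob);
  }
}
```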

Once you go down this path, you start running into difficulties. Perhaps job C depends on job A and job B, but it’s fine for A and B to run in parallel. If D depends on B and C, and B and C depend on A, and B fails part way through, how do you recover? It’s not a particularly hard problem, but it’s enough of a problem that we’d like to not reinvent the wheel. After all, while people use Hadoop for different tasks, this workflow scheduling problem is common to everyone.

I recently sent out a poll to the Hadoop mailing list to see how people are solving this problem.
The first question asked what people are using to manage their jobs.

The next question mostly applied to those using shell scripts or a homegrown system and asked how these systems interacted with Hadoop.

I also asked whether people were happy with the tools they are using (whether homegrown or off the shelf).

For those who want to look at the raw data (as sparse as it is), I've posted it to a Google document. What was most interesting is that of the people using a homegrown system, only one said they were at all happy with it, and none would recommend their system. A majority of those using a higher level abstraction would recommend their system to others. Before running the poll, I worried that I was doing things wrong, that there was some simple, clear solution that everyone else was adopting. The opposite was true: for any combination I could come up with, there was someone out there who had actually done it that way.

There's a continuum from just running a Hadoop job by typing bin/hadoop jar ... or putting it into crontab, up through more complicated systems like what we have been using at Biz360, which involve scripts to figure out batch numbers and parameters and then start Java processes that may run multiple jobs using JobControl. The only person using a homegrown system who said it was acceptable is using something based on JobControl.

JobControl is included with Hadoop and simply helps to manage dependencies between jobs. Rather than keeping track on your own that you can start job A and job B and need to wait for them both to finish before starting C, you add all three to a JobControl, declare the dependencies, and run it. Dependencies can only be between jobs – you can't have a task that moves files around depend on jobs, or a job depend on whether a directory is empty, or anything like that. It runs client side, so the process that started the JobControl will need to keep a thread running. You can detect errors, and it will stop running jobs when a dependency can't be satisfied, but there's no way to recover from errors. If you want to retry jobs, you need to handle that yourself.
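To illustrate, here's a minimal sketch of the A/B/C case with JobControl. The three JobConfs are left unconfigured (mappers, reducers, and paths would be set as usual), and the polling loop is just one way to keep the client thread alive.

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class PipelineDriver {
  public static void main(String[] args) throws Exception {
    // Bare JobConfs for illustration; mapper/reducer classes and
    // input/output paths would be configured as usual.
    JobConf confA = new JobConf();
    JobConf confB = new JobConf();
    JobConf confC = new JobConf();

    Job jobA = new Job(confA);
    Job jobB = new Job(confB);
    Job jobC = new Job(confC);

    // C waits for both A and B; A and B are free to run in parallel.
    jobC.addDependingJob(jobA);
    jobC.addDependingJob(jobB);

    JobControl control = new JobControl("example-pipeline");
    control.addJob(jobA);
    control.addJob(jobB);
    control.addJob(jobC);

    // JobControl runs client side, so a thread has to stay alive until
    // everything has finished.
    Thread runner = new Thread(control);
    runner.start();
    while (!control.allFinished()) {
      Thread.sleep(5000);
    }

    // Failed jobs (and the jobs that depended on them) show up here;
    // any retry logic is left to the caller.
    if (!control.getFailedJobs().isEmpty()) {
      System.err.println("Failed jobs: " + control.getFailedJobs());
    }
    control.stop();
  }
}
```

Note that the whole dependency graph lives in the client JVM; if that process dies, so does the bookkeeping.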

Another popular option, and the one that seems to have the most happy users, is a higher level abstraction that runs on top of Hadoop like Cascading, Pig or Hive. These share many common features.

There are significant differences. Hive uses SQL to express the workflow, Pig has its own language called Pig Latin, while Cascading workflows are written in Java or Groovy. User defined functions in Pig are very much an extension to Pig, whereas in Cascading it's possible to create a Flow directly from a Hadoop JobConf. Hive specifically targets integration with SQL based tools. All of these, to some extent, insulate the user from the Hadoop concept of jobs, replacing it with something else. For our purposes at Biz360, the ability to plug existing Hadoop jobs unchanged into a Cascade would make Cascading the best choice if we wanted to go this route. Other users may find the simpler query languages of Pig or Hive compelling.
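For what it's worth, here is roughly what that plugging-in looks like with the Cascading 1.x API (treat the details as my best reading of it rather than gospel): an existing JobConf gets wrapped in a MapReduceFlow and connected into a Cascade alongside whatever other Flows you have.

```java
import org.apache.hadoop.mapred.JobConf;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.MapReduceFlow;

public class WrapExistingJob {
  public static void main(String[] args) {
    // An existing, fully configured Hadoop job; configuration omitted here.
    JobConf existingJob = new JobConf();

    // Wrap the unchanged JobConf so Cascading can schedule it like any other Flow.
    Flow wrapped = new MapReduceFlow("legacy-job", existingJob);

    // A Cascade orders its Flows by their source and sink dependencies,
    // and can skip Flows whose sinks are already newer than their sources.
    Cascade cascade = new CascadeConnector().connect(wrapped /*, otherFlows... */);
    cascade.complete();
  }
}
```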

One more specialized tool is Hamake, named for and inspired by make. This allows you to express your jobs as dependencies on data so that you can run only those portions of a complex pipeline that are not up to date. I should note that Cascading also has this functionality.

None of these really suit our purposes though. I’d prefer to stick to writing Hadoop map-reduce jobs, not Pig Latin or Cascading Flows. Understanding where my data goes when I have a Hadoop Reducer writing to a SolrOutputFormat that writes to an instance of Solr running on the same node as the Reducer is right at the edge of what I can keep track of. If I introduce another layer of indirection, I would get hopelessly confused.

There are other tools related to scheduling map reduce jobs and assembling them into workflows. Amazon's Elastic Map Reduce has its own, but it is tied to their particular service. There's Cloudera Desktop, which offers some basic job scheduling, but it doesn't yet offer much for workflow scheduling. I'm not sure what functionality Opswise offers as far as Hadoop scheduling goes; I'd never heard of it before someone mentioned it in the poll.

So I'm going to list what I think are the killer features to see in a Hadoop workflow scheduling system:

Most of what I've listed above is available in Oozie, the Hadoop Workflow System. It allows you to express a workflow as a DAG using XML, and it supports Java map-reduce jobs, streaming jobs, and even Pig. You can have nodes that are file system actions or that call outside shell scripts. Everything is persisted in MySQL, and it has a UI showing what stage each running workflow is at. On paper, Oozie looks like the holy grail. The downside is that it isn't at the level of polish of the other projects I mentioned. I haven't attempted to set it up because I can't see that it has been updated for 0.20. The last update to the Jira was in June, and it hasn't yet been committed, even though it's just a contrib package.

But never fear: the latest post on Yahoo's Hadoop blog is about hiring for the Hadoop team, and I notice that they have an opening for "Senior Engineer for Oozie" and say that "the Oozie team is rapidly growing". At least this means that if I set it up and learn to use it, I can trust that it isn't going to die off any time soon.

Even if nothing comes of Oozie, Cloudera is also working in this direction. Cloudera Desktop currently has a job designer that allows you to create and save parameterized single map-reduce jobs, for example, to set all of the options except the input and output directories. It doesn’t yet have any workflow tools, but I’m told they are in the works.