June 2009 Archives

Scala Actors at SDForum

Tonight I'm attending a presentation on actors, and actors in Scala, hosted by the SDForum Software Architecture SIG. (Upcoming meeting: July 21, the 3rd Tuesday, with Vijay Patel of LinkedIn talking about analytics.) Speakers: Carl Hewitt, Stanford; Robey Pointer, Twitter; Frank Sommers, Artima; Bill Venners moderating. The talks run from abstract to concrete.

I'll post this as-is and come back and edit it later.

Carl Hewitt, Stanford, inventor of the actor paradigm

Carl Hewitt: back in the day in 1972 we programmed Smalltalk with a magnetized needle and a steady hand, uphill in the snow both ways.

Three things an actor can do in response to a message: send more messages; create new addresses (new actors); decide what state to be in for the next message. Petri nets as a model suffer from being physically impossible. The three-way model has the advantage of being possible. One implication, he says, is that the actor model can't be represented as a Turing machine or in the lambda calculus.
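To make the three capabilities concrete, here is a toy single-threaded sketch in Java. It is not ActorScript and not how Scala actors work: Behavior, ActorRef, ActorSystem, and Counter are hypothetical names, and a real runtime delivers messages concurrently. Each message may trigger sends, creation of new addresses, and a choice of behavior for the next message.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of actors (single-threaded, for illustration only).
// A behavior handles one message and returns the behavior for the next message.
interface Behavior {
    Behavior receive(Object message, ActorSystem system);
}

// An address: the only thing you can do with it is send it messages.
final class ActorRef {
    Behavior behavior;
    ActorRef(Behavior initial) { behavior = initial; }
}

final class ActorSystem {
    private static final class Envelope {
        final ActorRef target;
        final Object message;
        Envelope(ActorRef target, Object message) { this.target = target; this.message = message; }
    }

    private final Deque<Envelope> pending = new ArrayDeque<>();

    // 1. Send a message: it is queued, not delivered immediately.
    void send(ActorRef target, Object message) { pending.add(new Envelope(target, message)); }

    // 2. Create a new actor, which means creating a new address.
    ActorRef create(Behavior initial) { return new ActorRef(initial); }

    // Deliver queued messages one at a time.
    // 3. Whatever receive returns becomes the actor's behavior for its next message.
    void run() {
        while (!pending.isEmpty()) {
            Envelope e = pending.poll();
            e.target.behavior = e.target.behavior.receive(e.message, this);
        }
    }
}

// No assignment inside the actor: each "inc" replaces it with its next version.
final class Counter implements Behavior {
    private final int count;
    Counter(int count) { this.count = count; }

    public Behavior receive(Object message, ActorSystem system) {
        if (message instanceof ActorRef) {          // a request: reply with the current count
            system.send((ActorRef) message, count);
            return this;
        }
        return new Counter(count + 1);              // "inc": become the next version
    }
}
```

Sending "inc" twice and then a reply address to a Counter(0) delivers 2 to that address, which ties back to "actors get replaced by their next version."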

Cloud: it's clients, all the way up. (Title for client-server interaction on the cloud: the fog rolls in.) John McCarthy defined Lisp in terms of Lisp. "How many have seen 'eval'?" Two hands. A review of that lecture from 61A. Instead of eval as a function, eval as a message: if I'm X and I get an eval message with an environment, I evaluate myself in that environment. This is currently the best way to define concurrent programming languages. Bah, say mathematicians, that's circular! Well, too bad, concurrency doesn't fit math.

He defines a theoretical programming language, ActorScript, with XML and JSON instantiations. No assignments, but not functional: actors get replaced by their next version.

Tension between well-targeted ads and user desire for privacy. But government can't mine your data fast enough; spooks will need to reside inside datacenters. Try to move behavioral targeting to the client and store encrypted data on the cloud. July 23rd symposium on semantic integration at Stanford. Info at http://carlhewitt.info

He invented the one-minute lecture: advertisers can deliver a coherent message in 30 seconds. (Fantastic idea -- I should do this.) Lots of questions following his lecture. This guy just drops a bomb of many ideas at once. Stalinist theory of computation: lots of parallelism down the tree. Company model of computation: different departments, all of which talk to each other without having to go through the CEO. That's concurrency. MapReduce does the parallelism but doesn't do the concurrency.

Frank Sommers - Actors in Scala

Outline: why Scala makes actors natural; an example; scaling actors; the future.

Immutability, OO + functional, pattern matching, easy DSL, JVM. Mainstream language that lends itself to Actors.

Walks through use of Scala actors with a chat program. When a user joins, the receiver of the subscribe message creates a new actor to handle updating that user. Illustrates the shorthand syntax for actors, and synchronous, asynchronous, and future-returning message sends. Gets derailed by overly detailed questions from the audience.

Scala actors support working in a distributed fashion, with the same syntax as within a single VM. You need to import RemoteActor._ and listen on a port. The sending actor needs to know the "node" (address and port number) and the symbol the remote actor is registered under. There are thread-per-actor and event-driven actor implementations. The example used thread-per-actor, but event-driven actors scale better: they execute on a thread pool and wait for messages without consuming a thread. Using this system you can scale to millions of actors on a single JVM. When one actor sends a message to another, the receiver can be scheduled on the same thread, giving effectively the performance of a subroutine call.
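The event-driven idea (actors that hold no thread while idle and run on a shared pool) can be sketched in plain Java. This is not how scala.actors is implemented; PooledActor is a hypothetical name, and the sketch assumes any java.util.concurrent Executor, such as a fixed thread pool. The invariant that matters is that an actor is scheduled at most once at a time, so its handler never runs concurrently with itself.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical event-driven actor: it owns no thread. It is submitted to a shared
// pool only when it has mail, and at most one pool thread runs it at a time.
final class PooledActor<M> {
    private final Queue<M> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor pool;
    private final Consumer<M> handler;

    PooledActor(Executor pool, Consumer<M> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    // Asynchronous send: enqueue and return immediately.
    void send(M message) {
        mailbox.add(message);
        trySchedule();
    }

    private void trySchedule() {
        // Only the caller that flips scheduled from false to true submits a task.
        if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute(this::drain);
        }
    }

    private void drain() {
        try {
            M message;
            while ((message = mailbox.poll()) != null) {
                handler.accept(message);
            }
        } finally {
            scheduled.set(false);
            // A message may have arrived between the last poll and the reset above.
            trySchedule();
        }
    }
}
```

An idle actor costs only its queue and two fields, which is what lets this style hold far more actors than threads. Draining the whole mailbox in one go is a simplification; a fairer scheduler would cap each batch.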

Missed a bit, I think he's talking about how react cannot return conventionally. But now the time is displayed in my emacs status bar, so all is good. Now I just need to display my battery status.

Coming in the future: continuations, pluggable schedulers, better actor isolation using a compiler plugin, static checking, integrated exceptions, and actor migration (to a different node, I assume). There's tension over whether actors should be more complicated or the actors library should remain very basic. There's also the question of a single actors library versus multiple ones; e.g., Lift uses a simpler library than Scala actors. Partly pragmatic concerns versus a more pure approach.

Still lots of theoretical problems, but quite usable for any actual scenario.

Robey Pointer -- Twitter

Talk is titled "solving problems with actors". Got started with Actors writing a chat proxy for cell phones. Long lived connections, lots of connections, mostly idle. First attempt was with one thread per session. Very simple, but didn't scale. Went with thread pools and async IO. More scalable but harder to read. Fatal flaw: blocking on other services (http). Fix all APIs to be async using hideous callbacks. If it doesn't fit on a slide, it's not good code.

Actors: each session is an actor, and events are just messages. You can seek ahead for specific events. Works well with java.nio and Apache MINA: MINA wraps NIO as events, and his naggati library translates these into Scala messages.

Kestrel: a message queue, speaking the memcache protocol as a MINA plugin. Servers scale horizontally, with no awareness of each other. Stats on one server: 1 month uptime, 2.4 TB written, 4 billion gets, 1.6 billion sets.

Actors are just one of many tools; he used synchronized for some features. What didn't work: making each queue an actor. They moved to queues using synchronized data. I need to read this code and study it.

Actors are still a little shaky: actor lifetime issues, and mixing threads with actors makes GC hard.

Lots of exciting stuff.

Make Scala lists work for you

(Well dammit, I meant to save a draft, and here I've accidentally published it and FriendFeed spreads it around like a gossipy neighbor, so I guess I'd better finish it.)

I've been working for a while now on a collaborative filtering system based on genetic algorithms, applied to message boards, running shoe recommendations, and the Netflix Prize. The latest iteration is a mix of Java and Scala. I sat down to clean up some of the code tonight, and wanted to rewrite a function that made use of the dot product.

Scala, like pretty much every modern programming language, has a REPL (aka the interpreter), making it really easy to work the kinks out of something before getting Ant, JUnit, or an IDE involved. But when I go to paste the methods I need to work with into the REPL, curses, foiled again! That piece of my app is in Java, so I'll have to rewrite it in Scala.

What a wonderful opportunity to talk about how Lists will make all of your wildest dreams come true. Let's see the Java version of the unit vector function:

public static double[] unit(double[] vector) {
    double[] unit = new double[vector.length];
    double norm = 0.0;
    for(double v : vector) {
        norm += v * v;
    } 
    norm = Math.sqrt(norm);
    for(int i = 0; i < vector.length; i++) {
        unit[i] = vector[i] / norm;
    }
    return unit;
}

Wow that's a lot of typing for such a simple concept. How would I express "divide each component by the magnitude of the vector" in scala? Well, pretty much exactly like that:

def unit(v : List[Double]) = {
  val sum = Math.sqrt((0.0 /: v.map(x => x * x)) {_ + _})
  v.map(x => x / sum)
}

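Let's walk through this one. The innermost v.map(x => x * x) maps the vector to its squares. The /: does a fold left starting with 0.0, applying { _ + _ } to the running total and each square in turn. Note that /: is a method call on the list returned by v.map(...): operators ending in a colon are right-associative, so 0.0 /: list means list./:(0.0). This gives the sum of squares, and we take the square root to get the magnitude. Our last operation is another map, dividing each component by the magnitude. The tricky part is that you do need the parentheses around (0.0 /: ...): method application binds more tightly than an infix operator, so without them the { _ + _ } block would be applied to v.map(...) instead of to the result of the fold.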
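For readers coming from Java, the evaluation order of /: may be easier to see in a Java 8+ sketch. Folds, foldLeft, and map here are hypothetical helpers written for this illustration, not library methods; foldLeft(z, xs, f) computes f(f(f(z, x1), x2), x3), the same shape as Scala's (z /: xs)(f).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

final class Folds {
    // foldLeft(z, [x1, x2, x3], f) == f(f(f(z, x1), x2), x3), like Scala's (z /: xs)(f)
    static <A, B> B foldLeft(B zero, List<A> xs, BiFunction<B, A, B> f) {
        B acc = zero;
        for (A x : xs) {
            acc = f.apply(acc, x);
        }
        return acc;
    }

    // Apply f to every element, producing a new list (like Scala's xs.map(f))
    static <A, B> List<B> map(List<A> xs, Function<A, B> f) {
        List<B> out = new ArrayList<>(xs.size());
        for (A x : xs) {
            out.add(f.apply(x));
        }
        return out;
    }

    // Divide each component by the magnitude, mirroring the Scala unit function above
    static List<Double> unit(List<Double> v) {
        double norm = Math.sqrt(foldLeft(0.0, map(v, x -> x * x), (acc, x) -> acc + x));
        return map(v, x -> x / norm);
    }
}
```

Folding strings shows the nesting: Folds.foldLeft("z", List.of("a", "b", "c"), (acc, x) -> "(" + acc + "+" + x + ")") yields "(((z+a)+b)+c)", with the accumulator always on the left.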

Now I have unit vectors, and I need the dot product that operates on them. If your math is hazy, the dot product of vectors [a, b, c] and [x, y, z] is a * x + b * y + c * z; for unit vectors, it is the cosine of the angle between them. Here it is in Java:

public static double dot(double[] a, double[] b) {
    double result = 0.0;
    for(int i = 0; i < a.length; i++) {
        result += a[i] * b[i];
    }
    return result;
}

Pretty straightforward, but it still imposes the mental effort of choosing a name for that variable, and I really hate that. Bottom line? Scala lets me avoid thinking up names for variables:

def dot(a : List[Double], b : List[Double]) = {
  ((a zip b).map{case(x,y) => x * y} :\ 0.0) { _ + _ }
}

We'll walk through this one. I love zip. I like the Java 5 for-each loop, but it doesn't do me any good when iterating over two lists in parallel. With Scala Lists, you can zip them together and treat them as a single list of pairs.

scala> List(1, 2, 3) zip List('a, 'b, 'c)
res25: List[(Int, Symbol)] = List((1,'a), (2,'b), (3,'c))

When I use map, I use pattern matching to get my two items back out. Map expects a function taking a single parameter and gives it whatever is in the list, which in this case is a Tuple2. Pattern matching with case (x, y) lets me break that tuple apart. I then sum the list, this time using a fold right (:\), which means the order changes: the list comes first and the starting value 0.0 second.
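The same pipeline in Java 8+ makes the role of zip and the two fold directions explicit. ZipFold, zip, and foldRight are hypothetical helpers for illustration; Java has no built-in list zip, so the pairs are stored here as Map.Entry values.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class ZipFold {
    // Pair up elements by position; like Scala's zip, it stops at the shorter list.
    static <A, B> List<Map.Entry<A, B>> zip(List<A> as, List<B> bs) {
        int n = Math.min(as.size(), bs.size());
        List<Map.Entry<A, B>> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(as.get(i), bs.get(i)));
        }
        return out;
    }

    // foldRight([x1, x2, x3], z, f) == f(x1, f(x2, f(x3, z))), like Scala's (xs :\ z)(f).
    // The element comes first and the accumulator second, the reverse of foldLeft.
    static <A, B> B foldRight(List<A> xs, B zero, BiFunction<A, B, B> f) {
        B acc = zero;
        for (int i = xs.size() - 1; i >= 0; i--) {
            acc = f.apply(xs.get(i), acc);
        }
        return acc;
    }

    static double dot(List<Double> a, List<Double> b) {
        List<Double> products = new ArrayList<>();
        for (Map.Entry<Double, Double> pair : zip(a, b)) {
            // Taking the pair apart plays the role of case (x, y) in the Scala version
            products.add(pair.getKey() * pair.getValue());
        }
        return foldRight(products, 0.0, (x, acc) -> x + acc);
    }
}
```

For example, ZipFold.dot(List.of(1.0, 2.0, 3.0), List.of(4.0, 5.0, 6.0)) returns 32.0, and folding strings right gives "(a+(b+(c+z)))", with the accumulator on the right.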

So now I'm able to play with things in the REPL, get my replacement code working, and paste it back into my project, and run the junit test to verify that my improvements didn't break anything. At 11pm, between putting the baby to bed and going to bed myself, I don't have much mental energy to hold huge methods in my head. Scala means I don't need to.

The downside is that when I go back to work in the morning, I sit there staring at the screen wondering why Eclipse isn't formatting my Java right and is displaying little red errors, until I realize that I actually need semicolons and return statements in Java.

Pack multiple small objects in S3 for cost savings

Amazon Web Services offers two forms of data storage. First is S3, a key-value store allowing very large files if needed, but with a pricing model that will cause problems for small files. Second is SimpleDB, a schema-less or column-oriented database, which allows storing many small pieces of data, but with limitations that prevent it scaling to the point that S3 becomes economical. In this post I describe a system I built to leverage SimpleDB to reduce the costs of storing small files in S3.

Context: Biz360 Community Insights

Biz360 Community Insights is a social media monitoring and measurement system. We consume various types of social media -- blogs, forums, microblogs -- perform analysis on each item, index it, store it, and present it to the user. We process tens of millions of items every day, ranging from under 1 KB for a tweet with metadata and analysis results, up to blog posts that can go above 100 KB. These items are indexed in Solr, but we don't store the full text in the index for size reasons.

The Problem

The problem is where to store the complete article with metadata when we are done with it. These average a few KB. The initial solution was to store them in S3, but our first month's bill was thousands of dollars. Not for the storage: it was a small amount of data. Not for the data transfer: transfer from EC2 to S3 is free. The cost was the $0.01 per 1,000 PUT requests. We figured there had to be a way to bring this cost down.
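A quick back-of-the-envelope check shows why the PUT fee dominated. The sketch assumes 10 million items per day (the low end of "tens of millions") and a 30-day month; both are illustrative assumptions, and PutCost is a hypothetical helper. At one item per PUT that is 300 million requests, about $3,000 a month in request fees alone; packing 10 items per object cuts it to about $300.

```java
final class PutCost {
    // S3 request price quoted in the post: $0.01 per 1,000 PUT requests
    static final double DOLLARS_PER_1000_PUTS = 0.01;

    // Monthly S3 PUT cost when itemsPerObject items are packed into each S3 object.
    // Counts only S3 PUT requests; storage, transfer, and SimpleDB charges are ignored.
    static double monthlyPutCost(long itemsPerDay, int itemsPerObject, int days) {
        long putsPerDay = (itemsPerDay + itemsPerObject - 1) / itemsPerObject; // round up
        return putsPerDay * (double) days / 1000.0 * DOLLARS_PER_1000_PUTS;
    }
}
```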

About the same time, we were having trouble with a choke point in our application: a large map-reduce job to reconcile duplicate articles, which could be sped up with a persistent lookup table of some kind. So we needed to stop storing all our items individually in S3, and we needed a secondary index on at least some of our items. The first thing to come to mind was SimpleDB. We were already in AWS, it didn't have any operations headaches, and although the storage cost per GB was higher, we had now seen that storage cost wasn't the most significant factor.

Among the other options we considered was running our own non-relational database with the top contenders being CouchDB and the schema-less MySQL used by FriendFeed. CouchDB didn't seem all that mature, and we had concerns about the volume of data we could manage in MySQL and the number of machines we would need.

SimpleDB turned out to have significant limitations. First, it limits the size of each attribute value to 1024 bytes. If we wanted to actually store our data in SimpleDB, we would have needed some complicated scheme to split large items into 1 KB blocks stored in several attributes. Second, it has a limit of 10 GB per domain, and we expected to store orders of magnitude more than this.

The Solution

The general solution is that each S3 object stores multiple items, and two sets of SimpleDB domains provide indexes to those items.

All of my objects have a unique item key. In the first set of domains, this item key will be the "itemName" for SimpleDB. Some of my items have a secondary key, which I'll call dupeId for this article. In my secondary SimpleDB set, this dupeId will be the itemName, and the item key will be one of the stored attributes.

Items are distributed to domains based on their key. I pick a number of domains that I will partition across. Here you want to estimate the total volume of data you will be storing in SimpleDB (in my case about 200 bytes per item), and choose a number of domains so that each partition will store no more than 1 GB when you reach maximum capacity. SimpleDB is actually limited to 10 GB per domain, but I've been told by Amazon that performance starts to degrade above 1 GB. Also, you will not be able to change the number of partitions after the fact, so this gives you sufficient headroom when you realize that you can't actually delete items. In my case, I'm using 30 domains for my main set and 10 domains for my secondary key (which I don't need to retain for as long). You can have up to 100 domains without having to contact Amazon. They've been happy to increase our instance limit in the past, so if you need more, just ask.
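The sizing rule is simple arithmetic. This sketch takes the 200 bytes per item from above, assumes 1 GB means 2^30 bytes, and assumes the hash spreads items evenly across domains; DomainSizing is a hypothetical helper. Under those assumptions, 30 domains at 1 GB each hold roughly 161 million items before any domain passes the limit.

```java
final class DomainSizing {
    static final long GB = 1L << 30; // assumption: 1 GB = 2^30 bytes

    // Smallest number of domains that keeps each one at or under maxBytesPerDomain,
    // assuming items are spread evenly across domains.
    static int domainsNeeded(long totalItems, long bytesPerItem, long maxBytesPerDomain) {
        long totalBytes = totalItems * bytesPerItem;
        return (int) ((totalBytes + maxBytesPerDomain - 1) / maxBytesPerDomain); // round up
    }

    // Number of items a set of domains can hold before one reaches maxBytesPerDomain.
    static long capacity(int domains, long bytesPerItem, long maxBytesPerDomain) {
        return domains * (maxBytesPerDomain / bytesPerItem);
    }
}
```

For example, 150 million items at 200 bytes each need 28 domains under a 1 GB ceiling.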

To store data, I need to store three things:

  1. The full item in S3, but remember this is what I want to minimize.
  2. The index from the item key to the S3 location in the primary SimpleDB.
  3. The secondary index from the dupeId to the item key in the secondary SimpleDB.

I will mostly ignore the third step, as it is straightforward and there is nothing interesting about it.

Implementation

I've defined two services, DomainSet and DomainSetS3. My access to SimpleDB is through Typica. A DomainSet is the SimpleDB portion; a DomainSetS3 uses a DomainSet, a buffer for each partition, and an S3 service. I use JetS3t for S3 access.

A DomainSet contains an array of SimpleDB domains, and I index into that array using a hash of the item key modulo the number of domains. Since we will also be using the system from our Rails front end, I defined my own hash function that I can keep consistent between Java and Ruby (though I could have just duplicated Java's hashCode in Ruby). The public interface consists of only two methods:

public interface DomainSet {
    public void store(SimpleDbItem item);
    public SimpleDbItem find(String itemName);
}
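The post doesn't say which hash function it uses, so this sketch swaps in 32-bit FNV-1a over the UTF-8 bytes of the key, as an example of a hash that is easy to reproduce exactly in Ruby (mask with 0xFFFFFFFF after each multiply there). Partitioner and its methods are hypothetical. One detail worth copying whatever the hash: Java's % returns a negative result for a negative hash, so the hash is treated as unsigned before taking the modulus.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical partitioner. FNV-1a is an assumption, chosen because it is easy to
// reimplement identically in Ruby; the post does not name its hash function.
final class Partitioner {
    private static final int FNV_OFFSET_BASIS = 0x811C9DC5; // standard 32-bit FNV offset basis
    private static final int FNV_PRIME = 0x01000193;        // standard 32-bit FNV prime

    // 32-bit FNV-1a over the UTF-8 bytes of the key (int overflow wraps mod 2^32).
    static int fnv1a32(String key) {
        int hash = FNV_OFFSET_BASIS;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xFF);   // mix in the byte as an unsigned value
            hash *= FNV_PRIME;
        }
        return hash;
    }

    // Domain index in [0, numDomains). The hash is widened to an unsigned long first,
    // because a negative int hash % n would give a negative index.
    static int partition(String itemKey, int numDomains) {
        long unsignedHash = Integer.toUnsignedLong(fnv1a32(itemKey));
        return (int) (unsignedHash % numDomains);
    }
}
```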

SimpleDbItem is an object that contains the item name and a list of attributes. Under the covers, this gets translated to and from the Typica objects, but it doesn't expose the complexity of the SimpleDB API. Since I prefer composition to inheritance, DomainSetS3 uses this interface but does not extend it. This does add a few lines of code (I have to store numDomains and maxFailures and use both to instantiate my DomainSet), but it avoids having a meaningless single-argument store(SimpleDbItem) method. The interface is:

public interface DomainSetS3<T> {
    public void store(SimpleDbItem metadata, T contents);
    public void flush();
    public SimpleDbItem find(String itemName);
    public T loadContents(SimpleDbItem item);
}

In this case, I've let the abstraction leak a bit: the only way to load the contents of an item is to find it in SimpleDB, and then make a separate request to load it. That's what it's doing under the covers, of course, and if I combined both of these into one method (perhaps returning a SimpleDbItemWithData<T>), later down the line I'd have to provide the simple find anyway for performance reasons.

When you want to store an object, you create a SimpleDbItem with the metadata (the key and date are the only values we currently use) and store it. The DomainSetS3 adds these to the buffer for whatever partition the item key hashes to, until the buffer reaches a predetermined max size. Depending on your data and access patterns, 5-20 items per S3 object is about right. When the buffer is full, I create a HashMap<String, T> and serialize the HashMap into JSON. I create a UUID and store this JSON object in S3 under that id. I then write my metadata to SimpleDB with the UUID as an additional attribute, "s3loc".
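The buffering and flush steps can be sketched without the AWS libraries. This is an in-memory illustration, not the DomainSetS3 implementation: BlobStore and ItemIndex are hypothetical stand-ins for S3 (via JetS3t) and the primary DomainSet, the JSON step is left out (the batch map is stored directly), and partitioning uses String.hashCode with Math.floorMod as an assumption. It keeps the ordering described above: the batch goes to S3 under a fresh UUID first, and only then does each item's s3loc go into the index, so the index never points at an object that doesn't exist yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for S3: one put() is one PUT request.
final class BlobStore<T> {
    private final Map<String, Map<String, T>> objects = new HashMap<>();
    int puts = 0; // request counter, exposed for illustration

    void put(String key, Map<String, T> batch) { objects.put(key, batch); puts++; }
    Map<String, T> get(String key) { return objects.get(key); }
}

// Hypothetical stand-in for the primary SimpleDB index: item key -> s3loc.
final class ItemIndex {
    private final Map<String, String> s3loc = new HashMap<>();
    void put(String itemKey, String location) { s3loc.put(itemKey, location); }
    String find(String itemKey) { return s3loc.get(itemKey); }
}

final class PackingWriter<T> {
    private final BlobStore<T> blobs;
    private final ItemIndex index;
    private final int maxBatch;
    private final List<Map<String, T>> buffers = new ArrayList<>();

    PackingWriter(BlobStore<T> blobs, ItemIndex index, int numPartitions, int maxBatch) {
        this.blobs = blobs;
        this.index = index;
        this.maxBatch = maxBatch;
        for (int i = 0; i < numPartitions; i++) buffers.add(new HashMap<>());
    }

    // Buffer the item in its partition; write a packed object once the buffer is full.
    void store(String itemKey, T contents) {
        int p = Math.floorMod(itemKey.hashCode(), buffers.size());
        Map<String, T> buffer = buffers.get(p);
        buffer.put(itemKey, contents);
        if (buffer.size() >= maxBatch) flushPartition(p);
    }

    // Write out every partially filled buffer (e.g. at shutdown).
    void flush() {
        for (int p = 0; p < buffers.size(); p++) {
            if (!buffers.get(p).isEmpty()) flushPartition(p);
        }
    }

    // Load through the index: find the s3loc, fetch that object, pick out the item.
    T load(String itemKey) {
        String location = index.find(itemKey);
        return location == null ? null : blobs.get(location).get(itemKey);
    }

    private void flushPartition(int p) {
        Map<String, T> batch = new HashMap<>(buffers.get(p));
        buffers.get(p).clear();
        String location = UUID.randomUUID().toString();
        blobs.put(location, batch);          // one PUT for the whole batch
        for (String itemKey : batch.keySet()) {
            index.put(itemKey, location);    // then record s3loc for each item
        }
    }
}
```

Storing an existing key again simply writes a new packed object and repoints its s3loc, which matches the update behavior described below: the old copy stays in its S3 object, wasting some space.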

To retrieve the data from S3, I need the metadata from SimpleDB. I fetch the object in s3loc from S3, then deserialize it back to Java. This places limitations on what kind of objects you can store, but it's not too painful with Jackson -- the only real restriction is that you know ahead of time what types of objects you will be deserializing and that they not have any fields which aren't concrete types. What I get back is the HashMap of String -> T, but my key here is the same as the item key, which of course I have.

If I need to do updates, I follow the same procedure as above, which leaves the original item in the old S3 object but overwrites the s3loc with the location of the updated item. We're wasting storage space, but we will always see the correct version. Without some form of transactions or conditional writes, though, it would be impossible to guarantee that concurrent updates to the same item are applied correctly with this scheme. It's important that anyone building a system on top of AWS understand the consistency guarantees. Again, this problem (and lots more!) would go away if Amazon exposed the "semantic reconciliation" step in the Dynamo system underlying S3. Of course, this step still exists in S3, but for simplicity the semantics are hard coded to "last write wins".

Your mileage may vary

This system doesn't solve everyone's problem. If you intend to allow outside access to your data via public S3 buckets or CloudFront, this won't work. Additionally, if the data you are storing is too big to put into SimpleDB, it may be big enough that repeated downloads will outweigh the cost savings on the PUT side.

If you are doing a large number of updates and storing data for a long time, you may find that the storage costs start to be significant. I suppose if you were storing too much obsolete data, you could iterate through old items, repack them into new S3 objects, and update SimpleDB, but at that point your SimpleDB costs will outweigh your S3 savings.

A missed opportunity?

This was an interesting technical problem, finding the best solution with the pieces at hand, but a voice from my econ classes is whispering in my head "arbitrage", meaning that Amazon actually has a perverse pricing system that charges me less money to use more resources. If I can store items in S3 by laying another system on top of it, Amazon would certainly be able to do the same, do so more cheaply, and without the inconsistent interface that you need to use if you adopt my system. Unless S3 has significantly different replication and reliability characteristics than SimpleDB, I suspect that they are overcharging for small PUTs, and this is keeping people from using the service to the fullest extent.

I've heard from Tim Robertson that he ran into the same problem with small files and scaled back what he was storing in S3 to save on costs. HotPads has also done the calculation, and this was a factor in their reluctance to recommend that others adopt CloudFront as they did.

SimpleDB: a work in progress

SimpleDB is a fantastic system, but it's not quite done yet. Amazon labels it a beta, and it's certainly good enough to use in production, but there are lots of desirable features that aren't there yet. As I write this, I have a process that has been running for a day just going through and deleting old items from my SimpleDB domains; bulk delete and delete by query aren't supported yet.

As I learn more about other options, I'm having trouble justifying the hassle of dealing with things like lack of bulk delete, not being able to store large values in the table, and the size limits. HBase is becoming more mature and is now being used to directly serve content to the web. SimpleDB is likely slightly better value if you are pinching pennies (for us, SimpleDB is less than 10% of our Hadoop cluster costs), but if I were doing this again, and I were already using Hadoop, I would prefer HBase. If you have a smaller system, and don't want to deal with administration headaches, SimpleDB can still be a good choice.

About this Archive

This page is an archive of entries from June 2009 listed from newest to oldest.
