Resource-aware queue

For the TL;DR crowd:

This blog post presents the reasoning behind a project called ResourcePriorityBlockingQueue, a blocking queue implementation that:

  1. Allows you to assign a priority to tasks, just as with a PriorityBlockingQueue.
  2. Tasks may belong to task groups where each group may have a different priority. In that case, tasks are prioritized by group first, then task priority second.
  3. For a particular instance of a ResourcePriorityBlockingQueue you provide implementations of a ResourcePrioritizer and a ResourceAllocator, which together define which resources a particular task is made available to.
  4. Focus has been put on making the code highly concurrent with as little synchronized code as possible. Lock-free code is used where applicable, with efficient use of internal data structures. This gives ResourcePriorityBlockingQueue performance characteristics comparable to those of a pure PriorityBlockingQueue when used in a similar fashion.
  5. It’s available on GitHub, so feel free to poke around. Any constructive feedback is always welcome.

A lot of developers love their maps. Maps are flexible and easy to work with. They give us efficient access to an item associated with a key, and if we mix in a defined order of iteration as well, it quickly broadens our potential use cases even further.

But I tend to like queues better, particularly blocking queues. They allow us to decouple concerns, and by their very nature they make it easy to create task flows and distribute work, which is especially useful in multi-core environments. (Of course you can’t use a queue if you need a map, but that’s not what I’m saying either.)

But what happens when you have not just a multi-core environment, but a whole multi-machine cluster of resources at your disposal? One of Apache Hadoop‘s greatest strengths, when you run batch-type MapReduce jobs on it, is its ability to not only command a cluster of machines but to do so intelligently.

Before we can define what is implied by intelligently in this setting we need some more context. Assume you have 3 machines, each equal in hardware and software (for the sake of simplicity). These machines differ only in their distance to a set of data. Illustrated, we can imagine this picture:

[Figure: machines A, B, and C, each with its own local data item]

In a Hadoop/HDFS context we can imagine that each machine has one data item stored locally, so machine A has data item A stored on its local hard drive. The same goes for machine B (data item B) and machine C (data item C). Now, HDFS allows machine A to access and read the content of all three data items, but it’s more efficient for machine A to read what’s available locally rather than remotely.

So when you run a MapReduce job, Hadoop will do its best to ensure that a Map task reads locally available data, which is where the intelligent distribution of tasks enters the picture.

Say instead that it wasn’t about reading locally available files (maintaining data locality if/when possible), but instead about running CPU-intensive tasks where different tasks might require different numbers of cores on a machine to efficiently run their job. Say in that case our three machines above each have 8 cores available for running tasks, giving us a total of 24 cores. Then we have 100,000 tasks in a queue, each requiring 1, 2, 3, or 4 cores. On top of that, each task has a priority associated with it, going from low to high with medium in between. We could definitely use something like a PriorityBlockingQueue to deal with the ordering of tasks by priority.
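
To make that concrete, here’s a minimal sketch of that strategy using a plain PriorityBlockingQueue. The Task record is hypothetical shorthand, not a type from the project:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class CrudePriorityDemo {

    // Hypothetical task description: an id, the number of cores it
    // needs, and a priority where a higher number means more urgent.
    record Task(String id, int coresRequired, int priority) {}

    public static void main(String[] args) throws InterruptedException {
        // Order tasks by priority alone. Core requirements play no
        // part in deciding which task gets polled next.
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
                11, Comparator.comparingInt(Task::priority).reversed());

        queue.put(new Task("render", 3, 2));  // medium priority, needs 3 cores
        queue.put(new Task("index", 1, 3));   // high priority, needs 1 core
        queue.put(new Task("clean", 2, 1));   // low priority, needs 2 cores

        // Prints "index" -- the head of the queue is handed out next
        // no matter how many cores the caller actually has free.
        System.out.println(queue.take().id());
    }
}
```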

But that would only be a very crude allocation strategy, and here’s why:

Say 22 of our 24 cores are currently being used across our 3 machines, leaving us with 2 free cores. Since we’re paying for access to these cores we want to make use of them by allocating tasks. But it just so happens that the next task in our priority queue requires 3 free cores, making it, by our own definition, impossible to allocate that task to any machine. We simply have no free resource available for this next task.

If all tasks were short-running tasks, this might not be a big problem for us. With short-running tasks, we can be quite sure it wouldn’t take long before a machine has three available cores. We’re still wasting resources by not using those 2 free cores, but maybe not for long, so it might not make a large impact overall. But still, if our allocation and prioritization strategy allowed it, it would be nice to put one 2-core task or two single-core tasks on those two free cores, right?

That’s where a little pet project of mine enters the picture. It’s called ResourcePriorityBlockingQueue, and it’s a flexible blocking queue implementation which allows a worker (like our machines above) to present itself in the form of a resource prior to asking for tasks from the queue. This resource could describe anything about itself, like how many cores or how much memory it can currently allocate, and also what data it can provide locally. It’s really up to the developer to implement the definition of a resource.

A resource description in itself isn’t very useful unless we can make decisions based on the tasks we have available and the currently available resources. That’s where the ResourcePrioritizer and ResourceAllocator enter the picture.

The job of a ResourcePrioritizer is to prioritize a resource against a given task, so a task and a resource are given to the prioritizer. If we focus purely on available cores, a resource with 2 available cores would be disqualified for any task requiring 3 or more cores. It would be highly appropriate for a task requiring exactly 2 cores (high score), and also highly appropriate for tasks requiring just one core (again, a high score). If we, in general, preferred to give tasks requiring more cores a higher priority than tasks requiring fewer cores, we’d simply have the ResourcePrioritizer return a higher score for these tasks than for less core-hungry ones.
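
The real interface lives in the project on GitHub and may well look different; the sketch below is only my shorthand for the idea, with hypothetical Task and Resource types and an assumed prioritize(task, resource) signature:

```java
// Hypothetical shorthand for the idea -- the real interface is in the project.
interface ResourcePrioritizer<T, R> {
    // Returns a score for running task on resource;
    // a negative score means "disqualified".
    double prioritize(T task, R resource);
}

// Hypothetical task and resource descriptions, trimmed to cores only.
record Task(String id, int coresRequired) {}
record Resource(String machine, int freeCores) {}

// Scores a resource against a task purely by available cores:
// disqualify resources that are too small, and otherwise prefer
// core-hungry tasks over less demanding ones.
class CorePrioritizer implements ResourcePrioritizer<Task, Resource> {
    @Override
    public double prioritize(Task task, Resource resource) {
        if (resource.freeCores() < task.coresRequired()) {
            return -1; // not enough cores: disqualified
        }
        // A 3-core task outranks a 1-core task on any resource
        // that can fit both.
        return task.coresRequired();
    }
}
```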

So the ResourcePrioritizer connects single tasks against single resources through a score. The higher the score the better the fit or priority of the task against the resource. This is then handed over to the ResourceAllocator.

While a ResourcePrioritizer implementation links a single task against a single resource, the ResourceAllocator links a single task against multiple resources. Let’s say we develop a simple resource allocator tasked with assigning a task to the best available resource at any point in time. As we know from our ResourcePrioritizer above that a higher score implies a better fit, we could simply pick the resource with the highest score and allocate the task to this one resource.
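
Under the same assumptions (hypothetical types, assumed signatures, not the project’s real API), such a best-fit allocator might look like this:

```java
import java.util.List;

// Hypothetical shorthand again -- the real ResourceAllocator interface
// is in the project and may differ.
interface ResourceAllocator<T, R> {
    // Given a task and the scored candidate resources, return the
    // resources the task should be made available to.
    List<R> allocate(T task, List<ScoredResource<R>> candidates);
}

// A resource paired with the score the ResourcePrioritizer gave it.
record ScoredResource<R>(R resource, double score) {}

// "Best fit" allocator: hand the task to the single highest-scoring
// resource, ignoring disqualified (negatively scored) candidates.
class BestFitAllocator<T, R> implements ResourceAllocator<T, R> {
    @Override
    public List<R> allocate(T task, List<ScoredResource<R>> candidates) {
        return candidates.stream()
                .filter(c -> c.score() >= 0) // drop disqualified resources
                .max((a, b) -> Double.compare(a.score(), b.score()))
                .map(best -> List.of(best.resource()))
                .orElse(List.of()); // nobody fits right now
    }
}
```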

That’s all good, and the above simple implementation of a ResourcePrioritizer and ResourceAllocator would have allowed us to make use of those 2 free cores in the earlier example.

What if the example becomes more data-oriented instead of computationally oriented? Let’s say we have our three machines, and we have several TB of data stored in AWS S3 or Rackspace Cloud Files. We know it’s going to take our machines quite some time (in computer terms) to download data from those sources rather than reading from a local hard drive. So we could, just as we did with available cores, have the resource describe which of those remote files it has cached locally. Say each machine caches the last 100 GB of downloaded files, only downloading a file if strictly needed. Using some efficient mechanism for describing which files a resource has available locally (maybe a Bloom filter, for example), a task could be allocated to resources with the aim of obtaining data locality.

The task would then describe which files it requires, and our ResourcePrioritizer and ResourceAllocator would place tasks as they see fit. In the worst case, where no machine already has the files cached, or where the machines that do are too busy, we could still allocate the task to any other machine, as it would be able to download the data from our remote source. That machine would then have the data available locally for future tasks and would update its resource description accordingly.
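
Sticking with hypothetical types, and assuming Guava’s BloomFilter for the cache summary (the post only suggests a Bloom filter; any compact set sketch would do), a locality-aware prioritizer could look roughly like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

// Hypothetical types again: a task names the remote files it needs,
// and a resource carries a Bloom filter summarizing its local cache.
record Task(String id, Set<String> requiredFiles) {}
record Resource(String machine, BloomFilter<String> cachedFiles, int freeSlots) {}

class LocalityPrioritizer {

    // Score a resource by how many of the task's files it (probably)
    // has cached. A Bloom filter may report false positives but never
    // false negatives, so a miss is a guaranteed remote download.
    double prioritize(Task task, Resource resource) {
        if (resource.freeSlots() == 0) {
            return -1; // busy machines are disqualified outright
        }
        long probableHits = task.requiredFiles().stream()
                .filter(resource.cachedFiles()::mightContain)
                .count();
        // Every non-busy machine stays eligible (score >= 0), since any
        // of them can download the data; locality just boosts the score.
        return probableHits;
    }

    // A machine would add to its filter as it caches new downloads.
    static BloomFilter<String> newCacheFilter() {
        return BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000, 0.01);
    }
}
```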

One thing to note about that example is that multiple machines might cache the same set of files. Maybe your workflow is such that you typically run analytics across several tasks, mainly against the most recently available data. Hadoop wouldn’t be a good fit here unless you had a very high replication factor on your most recent files (to obtain data locality). But we know our machines would quickly have the most recent files cached, because most of our tasks would require access to them.

And this brings us to the next feature of the ResourcePriorityBlockingQueue implementation. Even with multiple distinct resources polling tasks from it, it still behaves like a queue: once a task is polled, no other resource can poll that same task. This allows our ResourcePrioritizer and ResourceAllocator implementations to speculatively allocate a single task to multiple resources. You’d do this when you can’t predict with perfect accuracy (which is typically the case) which of several applicable resources will poll the task first, safe in the knowledge that the task will only ever be handed out once.
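
In allocator terms, speculation simply means returning more than one resource and letting the queue’s exactly-once polling sort out the rest. A variation on the earlier best-fit sketch, using the same hypothetical ScoredResource pairing:

```java
import java.util.Comparator;
import java.util.List;

// Same hypothetical ScoredResource pairing as before.
record ScoredResource<R>(R resource, double score) {}

// Speculative allocator: make the task visible to the top N scoring
// resources. Whichever polls first wins, and since the queue hands a
// task out exactly once, over-allocating here is safe.
class SpeculativeAllocator<R> {
    private final int fanOut;

    SpeculativeAllocator(int fanOut) {
        this.fanOut = fanOut;
    }

    List<R> allocate(List<ScoredResource<R>> candidates) {
        return candidates.stream()
                .filter(c -> c.score() >= 0) // skip disqualified resources
                .sorted(Comparator
                        .comparingDouble((ScoredResource<R> c) -> c.score())
                        .reversed())
                .limit(fanOut)
                .map(ScoredResource::resource)
                .toList();
    }
}
```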

So to sum it up by repeating the core “selling points”, ResourcePriorityBlockingQueue is a blocking queue implementation which:

  1. Allows you to assign a priority to tasks, just as with a PriorityBlockingQueue.
  2. Tasks may belong to task groups where each group may have a different priority. In that case, tasks are prioritized by group first, then task priority second.
  3. For a particular instance of a ResourcePriorityBlockingQueue you provide implementations of a ResourcePrioritizer and a ResourceAllocator, which together define which resources a particular task is made available to.
  4. Focus has been put on making the code highly concurrent with as little synchronized code as possible. Lock-free code is used where applicable, with efficient use of internal data structures. This gives ResourcePriorityBlockingQueue performance characteristics comparable to those of a pure PriorityBlockingQueue when used in a similar fashion.

You’ll find the project, released under the Apache License, Version 2.0, available on GitHub. Any constructive feedback is of course welcome.