Blog of Christian Felde Technology, computers and quant finance

25Dec/120

Oracle Coherence and MapReduce

I spend a lot of my time working with Oracle Coherence. If you've never heard of Coherence it can briefly be described as a linearly scalable in-memory HashMap. By linearly scalable I mean a distributed HashMap, where each cluster member is responsible for storing a portion of the complete map. As everything is in-memory you maintain data availability by ensuring there are nodes in the cluster whom also contain a backup for the primary partition. So should a node die, one or more backups are readily available to become primary storage locations.

So as you add nodes you increase the total available storage space, with each node serving a smaler fraction of the total data content, thereby ensuring linear and horisontal scalability for getting and putting data. But getting and putting data is just a simple little use case for Coherence. Instead I find it more interesting to focus on it's parallel aggregation capabilities. As each node owns fractions of the total data population you're able to write aggregation logic which, if fanned out, allow each node to perform a partial aggregation on only its data fraction.

Let's exemplify this with some simple numbers:

Key Value
Key1 10
Key2 20
Key3 30
Key4 40

This represents a very simple data structure, directly mapped to our distributed map. We have a distinct key with an associated value. Here our values are simple numbers, and say we now want to sum these up. We would then use a summation aggregator and it would extend the com.tangosol.util.aggregator.AbstractAggregator class. There are two methods on this class we care to look at now:

public Object aggregate(Set)

and

public Object aggregateResults(Collection)

The first of these two methods is responsible for our partial aggregation. If we assume there are two members to our cluster, where member 1 maintains Key1 and Key3, while member 2 maintains Key2 and Key4, then aggregation would occur in parallel on each member. Usually there would be multiple partitions involved as well, with each member owning several partitions. However, this is details we don't need to worry about right now, so let's assume there are no partitions involved.

With two instances of our summation aggregator, one on member 1 and one on member 2, the aggregate(..) method would be called on each. The set on member 1 would contain the entries for Key1 and Key3, while member 2 would receive a set containing Key2 and Key4.

Each instance would sum up their respective sets, with member 1 returning the partial sum equal to 40 (10 + 30) and member 2 returning the second half equal to 60 (20 + 40). These two results represent our partial aggregation results.

Finally, a single cluster member, responsible for orchestrating the aggregation, receives these partial aggregation results, 40 and 60, as a collection on the aggregateResults(..) method. We clearly see why we have a Collection type here rather than a Set as there might be multiple equal partial results which need to be further reduced down to a final result. In our case the aggregator would simple add 40 + 60, returning the final sum of 100 back to the client.

Illustrated graphically it would be something like this:

Coherence SumAggregatorThe red and green boxes represent each of the two partial aggregations which occurred on our two cluster members. They each return a partial result given to a single instance where the aggregateResults(..) method is used. The combine step here would be similar to our reduce step in a MapReduce framework.

Let's now turn our focus to a common "Hello World" example when reading about MapReduce on Hadoop. Assume you have a portion of text. For the sake of BigData, let's also assume this is a significantly large portion of text which justifies us using Hadoop and MapReduce. We want to count the number of occurrences of each word, tokenized appropriately.

This would look something like this when graphically illustrated:

MapReduce

Assume here that each color represents a distinct word. The map phase of our MapReduce will emit each distinct word as a key together with a value, here being the partial word count. Then we need to reduce all the partial results, obtained throughout the cluster from our various mappers, down to one sum of all partial "red word" counts. The same goes for our green and blue words as well.

Comparing this to the Coherence example above we see one clear distinction. Our MapReduce routine will also perform the reduction phase in parallel, while Coherence will use one designated member for reducing partial results down to one final result. Clearly our example was different, and there wouldn't be any reason to have more than one reduction/combine step when summing the numbers earlier. So let's instead do a similar "Hello World" word count exercise in Coherence.

As Oracle Coherence with its distributed map based data structure imposes some structure when feeding it data, we need to fit our large portion of unstructured text onto this structure. Let's say we do this on a line by line basis, letting the key represent the byte offset in our text file. This would give us something like this:

Key Value
0 blue red green green
21 green blue blue red blue
... ...
1021 red red green blue blue green

So we now want to group by each distinct word and count the number of times we see each word. Creating a WordCount aggregator the aggregate(..) method would here tokenize the words and emit a map as its partial aggregation result. As an example, assume one member receives a set for the entries on key 0 and key 1021, the partial aggregation result would be:

Key Value
blue 3
green 4
red 3

Graphically illustrated we could imagine this:

Coherence_parallel_aggregation

Again we see there would be no parallel reduction, as only one member would become responsible for combining all partial results to one final result. This makes it even more important to ensure that as much as possible of the aggregation occurs as part of the first partial aggregation, for two reasons in particular:

  1. The final reduction step is performed by a single member/thread and anything we can do in parallel before this point will benefit our overall performance.
  2. All the partial results must fit in memory on the member performing the final reduction step. This eventually prohibits our aggregation scalability (as we have a finit amount of memory, and we're not using disk). It would also potentially throttle aggregation performance as every partial result must be delivered over the network to this one member. Ensuring compact partial results would therefore benefit us.

Clearly there are architectural differences between these two systems, which might also indicate that they are targeting different use cases. But taking these differences into account we clearly see that we are still free to think in terms of MapReduce also when doing Oracle Coherence development.

Comments (0) Trackbacks (0)

No comments yet.


Leave a comment

No trackbacks yet.