Clojure references, “refs” for short, are one of the three tools offered by Clojure to make concurrency practical and manageable. Herewith a walk-through of code that uses them to accomplish a simple task in a highly concurrent fashion.

[This is part of the Concur.next series.]

[Update: There are a ton of comments; I’ve added some reactions at the bottom.]

The Problem · It’s one that’s been much-discussed in this space: reading all the lines in a large file and computing some simple statistics. It was the discovery that this sort of script ran faster on my Mac laptop than a pricey industrial-grade Sun server that originally motivated the Wide Finder work.

What I would really like to come out of all this work would be a set of tools, accessible to ordinary programmers, that help them accomplish everyday tasks with code that makes good use of many of the compute engines on a modern multi-core processor.

I keep coming back to this “read lines and compute stats” task because it’s as simple as you can imagine, it occurs in the real world, and if I can’t manage that one maybe that’s a signal that the whole project is doomed.

If you want an application to process lines out of a file concurrently, you need, first, to parallelize the process of reading the file, and second, parallelize the application code.

Paralines · This is Clojure code I wrote to parallelize the line-reading part of the problem. I’m not sure what the right name is for a splodge of Clojure code: “Module”? “Package”? Whatever; let’s ignore Paralines for the purposes of this essay; I’ll get back to it later. It offers a function called read-lines; here’s its doc string:

([file-name destination all-done user-data params])
Reads the named file and feeds the lines one at a time to the function 'destination', which will be called in parallel on many threads with two arguments, the first being a line from the file, the second 'user-data'. When all the lines have been processed, the 'all-done' function will be called with 'user-data' as its single argument. The 'destination' function will be called on multiple threads in parallel, so its logic must be thread-safe. There is no guarantee in which order the lines from the file will be processed. The 'params' argument is a map; the default values for the number of threads and file block-size may be overridden with the :width and :block-size values respectively.

Popular · This is the little Clojure script I wrote to solve the original Wide Finder problem: read an Apache logfile from this blog and find the ten most popular pieces; for a refresher, see the original Ruby script. It calls Paralines like so:

(read-lines "data/O.1m" proc-line report (ref {}) {})

In this invocation data/O.1m is the one-million-line sample of logfile data. proc-line is the function to which each line will be passed. report is the function which will be used to report on the results of the run.

The second-last argument, (ref {}), is more interesting. This will be passed as the second argument to each invocation of proc-line and as the sole argument to the final report invocation. It’s meant to be used to hold the program’s state as it does its computations.

{} is the Clojure idiom for an empty map (think hash table). The ref function produces a reference, a thing that (unlike most things in Clojure) can be changed, and in particular can be changed safely in a concurrent environment. The notion of using a hash table to build up state in this kind of work should hardly be surprising to anyone who’s ever written a Perl script.
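For readers who have never touched a ref, here is a minimal sketch of the basics. The names `state` and `outside-error` and the keys are made up for illustration and are not part of Popular; the dosync and alter forms it uses are explained further down in the walk-through.

```clojure
;; Hypothetical names; a sketch of ref basics, not code from Popular.
(def state (ref {}))          ; a ref holding an empty, immutable map

;; Reading: @state (short for (deref state)) returns the current value.
;; @state => {}

;; Writing has to happen inside a transaction.
(dosync (alter state assoc "some-key" 1))
;; @state => {"some-key" 1}

;; Outside a transaction, alter refuses to run and throws.
(def outside-error
  (try
    (alter state assoc "other-key" 2)
    (catch IllegalStateException e :no-transaction)))
;; outside-error => :no-transaction, and @state is unchanged
```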

And we’re going to use it just as we would in a Perl script; the hash will be keyed by the URIs of ongoing pieces, and the values will just be integers, how many times we’ve seen each one so far in the logfile.

The final argument, {}, is an empty hash table which tells Paralines to use the default block-size and thread-count.

proc-line · Paralines calls this function once for each line, the first argument being the line as a string, the second being that hash-table-reference that we set up at the beginning.

1 (def re #"GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) ")
2
3 (defn proc-line [ line so-far ]
4   (let [hit (re-find re line)
5         target (if hit (last hit) nil)]
6     (if target (record target so-far))))

Line 1: in Clojure a string with # in front is a regular expression. I find it pleasing that I can use pretty well the same regexes in Perl, Ruby, Java, and now anything built on the Java platform.

Line 3: The ref to the hash where the results live will be called so-far from here on in.

Line 4: re-find uses the java.util.regex library to scan the string.

Line 5: If re-find misses, it returns nil. If it hits, it returns a vector whose first member is the whole match, then $1, $2, and so on. Since I only have one ()-group, it’ll always be of size two and I can use last to fish out $1.

Line 6: Assuming the regex hit, we call record with the interesting part of the URI and the so-far hash ref.
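The re-find behavior described for Lines 4 and 5 can be tried in isolation. The regex below is the one from proc-line; the two sample strings are made-up fragments, not real lines from the logfile.

```clojure
;; The regex is copied from proc-line; the sample lines are invented.
(def re #"GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) ")

(def hit  (re-find re "GET /ongoing/When/200x/2009/11/11/Some-Piece HTTP/1.1"))
(def miss (re-find re "GET /ongoing/ongoing.css HTTP/1.1"))

;; hit is a two-element vector: the whole match, then the single group.
;; (first hit) => "GET /ongoing/When/200x/2009/11/11/Some-Piece "
;; (last hit)  => "2009/11/11/Some-Piece"
;; miss        => nil
```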

record · Here is where we start to use Clojure references.

1 (defn record [target so-far]
2   (let [ counter (@so-far target) ]
3     (if counter
4       (incr counter)
5       (incr (new-counter so-far target)))))

Line 2: Remember that so-far isn’t the results hash table, it’s a ref. The @ means “reach through the ref and extract the value”, apparently ignoring concurrency issues.

In Clojure, a hash is just a function; so (@so-far target) says “reach through the ref, get the hash table, and look up the target in it”.

Lines 3-4: If counter is non-nil, that means we’ve already seen this target and have started counting references to it. So we just use the incr function to increment that counter.

Line 5: We didn’t find a counter for this target, so we’re going to have to use new-counter to add one to the hash table; new-counter also returns the counter it just added, so we can increment it.
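The lookup on Line 2 can be sketched on its own. This toy table uses plain integer values and invented keys; `table`, `found` and `missing` are hypothetical names, not the author's.

```clojure
;; Hypothetical data: a ref holding a small map.
(def table (ref {"2009/11/11/Some-Piece" 3}))

;; A map called as a function looks up its argument.
(def found   (@table "2009/11/11/Some-Piece"))   ; => 3
(def missing (@table "2009/11/12/Unseen"))       ; => nil, no such key

;; (@table k) is ((deref table) k), which gives the same answer as
;; (get @table k). The nil for a missing key is what record tests for.
```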

incr · What could be simpler than adding one to a counter? Well, except that this is running in (a potentially large number of) parallel threads, more than one of which might want to increment this counter at the same time.

Above, we said that the values in the hash table were the occurrence counts, but that wasn’t quite accurate. They’re actually Clojure refs to the occurrence counts, so we can run in parallel.

1 (defn incr [ counter ]
2   (dosync (alter counter inc)))

All the magic is in Line 2. First, since we’re going to update a reference, we have to call dosync to launch a “transaction”, the STM voodoo that Clojure uses to make this possible and safe.

Inside the dosync, we use alter to send the inc function to the actual integer with the count. Since Clojure always uses immutable values as opposed to traditional variables, what actually happens is it creates a new integer and rejiggers the counter ref to point to that.
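A small stress test suggests why the transaction matters. This is a sketch under my own assumptions: the thread and iteration counts are arbitrary, and `counter` and `workers` are hypothetical names. Only incr is taken from the listing above.

```clojure
(defn incr [counter]
  (dosync (alter counter inc)))

;; Hypothetical stress test: 8 threads, 1000 increments each.
(def counter (ref 0))

(def workers
  (doall (for [_ (range 8)]
           (future (dotimes [_ 1000] (incr counter))))))

(doseq [w workers] @w)   ; wait for every future to finish

;; @counter => 8000
;; Colliding transactions are retried, so no increment is lost.
```

In a standalone script, a call to (shutdown-agents) at the very end lets the JVM exit promptly, since futures run on a thread pool that otherwise lingers for a while.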

new-counter · This is the function that gets called when we’re trying to record the occurrence of a URI but the hash doesn’t yet contain a counter for it.

1 (defn new-counter [ so-far target ]
2   (dosync
3     (let [ c (@so-far target) counter (ref 0) ]
4       (if c
5         c
6         (do
7           (ref-set so-far (assoc @so-far target counter))
8           counter)))))

Line 2: The function wraps itself up in a dosync because it’s going to be poking and prodding at the hash reference quite a bit.

Line 3: The part where we set c may be a little puzzling; we look the target up in the hash table; but didn’t we just do that, and call this function because it wasn’t there? Be careful: We’re running in parallel, and someone else might have got in and added it, and counted a few occurrences even, while we weren’t looking, and we wouldn’t want to reset the count to zero accidentally.

Remember that when we dereferenced the table in the higher-level record function by saying (@so-far target), I said “apparently ignoring concurrency issues”. Well, not quite. If another thread had been in this new-counter function, dosync and Clojure’s concurrency magic would have prevented the dereference from seeing the hash in an inconsistent state.

The second half of Line 3, assuming that we’re probably going to need a new counter, goes ahead and creates a new ref-to-an-int for that purpose.

Lines 4-5: If c came back non-nil, that means that, yes indeed, someone came along and created the counter and we don’t need to; we can just return it.

Line 7: This is the normal path through the function, where we update the hash table with the new counter. ref-set is like alter, except that it takes the new value directly rather than a function to apply to the old one; I find it a bit more readable when the code is something more complicated than just inc.

assoc looks like it updates the hash table, but because we’re living in the land of immutability it doesn’t really; it creates a new one which differs only by having the new key/value pair. This isn’t as horrifically inefficient as it sounds: Clojure is smart about creating a “new” object by just twiddling pointers and re-using most of the old one.
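Both points from the Line 7 discussion can be checked at a REPL. This is a sketch with invented names (`before`, `after`, `table`) and toy keys: assoc leaves its argument alone, and the ref-set form used in new-counter has the same effect as the equivalent alter.

```clojure
;; assoc returns a new map; the original is untouched.
(def before {"a" 1})
(def after  (assoc before "b" 2))
;; before => {"a" 1}
;; after  => {"a" 1, "b" 2}

;; Two ways to write the same update to a ref holding a map.
(def table (ref {}))
(dosync (ref-set table (assoc @table "x" 1)))  ; the style used in new-counter
(dosync (alter table assoc "y" 2))             ; same effect, written with alter
;; @table => {"x" 1, "y" 2}
```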

report · We’re about done; all that’s left is the function that prints out the top ten most-referenced ongoing pieces.

(defn report [ so-far ]
  (let [sorted (sort-by last (fn [a b] (> @a @b)) @so-far) ]
    (doseq [key (keys (take 10 sorted))]
      (println (str "K " key " - " @(@so-far key))))))

There’s nothing terribly exotic here, except that we have to throw a lot of @ characters around to reach through the references to the table itself and then to the values in the key/value pairs. Which is perfectly OK since this stage is read-only.
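To see the sorting and the double dereference on a small scale, here is a variant that returns pairs instead of printing them. The function `top-n` and the three-entry table are hypothetical, made up for this sketch; it is not the author's report function.

```clojure
;; Hypothetical table: target -> ref to its count.
(def so-far (ref {"a" (ref 3) "b" (ref 10) "c" (ref 7)}))

(defn top-n [so-far n]
  (->> @so-far                                   ; reach through the outer ref
       (sort-by (fn [[_ counter]] @counter) >)   ; biggest counts first
       (take n)
       (map (fn [[target counter]] [target @counter]))))

;; (top-n so-far 2) => (["b" 10] ["c" 7])
```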

Looking Back · Note that actual collisions between concurrent threads are probably going to be rare in this implementation. Most times when you probe the hash-table ref you’ll find that yes, we’re already counting this URI; no transaction required. And most times when you increment the counter, probably no other threads are, so once again, the STM should work smoothly.

So there you have it: 30 or so lines of code that reliably and concurrently compute line-at-a-time statistics.
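For anyone who wants to run the pieces together without Paralines, here is an assembled sketch. The four functions are the ones from the listings above with the line numbers stripped. Everything after them is my assumption: pmap stands in for read-lines (it is not how Paralines works), and the sample lines and counts are invented.

```clojure
(def re #"GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) ")

(defn incr [counter]
  (dosync (alter counter inc)))

(defn new-counter [so-far target]
  (dosync
    (let [c (@so-far target) counter (ref 0)]
      (if c
        c
        (do
          (ref-set so-far (assoc @so-far target counter))
          counter)))))

(defn record [target so-far]
  (let [counter (@so-far target)]
    (if counter
      (incr counter)
      (incr (new-counter so-far target)))))

(defn proc-line [line so-far]
  (let [hit (re-find re line)
        target (if hit (last hit) nil)]
    (if target (record target so-far))))

;; Stand-in driver (an assumption, NOT Paralines): made-up lines fed to
;; proc-line from several threads via pmap.
(def lines
  (concat (repeat 500 "GET /ongoing/When/200x/2009/11/11/Piece-A HTTP/1.1")
          (repeat 300 "GET /ongoing/When/200x/2009/11/12/Piece-B HTTP/1.1")
          (repeat 200 "GET /ongoing/ongoing.css HTTP/1.1")))

(def so-far (ref {}))
(dorun (pmap #(proc-line % so-far) (shuffle (vec lines))))

;; Flatten the refs into a plain map for inspection.
(def totals (into {} (for [[target counter] @so-far] [target @counter])))
;; totals => {"2009/11/11/Piece-A" 500, "2009/11/12/Piece-B" 300}
```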

Does This Actually Work? · This section used to begin:

Not quite, at the moment. I mean, it works fine for the first 50 million lines or so of the big dataset’s quarter-billion, keeping my eight-core 32-thread SPARC T2000 maxed, and I mean smokin’, you can see the 50M blocks that Paralines is reading go by pop-pop-pop. But then it hits its heap limit and descends into garbage-collection hell and thrashes its way to a miserable single-threaded standstill.

No more! I found the bug, due to a combination of some moderate stupidity on my part and an old and well-known Java issue, and killed it.

It runs fine; see Parallel I/O for more.

The Big Question · Are the ref and @ and dosync and alter and ref-set machinery accessible to ordinary programmers? Will they figure out how to use them and use them correctly? Clearly this is a lot less painful than wrangling locks and threads, but on the other hand there is still careful thought required and places where you can go wrong.

Is this a primitive that will be one of the distinguishing features of the Java of Concurrency?

Reactions to Commenters · On the question of whether agents would be a better fit: well, maybe. I don’t think there’s a single person on this green planet who is smart enough to predict, based on intuition, where the bottlenecks are going to happen in this kind of a setup. Once I’ve fought through the GC weirdness, I’ll see if going to an agent-based system helps.
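For reference, one shape an agent-based counter might take is sketched below. This is not the author's code and not any commenter's; `counts` and `count-target` are hypothetical names, the targets are toy strings, and `update` assumes Clojure 1.7 or later.

```clojure
;; A single agent holding a plain immutable map of target -> count.
(def counts (agent {}))

(defn count-target [target]
  ;; Queue an action. The agent applies actions one at a time, so the
  ;; map needs no inner refs; (fnil inc 0) treats a missing key as 0.
  (send counts update target (fnil inc 0)))

(doseq [t ["a" "b" "a" "c" "a"]]
  (count-target t))

(await counts)   ; block until the actions sent from this thread have run
;; @counts => {"a" 3, "b" 1, "c" 1}
```

Whether this beats the ref version is exactly the kind of thing that would need measuring; all updates funnel through one agent, so it trades transactions for a queue.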

I will point out that, until the GC conflagration broke out, this gave the appearance of running very efficiently on the Niagara box, with all the available threads maxed out. So it may be that the Clojure mechanisms have hit an 80/20 point and further improvement is incremental. We’ll see.

To Avi and others who suggested map/reduce or other more deeply-parallel approaches: Well, maybe. Do bear in mind that I’m trying to nail down The Simplest Thing That Will Really Help, not the optimal solution regardless of complexity.
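To make the map/reduce suggestion concrete, here is a minimal sketch of counting per chunk and summing at the end. The helper names and the toy chunks are assumptions for illustration; real input would be blocks of log lines already reduced to their targets.

```clojure
;; Hypothetical helpers; made-up targets instead of real log lines.
(defn count-chunk [targets]
  (frequencies targets))             ; "map" step: a private map per chunk

(defn merge-counts [maps]
  (apply merge-with + {} maps))      ; "reduce" step: sum the per-chunk counts

(def chunks [["a" "b" "a"] ["a" "c"] ["b" "a"]])
(def totals (merge-counts (pmap count-chunk chunks)))
;; totals => {"a" 4, "b" 2, "c" 1}
```

No shared mutable state is involved here, which is the appeal; the cost is that the caller has to think in chunks rather than lines.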

To those who suggest that my performance problem is due to concurrency contention rather than GC: I really think you’re wrong. I have the GC diagnostics printing out, and I can see it descending into GC hell when it bottlenecks.

To those advancing theories based on the number of unique counters, lines, and so on: There are around 3,000 unique URIs, and a quarter-billion lines of data, just under 10% of which match the regex and will lead to an update.



Contributions

From: Patrick (Nov 12 2009, at 12:19)

Would you like to post the entire code so I/we can poke at it?

Thanks for the detailed walk-thru.

From: John Hart (Nov 12 2009, at 12:21)

The method used (dosync locking around counter increments) only works if (a) the locking mechanism is cheap, and (b) the set of hits are distributed somewhat flatly. You code up two assumptions:

"Most times when you probe the hash-table ref you’ll find that yes, we’re already counting this URI; no transaction required. And most times when you increment the counter, probably no other threads are, so once again, the STM should work smoothly."

If each of my quarter-billion hits is to a unique URL, assumption (1) is false. If 70% of my quarter-billion hits are to the same page, then assumption (2) is false.

Why a single shared global data structure? Why not one user-data per thread, with results summing at the end? Are you trying not to do that because "it's too complicated for ordinary programmers"? I think the locking code (especially the "oh right I gotta remember that someone may have already created this entry" in lines 4-5 of new-counter) is more complicated & error-prone than exposing knowledge of threads/processes/etc but not requiring global locks.

From: Chas Emerick (Nov 12 2009, at 12:46)

I don't see a link to some source, but I'm not clear on why you're using refs. Refs are for tracking *coordinated* change. I've not followed the WideFinder "challenge", but it seems like each unit of work (a single line from a log file) is entirely encapsulated, so there's no need to constantly coordinate intermediate results line-by-line.

Given that, it seems that either (a) a simple pmap across a line-seq of the input file swap!-ing the aggregate state in a single atom or (b) dispatching lines to a set of agents (which do not correspond 1:1 with native threads, FYI) in round-robin fashion would be better.

From: Phil (Nov 12 2009, at 13:16)

I've got a version that uses agents; clocks in at twelve lines:

http://p.hagelb.org/wide_finder.clj.html

Haven't run it over any really beefy logs to get perf numbers yet though.

From: Ben (Nov 12 2009, at 13:54)

A splodge of Clojure code should obviously be called a "mojule".

From: jwhitlark (Nov 12 2009, at 16:31)

I believe your incr function could use commute instead of alter. see http://clojure.org/api#commute

I don't know how much improvement it would make, though.

From: Patrick Logan (Nov 12 2009, at 16:59)

I'd like to noodle on this, but some initial impressions... I don't think you have to do so many dereferences, nor do I think you have to synchronize so much.

From what I understand of the problem so far there are a few distinct cases...

1. You have a URL that is not in the map yet.

In this case if the map is in an atom, swap! the map with one where the URL is mapped to something with the value of 1.

2. You have a URL that is already in the map.

In this case if the URL is mapped to an atom then swap! the value in that atom by inc'ing it. You do not need to swap! the map in this case.

I think those two cases could be the core of the counting aspect of the problem. There's probably something lurking around the corner to simplify taking the top ten counts as well.

e.g. you may not want to sort them while still counting but you may keep them in a vector then either sort the vector in one thread or partition the vector into a thread per range of length N to coordinate sorting concurrently.

Or something... I think the next step for you is to think about using atoms and swap, dereferencing and syncing much less, and maybe sorting concurrently.

From: pmf (Nov 12 2009, at 17:09)

If you'd show the implementation of your read-lines-function, we might be able to pinpoint your leak. My superficial assumption is that there are implicit or explicit local references to the blocks in read-lines that will only get GCd on exit from read-lines.

From: Joshua (Nov 12 2009, at 18:25)

We have some sims that run into the same issue with the Java garbage collector. It would be nice if there was more available to use different garbage collectors or tweak it to not hammer the system in these cases.

From: Jeremy Corbett (Nov 12 2009, at 18:43)

This may not help but it's worth trying -XX:+DoEscapeAnalysis. Any objects that stay in a thread will not touch the heap.

From: Patrick Logan (Nov 12 2009, at 20:26)

Correction to my first comment...

The map should be held by an agent not an atom. The action for the agent would be to update the map for the URL as with an atom.

But with an atom there would be a race condition where some other thread updated the map for the same URL. An agent would serialize actions.

If the agent has just previously updated the map for the same URL the action would inc the counter for the URL. Otherwise it would initialize a counter to 1 for the URL.

The counter would still be an atom as the only requirement is to inc whatever value the counter currently has.

From: Dan Davies Brackett (Nov 12 2009, at 20:34)

If you are looking around, and asking for help, some details of what you've found so far would help anyone interested in contributing help =) For example, when you kill -QUIT the JVM for a stack trace on standard out, is GC (or a wait for it) actually what's going on? Have you run any heap dumps through jhat or whatever that analyzer is called to see the objects-by-type bar charts?

From: Mark (Nov 12 2009, at 20:40)

I don't see any obvious memory leaks in this code, maybe the problem is in read-lines which you didn't show?

Your code will likely have even better throughput if you use atoms instead of refs, since you're not really doing anything that requires the full power of refs.

Using a ref/atom for both the hash table and the individual counters is an interesting approach, it would be very interesting to see whether it makes much of a difference to only put the overall hash table in a ref/atom as one big immutable object. If you have done this experiment already, it would be great if you would report these results.

From: tkil (Nov 12 2009, at 22:05)

@Joshua -- "It would be nice if there was more available to use different garbage collectors or tweak it to not hammer the system in these cases."

There is some flexibility. A few years ago, we had a server load that would go in to GC land for 30 seconds every 10 minutes or so, and would bring our web page to a halt.

It turns out that there's a much more incremental GC algorithm that you can specify; from memory, it was the "server" algorithm (instead of the "workstation" algo?), and it had high / low water knobs, etc.

Might be worth looking into -- but then again, this is all from 5+ years ago, so I have no idea if the current JVMs still offer this.

http://java.sun.com/docs/hotspot/gc5.0/gc_tuning_5.html

Looks like the "concurrent low pause collector" is the one I'm thinking of.

Good luck!

From: Mark (Nov 13 2009, at 00:01)

Now that I've read Patrick Logan's comment, I agree that holding the map in an agent is even better than holding it in an atom.

I'm pretty sure that once you switch to an agent, you can just use a regular immutable hash map as the value inside the agent, rather than a map where the counters are atoms or refs. So the code will be cleaner as well.

From: Avi Bryant (Nov 13 2009, at 01:40)

I'm with John Hart here - trying to do this in a single data structure seems odd, unless that's explicitly part of the point of the exercise. This is the kind of problem for which map/reduce is, to me, very obviously the right pattern.

From: Mark (Nov 13 2009, at 02:26)

I think that Phil's version could run into problems if the dispatch to the agents happens faster than the agents can process them -- the agents mailbox would fill up which could cause a crash.

From: gvb (Nov 13 2009, at 04:55)

@John Hart has identified the problem, it is with dosync.

What you are doing is creating a global table and then serializing *all* access to it. In database terms, you are *table* locking when you really want to be *row* locking most of the time. When you create a new accumulator in the table (e.g. insert row), you will have to lock the table. There is no way around this, but it will happen (relatively) infrequently. When you increment an existing accumulator (update row), you want to *row* lock, not table lock.

I suspect your "memory leak/garbage collection" is really a table locking issue. What your program is doing is creating a HUGE number of threads, each running a regex on a line. Splitting the input file is non-blocking. Running the regex on a line is non-blocking. As a result, these run as fast as possible and as many threads as possible are created.

Then, when it comes time to update the accumulator in the table, one thread gets in and all the rest are blocked. Since the input is spawning at a HUGE rate effectively an infinite number of threads, it overwhelms your system because of the dosync (global lock) bottleneck.

This is a *very* easy theory to test: if you remove the dosync calls, does your code run fast (and to completion)? It will produce the wrong answers, but the exercise will tell you if your dosync is bottlenecking your code.

What you want for this algorithmic paradigm is a table lock only when you are creating a new "row", and a lock per row. This allows most of the operations to run in parallel since most of the incrementers will be incrementing different "rows" (IP addresses). I'm not familiar with Clojure, so I don't know if that is possible. Your use of dosync implies only a global lock. You need to create a per-entry lock and use that to increment the accumulator.

I think what you really want is a different algorithmic paradigm. For instance, in an Erlang world, you would create an accumulator thread per IP address that needed to be incremented. The regexp threads would no longer increment the accumulator directly (with a global or even a row lock), but instead would send a message "increment yourself" to the appropriate accumulator thread. The Erlang message interface would inherently serialize the access to each unique IP address accumulator and would inherently parallelize the incrementing of disjoint IP addresses.

gvb

From: Shawn Hoover (Nov 13 2009, at 13:01)

record/incr look like a race condition. record reads the map outside of dosync and creates a new value to insert. Then incr does the update in dosync, because it has to. Multiple threads could read in parallel, create new counters (in parallel), and insert them (in serial).

From: Patrick Logan (Nov 13 2009, at 13:37)

"trying to do this in a single data structure seems odd, unless that's explicitly part of the point of the exercise"

Yeah, I think this is where it gets into the specific details of the problem. Improving one step beyond the code in the post I think would be to put one map into an agent, with the values for each key being an atom.

Potential downside is if there are a *lot* of unique keys then adding each one to the single map is a serial thing. There could be multiple maps to allow more concurrency.

A big point for concurrency though is not to update a map on every URL increment, especially once the URL is already "known".

One of the previous posts in this topic points out that this exercise is a microbenchmark for SMP coordination, so this particular post seems to be one of exploring the convenience foremost and performance somewhat of various clojure mechanisms.

From: Ross Thomas (Nov 13 2009, at 14:31)

Hi Tim,

As cool as refs are, I'm with others in believing they're overkill in this situation :)

Here's a version I made that takes advantage of an even cooler feature of Clojure: lazy infinite sequences. I think it's correct, though I can't speak to its performance. Would be interesting to investigate how it scales.

http://pastebin.com/f968c762

November 11, 2009

I am an employee of Amazon.com, but the opinions expressed here are my own, and no other party necessarily agrees with them. A full disclosure of my professional interests is on the author page.