This is the fourth progress report from the Wide Finder Project. Following on my earlier fumbling at the controls of the Erlang race-car, several others joined the conversation. For particularly smart input, check out the Erlang questions mailing list and the already-linked-to Steve Vinoski piece, Tim Bray and Erlang. Other voices have questioned the project’s sanity, and brought Scala and Haskell to bear on the problem. But let’s push all that on the stack, and ask ourselves this question: what if processes were free? Answering it turns out to be all sorts of fun; I seize the Erlang controls back again, and steer it with a firm steady hand into a brick wall at top speed.

Erlang’s proponents claim that processes are free, pretty nearly. And in fact, Steve Vinoski’s findings support that claim. He had the radical idea of parallelizing not only the counting of totals but also the breaking-up of the file into lines and the pattern matching, which never would have occurred to me. His algorithm has a single parameter: the number of Erlang processes. He observed that the curve dropped off somewhere north of 400.

While I admire Steve’s work, there are three things about it that bothered me. First, the ad-hoc recursive binary-match probing to find the line breaks to split on. Second, that irritating number-of-processes parameter (as an application programmer, I really don’t want to think about the number of processes running my code any more than I want to think of the organization of the page table that intercepts my memory references). Finally, the fact that he had to read the whole file into memory in one unit. Remember, my logfiles are a quarter-gig per week, and I have years’ worth of data, so that’s just not gonna fly.

On the first, Steve was fighting an Erlang shortcoming: there’s a tokenize function for strings but not for binaries. Deal with it.

How Many Processes? · In an ideal many-core world, process creation should be as close to free as achievable. The Erlvangelists claim that they get closer to the ideal than anyone. In that ideal world, you’d simply assign every conceivable independent unit of computation to its own process and let the infrastructure sort out the problems of getting them scheduled appropriately.

The Wide Finder problem has the following independent units of computation:

  • Breaking chunks of data out of the file into lines.

  • Processing each line to establish whether it is an article fetch.

  • Counting fetches for each article.

So, let’s assign each of those units to a separate process, and see what happens. There’ll be a lot of little processes that do one job, hand off the work, and exit, in a cascade.

Finding Lines in Blocks · In an ideal world, you’d split your file up into blocks and hand each block to a separate process. There’s a fly in the ointment: the data is organized into newline-separated “lines”, and they don’t arrange themselves neatly across block boundaries.

So what you do is split the block into lines and accept that the last piece won’t be a complete line. You pass that piece on to the next chunk, and of course you attach the equivalent fragment from the previous chunk to the front of this one before splitting.
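
To make the carry-over concrete, here is a minimal sketch of just that step as a pure function. The module name split_sketch and the function split_lines/2 are made up for illustration and are not part of cascade.erl. Like the code in this post, the sketch assumes the chunk arrives as a string (a list of characters) rather than a binary. It also covers the case where a chunk happens to end exactly on a newline, so that there is no fragment to carry.

```erlang
-module(split_sketch).
-export([split_lines/2]).

%% Hypothetical helper, not part of cascade.erl.
%% Returns {CompleteLines, Trailer}: every complete line found in
%% Previous ++ Chunk, plus the unterminated fragment (possibly "")
%% that should be carried into the next chunk.
split_lines(Previous, Chunk) ->
    Data = Previous ++ Chunk,
    Tokens = string:tokens(Data, "\n"),
    case Data =/= [] andalso lists:last(Data) =/= $\n of
        true ->
            %% Data stops mid-line: the last token is only a fragment
            {lists:droplast(Tokens), lists:last(Tokens)};
        false ->
            %% Data is empty or ends on a newline: nothing to carry
            {Tokens, ""}
    end.
```

For example, split_sketch:split_lines("GE", "T /a\nGET /b\nGET") returns {["GET /a","GET /b"],"GET"}. Note that string:tokens/2 silently drops empty lines, which is harmless for this job.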

%% The file-chunk cascade
chunk() ->
    receive
	{ Previous, File, Counter, Source } ->
	    %% read one chunk of the file
	    case file:read(File, ?BUFSIZ) of
		{ok, Chunk} ->
		    split_chunk(Previous, Chunk, File, Counter, Source);
		eof -> done
	    end
    end.

%% mostly to avoid excessive indentation
split_chunk(Previous, Chunk, File, Counter, Source) ->

    %% When we split a chunk into lines on \n, the last piece probably
    %% doesn't end with a newline.  The "Previous" argument is that last
    %% piece from the prior chunk; we stick it on the front, and split
    %% *that* into lines
    Data = Previous ++ Chunk,
    LinesPlus = string:tokens(Data, "\n"),

    %% peel the trailing line fragment off the list of lines to process;
    %% if the data happens to end exactly on a newline there is no
    %% fragment, and every token is a complete line
    {Lines, Trailer} =
        case lists:last(Data) of
            $\n -> {LinesPlus, ""};
            _ -> {lists:droplast(LinesPlus), lists:last(LinesPlus)}
        end,
    %% io:format("**** Chunk split into ~p lines~n", [length(Lines)]),

    %% a new process for the next chunk
    spawn(fun cascade:chunk/0) ! { Trailer, File, Counter, Source },

    %% a new process for each line in this chunk
    lists:foreach(fun(L) ->
			  spawn(fun cascade:line/0) ! { L, Counter, self() }
		  end, Lines),

    %% wait for all the line processing to be done
    wait_for_lines(length(Lines)),

    %% let the mainline know this chunk is done
    Source ! done.

You have to do the line-splitting to isolate the trailing fragment before you can start processing the next block. But, in a world where processes are free, you can start processing the next block (as above) before you start processing the lines in this block. With a block size of 32k, you get on average 150 or so lines from each.
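
The listing above calls wait_for_lines/1, which isn't shown here. A minimal sketch, assuming it does nothing more than block until one done message has arrived per spawned line process (the module name wait_sketch is hypothetical, and the real cascade.erl may well differ):

```erlang
-module(wait_sketch).
-export([wait_for_lines/1]).

%% Assumed behaviour, not the cascade.erl source: block until N
%% `done` messages have been taken from this process's mailbox.
wait_for_lines(0) ->
    ok;
wait_for_lines(N) when N > 0 ->
    receive
        done -> wait_for_lines(N - 1)
    end.
```

With no timeout, a line process that dies before sending done would leave its chunk process waiting forever in this sketch, which is one reason a sturdier version might use monitors or links rather than bare message counting.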

Looking at Lines · Here’s the line processing.

%% The per-line cascade
line() ->
    receive
	{ Line, Counter, Chunker } ->
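	    %% Pull out the URI field (the 7th space-separated token); this
	    %% assumes a well-formed log line, since lists:nth/2 fails on a
	    %% shorter list and this process would then exit before
	    %% reporting done
	    Uri = lists:nth(7, string:tokens(Line, " ")),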
	    case Uri of

		%% we'll count it if it matches the pattern and doesn't
		%% contain a '.'
		"/ongoing/When" ++ Rest ->
		    case lists:member($., Rest) of

			%% counting is done in another process
			false -> counter:incr(Rest, Counter);
			true -> pass
		    end;
		_ -> done
	    end,

	    %% let the chunk processor know this line is done
	    Chunker ! done
    end.

So each chunk of input data, and each line from the chunk, gets its own process. That’s not all. That counter:incr call turns out to give each unique fetched-article URI its own counter process; details here.
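
Since counter.erl isn't reproduced in this post, here is a rough sketch of how a process-per-key counter could be put together. Nothing in it is taken from the real counter.erl: the module name counter_sketch, the registry process, and the totals/1 function are all assumptions made for illustration. Only the incr(Key, Counter) argument order follows the call in the listing above. A registry process keeps a map from URI to pid, spawns a fresh counter process the first time it sees a URI, and forwards increments thereafter (maps need Erlang/OTP 17 or later).

```erlang
-module(counter_sketch).
-export([start/0, incr/2, totals/1]).

%% Hypothetical process-per-key counter; NOT the counter.erl from this post.

%% Start the registry process that owns the Key -> Pid table.
start() ->
    spawn(fun() -> registry(#{}) end).

%% Asynchronously bump the count for Key.
incr(Key, Registry) ->
    Registry ! {incr, Key},
    ok.

%% Synchronously collect [{Key, Count}] from every counter process.
totals(Registry) ->
    Registry ! {totals, self()},
    receive
        {totals, Registry, List} -> List
    end.

%% The registry loop: one entry, and one live process, per key seen.
registry(Pids) ->
    receive
        {incr, Key} ->
            case maps:find(Key, Pids) of
                {ok, Existing} ->
                    Existing ! incr,
                    registry(Pids);
                error ->
                    %% first sighting of this key: give it its own
                    %% process, starting at a count of 1
                    New = spawn(fun() -> count(1) end),
                    registry(Pids#{Key => New})
            end;
        {totals, From} ->
            List = [{K, ask(P)} || {K, P} <- maps:to_list(Pids)],
            From ! {totals, self(), List},
            registry(Pids)
    end.

%% Ask one counter process for its current value.
ask(Pid) ->
    Pid ! {get, self()},
    receive
        {count, Pid, N} -> N
    end.

%% The per-key counter process: its state is just the running count.
count(N) ->
    receive
        incr ->
            count(N + 1);
        {get, From} ->
            From ! {count, self(), N},
            count(N)
    end.
```

Typical use would be R = counter_sketch:start(), then counter_sketch:incr(Uri, R) once per matching line, then counter_sketch:totals(R) when all the lines are in. Every increment passes through the registry in this sketch, so it is a serial bottleneck however many counter processes there are.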

To be precise: seven thousand or so block-splitting processes, another short-lived process for each of the 1167948 lines, and a persistent process for each of the 4706 unique URIs.

The Results · First of all, you can get the code: cascade.erl and counter.erl.

This represents, I think, a theoretical maximum in the number of independent processes we’re throwing at the problem, with only one parameter: how big a chunk of file we read at a time.

How does it run? Well, like dogshit, more or less. Applied to the whole 250M file containing a week’s worth of ongoing data, the code burned 55 seconds of CPU. Which is twenty or more times slower than the naïve Ruby implementation. Which means we’re not done yet.

Have I profiled it yet? No. I will. And I’m not entirely without hope that this conceptually-pure approach can be made to run really fucking fast in the real world.

But before I buckle down to all that workaday stuff, I wanted to present the processes-are-free thought experiment in its purest form. Because this stuff is pure brain candy.



Contributions


From: Doug Cutting (Sep 27 2007, at 16:08)

What we do in Hadoop is observe the convention that a chunk starts at the first line break after its start position and ends at the first line break after its end position (except the first chunk). Wouldn't that be simpler here too?


From: Tim (Sep 27 2007, at 16:29)

This code treats a chunk as from the last linebreak before the file block to the last linebreak in the file block. Which is at the end of the day equivalent in its effect.

I did it this way because I only need to process one block at a time, plus the leftover from the previous block; to do it your way I'd have to have this block and the next block available, right?

Seems like it'd probably shake out the same, or am I missing something obvious?


From: Joe Cheng [MSFT] (Sep 27 2007, at 17:08)

Have you given up on "approximately as compact and readable" as the Ruby version? Cause seriously... WTF.


From: dbt (Sep 27 2007, at 17:32)

Why does loading quarter gig files one at a time have to sound crazy?

Personally, I'd root for a driver that lets you mmap an entire file of arbitrary size into a binary; check out phk's "varnish" web accelerator for an example of that mindset. In a 64 bit environment and even most 32 bit ones, address space is REAL cheap and easier than tens of thousands of arbitrary IO syscalls.

I don't know how hard that would be to do in erlang, though. I don't know the internals well enough (yet).


From: John Cowan (Sep 27 2007, at 21:56)

A speculation: the kernel and the disk driver are doing read-ahead for the benefit of your Ruby solution, which swamps any advantage from reading multiple blocks in random order into random processes.


From: Ion Morega (Sep 28 2007, at 01:59)

I'm not sure I'm reading this right, but it looks like you're splitting a chunk into lines, taking the last (partial) line, and then spawning a thread for the next chunk. You could squeeze out a little more parallelism by finding the last line, and spawning the next reader, *before* you split the current chunk into lines.

Anyway, it's really entertaining to read this series, keep up the good work and good luck!


From: Lars Marius Garshol (Sep 28 2007, at 03:53)

What I find very surprising is that nobody seems to have brought up Google's MapReduce yet. This whole idea of parallelizing traversal of a log file for processing is *exactly* what MapReduce is made for. Google even solves the problem of parallelized IO by having the input file stored in distributed form in the file system to begin with.

The MapReduce paper gives an excellent walkthrough of the whole thing:

http://labs.google.com/papers/mapreduce-osdi04.pdf


From: roberthahn (Sep 28 2007, at 05:37)

Maybe I'm missing something here. All the reading I've done on Erlang suggests that it is optimized for building servers.

Instead of building a 'script' that processes logfiles whenever you feel like it, why not build a server that eats log entries as they're being generated, and process requests on the fly? For the multiple gigs of data that you have already, I would suggest designing another app to preprocess them into a state that this server can build on - yeah, it might take 7x longer than Ruby, but you'd only have to do this once.


From: Doug Cutting (Sep 28 2007, at 12:19)

Maybe I'm missing something, but the way you're doing it requires that you process blocks sequentially, no? Or else that you scan backwards in the file, which is probably not advisable. If you instead chop the file at byte boundaries and use the first-line-break-after-block-start rule, then you can process the blocks in an arbitrary order. Splitting doesn't have to look at the file, it just chops at arbitrary block boundaries, and then per-block processing is independent, parallelizable and proportional to block size (unless lines get crazy long).


September 27, 2007