-module(cascade).
-export([pop/1, chunk/0, line/0]).

-define(BUFSIZ, 32768).

%% @doc Read the file FName through a cascade of processes and return the
%% highest-ranked entries of the resulting counter.
%%
%% The file is consumed in ?BUFSIZ-sized chunks. Each chunk is handled by its
%% own process (see chunk/0), which in turn hands every complete line to a
%% line/0 process. Once all chunk processes have reported back, the counts
%% held by the `counter' module are sorted on their second tuple element and
%% at most the ten largest are returned, largest first. The number of unique
%% entries is printed as a side effect.
%%
%% Crashes with a badmatch if the file cannot be opened.
pop(FName) ->
    {ok, File} = file:open(FName, [read]),
    Counter = counter:new(),
    Chunks = chunk_count(filelib:file_size(FName)),
    %% Fire off the file-chunk cascade
    spawn(fun cascade:chunk/0) ! {"", File, Counter, self()},
    %% Stop till all the chunk processes are done: one `done' per chunk read,
    %% plus one from the process that hits end-of-file and flushes the final
    %% line.
    wait_for_chunks(Chunks + 1),
    %% Nobody reads from the file any more, so release the handle.
    ok = file:close(File),
    %% payload
    Sorted = lists:keysort(2, counter:counts(Counter)),
    io:format("~p unique URIs~n", [length(Sorted)]),
    %% sublist/2 tolerates lists shorter than ten elements.
    lists:sublist(lists:reverse(Sorted), 10).

%% Number of ?BUFSIZ-sized reads needed to cover Size bytes (ceiling division).
chunk_count(Size) ->
    (Size + ?BUFSIZ - 1) div ?BUFSIZ.

%% Block until Remaining `done' messages have been received.
%% NOTE: there is no timeout; a chunk process that dies without reporting
%% leaves the caller waiting forever.
wait_for_chunks(0) ->
    done;
wait_for_chunks(Remaining) ->
    receive
        done -> wait_for_chunks(Remaining - 1)
    end.

%% The file-chunk cascade.
%%
%% Waits for a single {Previous, File, Counter, Source} message, where
%% Previous is the unterminated line fragment left over from the prior chunk
%% and Source is the process to notify with `done'. Reads one chunk and
%% processes it; at end-of-file the leftover fragment is the last line of the
%% file, so it is processed before reporting.
chunk() ->
    receive
        {Previous, File, Counter, Source} ->
            %% read one chunk of the file
            case file:read(File, ?BUFSIZ) of
                {ok, Chunk} ->
                    split_chunk(Previous, Chunk, File, Counter, Source);
                eof ->
                    %% Previous holds no newline, so this yields [] for an
                    %% empty fragment and [Previous] otherwise.
                    process_lines(string:tokens(Previous, "\n"), Counter),
                    Source ! done
            end
    end.

%% mostly to avoid excessive indentation
%%
%% Glues the fragment from the prior chunk onto the front of this chunk,
%% separates the complete lines from the new trailing fragment, starts the
%% process for the next chunk, processes the complete lines and finally tells
%% Source that this chunk is done.
split_chunk(Previous, Chunk, File, Counter, Source) ->
    {Lines, Trailer} = split_trailer(Previous ++ Chunk),
    %% a new process for the next chunk; started before the lines are
    %% processed so reading can proceed while they are handled
    spawn(fun cascade:chunk/0) ! {Trailer, File, Counter, Source},
    process_lines(Lines, Counter),
    %% let the mainline know this chunk is done
    Source ! done.

%% Split Text into {CompleteLines, Trailer}. Trailer is whatever follows the
%% last newline: "" when Text ends with a newline, all of Text when it
%% contains none. Empty lines are dropped, as string:tokens/2 does.
split_trailer(Text) ->
    {RevTrailer, RevComplete} =
        lists:splitwith(fun(C) -> C =/= $\n end, lists:reverse(Text)),
    {string:tokens(lists:reverse(RevComplete), "\n"), lists:reverse(RevTrailer)}.

%% Start one line/0 process per element of Lines and wait until each of them
%% has answered with `done'.
process_lines(Lines, Counter) ->
    Self = self(),
    lists:foreach(
        fun(L) -> spawn(fun cascade:line/0) ! {L, Counter, Self} end,
        Lines
    ),
    %% wait for all the line processing to be done
    wait_for_lines(length(Lines)).
%% Block until Remaining `done' messages have been received.
%% NOTE: there is no timeout, so every line process must answer.
wait_for_lines(0) ->
    done;
wait_for_lines(Remaining) ->
    receive
        done -> wait_for_lines(Remaining - 1)
    end.

%% The per-line cascade.
%%
%% Waits for a single {Line, Counter, Chunker} message, counts the line's URI
%% if it qualifies, and then always sends `done' to Chunker. A line with
%% fewer than seven fields is skipped; it must still be acknowledged, because
%% the chunk processor waits for one `done' per line.
line() ->
    receive
        {Line, Counter, Chunker} ->
            count_uri(uri_field(Line), Counter),
            %% let the chunk processor know this line is done
            Chunker ! done
    end.

%% Pull out the URI field: the seventh space-separated token of Line.
%% Returns {ok, Uri}, or {error, too_few_fields} for shorter lines.
uri_field(Line) ->
    case string:tokens(Line, " ") of
        [_, _, _, _, _, _, Uri | _] -> {ok, Uri};
        _ -> {error, too_few_fields}
    end.

%% Count the URI if it starts with "/ongoing/When" and the remainder does not
%% contain a '.'; the remainder is the key passed to counter:incr/2.
%% Anything else is ignored.
count_uri({ok, "/ongoing/When" ++ Rest}, Counter) ->
    case lists:member($., Rest) of
        false -> counter:incr(Rest, Counter);
        true -> pass
    end;
count_uri(_Other, _Counter) ->
    done.