-module(cascade).
-export([pop/1, chunk/0, line/0]).

-define(BUFSIZ, 32768).

%% @doc Count URIs in the log file FName and return the most popular ones.
%%
%% Opens FName, starts the chunk cascade, waits for one `done' message per
%% chunk, prints the number of unique URIs, and returns up to ten entries
%% of counter:counts/1 with the largest second element, largest first.
%%
%% Crashes with badmatch if the file cannot be opened.
pop(FName) ->
    {ok, File} = file:open(FName, read),
    Counter = counter:new(),
    Size = filelib:file_size(FName),

    %% Number of ?BUFSIZ-sized reads needed to cover the file
    %% (ceiling division; 0 for an empty file).
    Chunks = (Size + ?BUFSIZ - 1) div ?BUFSIZ,

    %% Fire off the file-chunk cascade; the first chunk has no leftover
    %% fragment from a previous one, hence the empty string.
    spawn(fun cascade:chunk/0) ! {"", File, Counter, self()},

    %% Block until every chunk process has reported in.
    %% NOTE(review): there is no timeout; if a worker dies without sending
    %% `done' this waits forever.
    wait_for_chunks(Chunks),

    %% NOTE(review): File is not closed here.  The chunk process that hits
    %% eof sends nothing back, so closing the handle at this point could
    %% race with its final read.

    %% Payload.  Presumably counter:counts/1 yields {Key, Count} tuples --
    %% the sort is on element 2.
    Sorted = lists:keysort(2, counter:counts(Counter)),
    io:format("~p unique URIs~n", [length(Sorted)]),

    %% Top ten, highest count first.  lists:sublist/2 tolerates a list
    %% shorter than ten, where nthtail with a negative index would crash.
    lists:sublist(lists:reverse(Sorted), 10).

%% Consume one `done' message from the mailbox per outstanding chunk;
%% returns `done' once the count reaches zero.
wait_for_chunks(0) ->
    done;
wait_for_chunks(Pending) ->
    receive
        done -> ok
    end,
    wait_for_chunks(Pending - 1).

%% The file-chunk cascade.
%%
%% Waits for a single {Previous, File, Counter, Source} job, reads up to
%% ?BUFSIZ from File and hands the data to split_chunk/5.  At end of file
%% it simply returns `done'.
%% NOTE(review): at eof nothing is sent to Source and Previous is discarded.
chunk() ->
    {Previous, File, Counter, Source} =
        receive
            {_, _, _, _} = Job -> Job
        end,
    case file:read(File, ?BUFSIZ) of
        eof ->
            done;
        {ok, Chunk} ->
            split_chunk(Previous, Chunk, File, Counter, Source)
    end.

%% Process one chunk of the file.
%%
%% Previous is the unterminated line fragment left over from the prior
%% chunk; it is glued onto the front of Chunk before splitting on \n.
%% Complete lines are each handed to their own cascade:line/0 process, the
%% new trailing fragment is passed on to the process for the next chunk,
%% and `done' is sent to Source once every line process has reported back.
split_chunk(Previous, Chunk, File, Counter, Source) ->
    {Lines, Trailer} = split_lines(Previous ++ Chunk),

    %% a new process for the next chunk, started first so that reading
    %% overlaps with the line processing below
    spawn(fun cascade:chunk/0) ! {Trailer, File, Counter, Source},

    %% a new process for each complete line in this chunk
    Self = self(),
    lists:foreach(fun(L) ->
                          spawn(fun cascade:line/0) ! {L, Counter, Self}
                  end, Lines),

    %% wait for all the line processing to be done
    wait_for_lines(length(Lines)),

    %% let the mainline know this chunk is done
    Source ! done.

%% Split Text into {CompleteLines, TrailingFragment}.
%%
%% string:tokens/2 discards the separators, so the token list alone cannot
%% tell whether the last token was terminated by a newline.  Only when
%% Text does NOT end in \n is the last token an incomplete line that must
%% be carried over; otherwise every token is a complete line and the
%% fragment is empty.
split_lines([]) ->
    {[], ""};
split_lines(Text) ->
    Pieces = string:tokens(Text, "\n"),
    case lists:last(Text) of
        $\n ->
            {Pieces, ""};
        _ ->
            %% Text ends in a non-newline character, so Pieces is
            %% non-empty and its last element is the fragment.
            {Lines, [Trailer]} = lists:split(length(Pieces) - 1, Pieces),
            {Lines, Trailer}
    end.

%% Consume one `done' message from the mailbox per outstanding line
%% process; returns `done' once the count reaches zero.
wait_for_lines(0) ->
    done;
wait_for_lines(Pending) ->
    receive
        done -> ok
    end,
    wait_for_lines(Pending - 1).

%% The per-line cascade.
%%
%% Waits for a single {Line, Counter, Chunker} job, counts the line's URI
%% if it qualifies, and then always sends `done' to Chunker so the chunk
%% processor's bookkeeping stays in step -- including for lines that are
%% too short to contain a URI field.
line() ->
    receive
        {Line, Counter, Chunker} ->
            count_uri(string:tokens(Line, " "), Counter),

            %% let the chunk processor know this line is done
            Chunker ! done
    end.

%% Count the URI (7th space-separated field) when it starts with
%% "/ongoing/When" and the remainder contains no '.'.  Anything else --
%% a different URI or a line with fewer than seven fields -- is skipped
%% rather than crashing the line process.
count_uri([_, _, _, _, _, _, "/ongoing/When" ++ Rest | _], Counter) ->
    case lists:member($., Rest) of
        %% counting is done in another process
        false -> counter:incr(Rest, Counter);
        true -> pass
    end;
count_uri(_Fields, _Counter) ->
    skip.