Posts for the month of October 2007

Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)

>>> Updated Nov 1:
Tim tested tbray5.erl on a T5120, for his 971,538,252-byte, 4,625,236-line log file, and got:

real    0m20.74s
user    3m51.33s
sys     0m8.00s

The result was about what I had guessed, since the elapsed time of my code was 3 times that of Anders' on my machine. I'm glad that Erlang performs consistently across different machines/OSes.

My code is not the fastest. I did not apply Boyer-Moore searching, so scan_chunk_1/4 has to test/skip the binary byte by byte when there is no exact match. Anyway, this code shows how to handle binaries efficiently, and demonstrates the performance of traversing a binary byte by byte (not so bad now, right?). It's also what I want: a balance between simplicity, readability and speed.
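
For the curious, here is a rough sketch of the Boyer-Moore(-Horspool) idea, which is not part of my solution (module and function names are made up, and only the simplified bad-character rule is shown): precompute, for every byte value, how far the search window may jump when the window's last byte mismatches, then slide the pattern over the binary in jumps instead of one-byte steps.

-module(bmh_sketch).
-export([find/2]).

%% find(Bin, Pat) -> {match, Offset} | nomatch
%% e.g. find(Chunk, <<"GET /ongoing/When/">>)
find(Bin, Pat) when is_binary(Bin), is_binary(Pat), size(Pat) > 0 ->
    M = size(Pat),
    Shift = build_shift(Pat, M),
    find_1(Bin, Pat, M, Shift, 0).

find_1(Bin, Pat, M, Shift, S) when S + M =< size(Bin) ->
    case Bin of
        %% note: binding Window builds a sub-binary per probe; a tuned
        %% version would compare the pattern bytes in place instead
        <<_:S/binary,Window:M/binary,_/binary>> when Window =:= Pat ->
            {match, S};
        _ ->
            Last = S + M - 1,
            <<_:Last/binary,C,_/binary>> = Bin,
            %% jump by the precomputed shift for the window's last byte
            find_1(Bin, Pat, M, Shift, S + element(C + 1, Shift))
    end;
find_1(_, _, _, _, _) -> nomatch.

%% shift table as a 256-entry tuple: M by default, otherwise the distance
%% from a byte's last occurrence (in the first M - 1 bytes) to the pattern end
build_shift(Pat, M) ->
    build_shift_1(Pat, M, 0, erlang:make_tuple(256, M)).
build_shift_1(Pat, M, I, T) when I < M - 1 ->
    <<_:I/binary,C,_/binary>> = Pat,
    build_shift_1(Pat, M, I + 1, setelement(C + 1, T, M - 1 - I));
build_shift_1(_, _, _, T) -> T.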

Another approach, for the lazy, is some binary pattern match hacking: we can modify scan_chunk_1/4 to:

scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    Offset = 
      case Bin of
          <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 
              34;
          <<_:S/binary,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              35;
          <<_:S/binary,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              36;
          <<_:S/binary,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              37;
          <<_:S/binary,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              38;
          <<_:S/binary,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              39;
          <<_:S/binary,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              40;
          <<_:S/binary,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              41;
          <<_:S/binary,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              42;
          <<_:S/binary,_,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              43;
          _ -> undefined
      end,
    case Offset of
        %% none of the 10 tested window positions matched, so we can jump 10 bytes
        undefined -> scan_chunk_1(Bin, DataL, S + 10, Dict);
        _ ->
            case match_until_space_newline(Bin, S + Offset) of
                {true, E} ->
                    %% Key starts at yyyy: 11 bytes back from the end of "dd/"
                    Skip = S + Offset - 11, L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} -> 
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.

The elapsed time dropped to 1.424 sec immediately, vs 2.792 sec before, a speedup of about 100%, on my 4-CPU linux box.

If you are patient, you can copy-paste 100... such lines :-) (at that point, I'd rather pick Boyer-Moore), and the elapsed time will drop a little more, but not by much beyond 10 lines or so.
========

>>> Updated Oct 29:
Pihis updated his WideFinder, applied guideline II, and beat Ruby on his one-core box. He also modified Anders's code by removing all unnecessary remaining binary bindings (which cause unnecessary sub-binary splitting); then Steve tested the refined code, and got 0.567s on his famous 8-CPU linux box. Now we may have reached the real disk/IO bound. Should we try parallelized file reading? But I've grown tired of more widefinders. BTW, may we have regexp-like pattern matching at the syntax level? The End.
========

>>> Updated Oct 26:
Code cleanup
========

Binary is usually more efficient than List in Erlang.

The memory size of a Binary is 3 to 6 words plus the Data itself. The Data can be allocated/deallocated in the global heap, so it can be shared across function calls, and shared across processes during message passing (on the same node), without copying. That's why heap size affects binary performance so much.

The memory size of a List is 1 word per element plus the size of each element, and a List is always copied between function calls and on message passing.
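
A quick way to feel the difference is a throwaway module like this (the name and the 1M size are arbitrary; it times a message round-trip the same way the code later in this post does):

-module(bin_vs_list).
-export([demo/0]).

%% Send 1M bytes to an echo process and back: once as a (refc) binary,
%% which is shared on the same node, and once as a list, which is copied
%% into each process' heap.
demo() ->
    Bin  = list_to_binary(lists:duplicate(1024 * 1024, $a)),
    List = binary_to_list(Bin),
    io:format("binary: ~10.2f ms~n", [roundtrip(Bin)]),
    io:format("list:   ~10.2f ms~n", [roundtrip(List)]).

roundtrip(Data) ->
    Main = self(),
    Echo = spawn(fun () -> receive D -> Main ! D end end),
    Start = now(),
    Echo ! Data,
    receive _ -> timer:now_diff(now(), Start) / 1000 end.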

In my previous blogs about Tim's exercise, I doubted the performance of binary traversal. But with more advice and experience, it seems binary can work very efficiently, as an ideal dataset-processing structure.

But there are some guidelines for efficient binary handling in Erlang, which I learned from the exercise and from the experts, and I'll try to set them out here.

I. Don't split a binary unless the split binaries are exactly what you want

Splitting/combining binaries is expensive, so when you want to get values from a binary at some offsets:

Do

<<_:Offset/binary, C, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).

Do Not *

<<_:Offset/binary, C/binary, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).

* This may be good in R12B

And when you want to split a binary to get Head or Tail only:

Do

<<Head:Offset/binary,_/binary>> = Bin.

Do Not

{Head, _} = split_binary(Bin, Offset).

II. Calculate the final offsets first, then split the binary once you've got the exact offsets

When you traverse a binary to test the bytes/bits, calculate and collect the final offsets first; don't split the binary (bind a named Var to a sub-binary) at that time. When you've got all the exact offsets, split out what you want at the end:

Do

get_nth_word(Bin, N) ->
    Offsets = calc_word_offsets(Bin, 0, [0]),
    S = element(N, Offsets),
    E = element(N + 1, Offsets),
    L = E - S,
    <<_:S/binary,Word:L/binary,_/binary>> = Bin,
    io:format("nth Word: ~p", [Word]).

calc_word_offsets(Bin, Offset, Acc) when Offset < size(Bin) ->
    case Bin of
        <<_:Offset/binary,$ ,_/binary>> ->
            calc_word_offsets(Bin, Offset + 1, [Offset + 1 | Acc]);
        _ ->
            calc_word_offsets(Bin, Offset + 1, Acc)
    end;
calc_word_offsets(_, _, Acc) -> list_to_tuple(lists:reverse(Acc)).

Bin = <<"This is a binary test">>,
get_nth_word(Bin, 4). %  <<"binary ">>

Do Not

get_nth_word_bad(Bin, N) ->
    Words = split_words(Bin, 0, []),
    Word = element(N, Words),
    io:format("nth Word: ~p", [Word]).

split_words(Bin, Offset, Acc) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            split_words(Rest, 0, [Word | Acc]);
        <<_:Offset/binary,_,_/binary>> ->
            split_words(Bin, Offset + 1, Acc);
        _ -> list_to_tuple(lists:reverse([Bin | Acc]))
    end.

Bin = <<"This is a binary test">>,
get_nth_word_bad(Bin, 4). %  <<"binary">>

III. Use "+h Size" option or [{min_heap_size, Size}] with spawn_opt

This is very important for binary performance; it's something of a key number. With this option set properly, binaries perform very well; otherwise, much worse.
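
For the spawn_opt flavor, here is a minimal sketch (hypothetical helper; the 8192-word size is only an illustrative value to tune): the same spawn-a-worker pattern used later in this post, just with min_heap_size raised so the worker does not start from the tiny default heap and grow through many GCs.

spawn_worker_opt(Parent, F, A) ->
    spawn_opt(fun () -> Parent ! {self(), F(A)} end,
              [monitor, {min_heap_size, 8192}]).

With the monitor option it returns {Pid, Ref}, so it can drop in for erlang:spawn_monitor/1.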

IV. Others

  • Don't forget to compile to native by adding "-compile([native])." in your code.
  • Maybe "+A Size" to set the number of threads in async thread pool also helps a bit when do IO.

Practice

Steve and Anders have pushed widefinder in Erlang to 1.1 sec on an 8-CPU linux box. Their code took about 1.9 sec on my 4-CPU box. So, how about a concise version?

Following the guidelines above, and based on my previous code, Per's dict code and Luke's spawn_worker, I rewrote a concise and straightforward tbray5.erl (less than 80 LOC), without any extra C-driven modules, for Tim's exercise, and got about 2.972 sec for the 1-million-line log file and 15.695 sec for 5 million lines, vs non-parallelized Ruby's 4.161 sec and 20.768 sec, on my 2.80GHz 4-CPU Intel Xeon linux box:

BTW, using ets instead of dict performs almost the same (a sketch of the ets variant follows).
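
If you want to try the ets flavor, a minimal sketch (hypothetical helper names, not the exact code I benchmarked): each worker owns a private table; since ets:update_counter/3 fails for a missing key, the first hit of a key falls back to an insert.

new_counts() -> ets:new(counts, [set, private]).

count(Tab, Key) ->
    case catch ets:update_counter(Tab, Key, 1) of
        {'EXIT', _} -> ets:insert(Tab, {Key, 1});   %% first occurrence of Key
        _ -> ok
    end.

%% fold a worker's table into a dict, so the merge step stays unchanged
to_dict(Tab) ->
    ets:foldl(fun ({K, V}, D) -> dict:update_counter(K, V, D) end,
              dict:new(), Tab).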

$ erlc -smp tbray5.erl
$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m2.972s
user    0m9.685s
sys     0m0.748s

$ time erl +h 8192 -smp -noshell -run tbray5 start o5000k.ap -s erlang halt

real    0m15.695s
user    0m53.551s
sys     0m4.268s

On 2.0GHz 2-core MacBook (Ruby code took 2.447 sec):

$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m3.034s
user    0m4.853s
sys     0m0.872s

The Code: tbray5.erl

-module(tbray5).
-compile([native]).
-export([start/1]).

-define(BUFFER_SIZE, (1024 * 10000)).

start(FileName) ->
    Dicts = [wait_result(Worker) || Worker <- read_file(FileName)],
    print_result(merge_dicts(Dicts)).
              
read_file(FileName) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, 0, []).            
read_file_1(File, Offset, Workers) ->
    case file:pread(File, Offset, ?BUFFER_SIZE) of
        eof ->
            file:close(File),
            Workers;
        {ok, Bin} ->
            DataL = split_on_last_newline(Bin),
            Worker = spawn_worker(self(), fun scan_chunk/1, {Bin, DataL}),
            read_file_1(File, Offset + DataL + 1, [Worker | Workers])
    end.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, S) when S > 0 ->
    case Bin of
        <<_:S/binary,$\n,_/binary>> -> S;
        _ -> split_on_last_newline_1(Bin, S - 1)
    end;
split_on_last_newline_1(_, S) -> S.

scan_chunk({Bin, DataL}) -> scan_chunk_1(Bin, DataL, 0, dict:new()).
%% 34 = length("GET /ongoing/When/nnnx/yyyy/mm/dd/"), the fewest bytes a match needs
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    case Bin of
        <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
            case match_until_space_newline(Bin, S + 34) of
                {true, E} ->
                    %% S + 23 skips "GET /ongoing/When/nnnx/", so Key starts at yyyy
                    Skip = S + 23, L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end;
        _ -> scan_chunk_1(Bin, DataL, S + 1, Dict)
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.

match_until_space_newline(Bin, S) when S < size(Bin) ->
    case Bin of
        <<_:S/binary,10,_/binary>> -> {false, S};
        <<_:S/binary,$.,_/binary>> -> {false, S};
        <<_:S/binary,_,$ ,_/binary>> -> {true, S + 1};
        _ -> match_until_space_newline(Bin, S + 1)
    end;
match_until_space_newline(_, S) -> {false, S}.
    
spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.
    
merge_dicts([D1,D2|Rest]) ->
    merge_dicts([dict:merge(fun(_, V1, V2) -> V1 + V2 end, D1, D2) | Rest]);
merge_dicts([D]) -> D.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

Tim's Erlang Exercise - Summary

>>> Updated Nov 1:
Tim tested my last attempt, tbray5.erl, which was described in Learning Coding Binary (Was Tim's Erlang Exercise - Round VI), and got, for his 971,538,252-byte, 4,625,236-line log file:

real    0m20.74s
user    3m51.33s
sys     0m8.00s

It's not the fastest, since I did not apply Boyer-Moore searching. But it's what I want: a balance between simplicity, readability and speed.
========

>>> Updated Oct 24:
The Erlang code can be faster than un-parallelized Ruby; a new version runs in 2.97 sec on the 4-CPU box: Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
========

>>> Updated Oct 22:
At Bjorn's suggestion, I added the "+h 4096" option to 'erl', which "sets the default heap size of processes to the size 4096"; the elapsed time dropped from 7.7s to 5.5s immediately:

time erl +h 4096 -smp -noshell -run tbray4 start o1000k.ap 10 -s erlang halt

The +h option seems to affect the binary version a lot, but the list version very little. This may be because a list is always copied, while a binary may be left in the process' heap and passed by pointer?

The default heap size for each process is 233 words; this number may be suitable when there are a lot of concurrent processes, to avoid exhausting memory. But for some parallelization tasks, with fewer processes, or with enough memory, the heap size can be adjusted a bit larger.

Anyway, I think Erlang/OTP is already very good at concurrency, but there may still be room to optimize for parallelization.

BTW, with the +h option and some tips for efficient binary handling, the most concise binary version, tbray5.erl, can now run in about 3 sec.
========

This is a performance summary of Tim's Erlang exercise on large dataset processing; I only compare the results on a 4-CPU Intel Xeon 2.80GHz linux box:

Log File          Time    Erlang      Erlang        Erlang (Many Proc,   Ruby
                          (1 Proc)    (Many Proc)   +h 4096)
1 million lines   real    22.088s     7.700s        5.475s               4.161s
                  user    21.161s     25.750s       18.785s              3.592s
                  sys     0.924s      3.552s        1.352s               0.568s
5 million lines   real    195.570s    37.669s       27.911s              20.768s
                  user    192.496s    126.296s      98.162s              19.009s
                  sys     3.480s      17.789s       7.344s               3.116s

Notice:

  • The Erlang code is tbray4.erl, which can be found in a previous blog entry; the Ruby code is from Tim's blog.
  • The Erlang code is parallelized, the Ruby code is not.
  • The Erlang version is a lot more code, but parallelization is not a free lunch.
  • On an 8-CPU box, the Erlang version should exceed, or come near to, the non-parallelized Ruby version*.
  • Although we are talking about the multiple-core era, I'm not sure whether disk/IO is ready for it.

* Per Steve's testing.

CN Erlounge II

Last weekend, in Zhuhai, China, the two-day CN Erlounge II took place, discussing topics on Erlang and FP:

  • Why I choose Erlang? - by xushiwei
  • Erlang emulator implementation - by mryufeng
  • Port &amp; driver - by codeplayer
  • Py 2 Erl - by Zoom.Quiet
  • STM: Lock free concurrent Overview - by Albert Lee
  • mnesia - by mryufeng

And a new logo of "ERL.CHINA" was born as:

[image: erlchina.jpeg - the new ERL.CHINA logo]

(Picture source: http://www.haokanbu.com/story/1104/)

More information about CN Erlounge II, and some pictures, can be found online.

I couldn't fit this meeting into my schedule; maybe next time.

Learning Coding Parallelization (Was Tim's Erlang Exercise - Round V)

>>> Updated Oct 20:
After cleaning up my 4-CPU linux box, the result for the 1M-record file is about 7.7 sec, and for the 5M-record file about 38 sec.
========

>>> Updated Oct 16:
After testing my code on different machines, I found that disk/IO performance varies; for some very large files, reading the file in parallel may increase the elapsed time (typically on a non-server machine not equipped with fast disk/IO). So I added another version, tbray4b.erl; in this version only the file reading is not parallelized, all other code is the same. If you'd like to test on your machine, please try both.
========

Well, I think I've learned a lot from doing Tim's exercise, not only about List vs Binary in Erlang, but also about computing in parallel. Coding concurrency is fairly easy in Erlang, but coding parallelization is not only about the language; it's a real-world question.

I wrote tbray3.erl in The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV) and got my best result so far on my 2-core MacBook. But things are always a bit more complex. As Steve pointed out in a comment, when he tried tbray3.erl on his 8-core linux box:

"I ran it in a loop 10 times, and the best time I saw was 13.872 sec, and user/CPU time was only 16.150 sec, so it’s apparently not using the multiple cores very well."

I also encountered this issue on my 4-CPU Intel Xeon 2.80GHz debian box; it ran even worse (8.420s) than my 2-core MacBook (4.483s).

I thought about my code for a while, and found that it seems to spawn too many processes for scan_chunk. Since scan_chunk's performance has improved a lot, each process finishes its task very quickly, too quickly relative to the file reading, so the additional CPUs have little chance to join the game; the 'read'-'spawn scan process' cycle is actually almost sequential now, with very few scanning processes alive simultaneously. I think I've finally met the file reading bound.

But wait: as I claimed before, reading a file into memory is very fast in Erlang; for a 200M log file it takes less than 800ms. The elapsed time for tbray3.erl is about 4900ms, far from 800ms, so why do I say file reading is the bound now?
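
If you want to check the raw read speed on your own box, here is a one-off sketch in the same style as the code below (raw mode, 1 MB reads; function names are made up):

time_read(FileName) ->
    Start = now(),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_all(File),
    io:format("read time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]).

read_all(File) ->
    case file:read(File, 1024 * 1024) of
        {ok, _Bin} -> read_all(File);
        eof -> file:close(File)
    end.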

The problem here is: since I doubted the performance of traversing a binary byte by byte, I chose to convert the binary to a list to scan the words. Per my testing results, a list beats a binary when it is not too long, in many cases not longer than several KBytes. And, to keep the code clear and readable, I also chose to split the big binary while reading the file, so I have to read the file in pieces of no more than a few KBytes. For a very big file, the reading procedure is thus broken into tens of thousands of steps, which in the end makes the whole file reading time rather long. That's bad.

So I decided to write another version, which reads the file in parallel (Round III), splits each chunk on its last newline (Round II), and scans the words using pattern matching (Round IV). And yes, I'll use binary instead of list this time, trying to offset the weaker performance of binary traversal with parallelism on multiple cores.

The result is interesting: it's the first time I achieved around 10 sec on my 2-core MacBook using binary matching only, and it's also the first time, on my humble 4-CPU Intel Xeon 2.80GHz debian box, that I got a better result (7.700 sec) than on my MacBook.

>>> Updated Oct 15:
Steve ran the code on his 8-core 2.33 GHz Intel Xeon Linux box; the best time was 4.920 sec:

"the best time I saw for your newest version was 4.920 sec on my 8-core Linux box. Fast! However, user time was only 14.751 sec, so I’m not sure it’s using all the cores that well. Perhaps you’re getting down to where I/O is becoming a more significant factor."

Please see Steve's One More Erlang Wide Finder and his widefinder attempts.
========

Result on 2.0GHz 2-core MacBook:

$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt
8900    : 2006/09/29/Dynamic-IDE
2000    : 2006/07/28/Open-Data
1300    : 2003/07/25/NotGaming
800     : 2003/10/16/Debbie
800     : 2003/09/18/NXML
800     : 2006/01/31/Data-Protection
700     : 2003/06/23/SamsPie
600     : 2006/09/11/Making-Markup
600     : 2003/02/04/Construction
600     : 2005/11/03/Cars-and-Office-Suites
Time:   10527.50 ms

real    0m10.910s
user    0m13.927s
sys     0m6.413s

Result on the 4-CPU Intel Xeon 2.80GHz debian box:

# When process number is set to 20:
time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt 

real    0m7.700s
user    0m25.750s
sys     0m3.552s

# When process number is set to 1:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 1 -s erlang halt

real    0m22.035s
user    0m21.525s
sys     0m0.512s

# On a 940M 5 million lines log file:
time erl -smp -noshell -run tbray4_bin start o5000k.ap 100 -s erlang halt
44500   : 2006/09/29/Dynamic-IDE
10000   : 2006/07/28/Open-Data
6500    : 2003/07/25/NotGaming
4000    : 2003/10/16/Debbie
4000    : 2003/09/18/NXML
4000    : 2006/01/31/Data-Protection
3500    : 2003/06/23/SamsPie
3000    : 2006/09/11/Making-Markup
3000    : 2003/02/04/Construction
3000    : 2005/11/03/Cars-and-Office-Suites
Time:   37512.76 ms

real    0m37.669s
user    2m6.296s
sys     0m17.789s

On the 4-CPU linux box, comparing the elapsed times for ProcNum = 20 and ProcNum = 1, the parallelized run took only 35% as long as the un-parallelized one, a speedup of about 185%. The ratio was almost the same as in my pread_file.erl testing on the same machine.

It's actually a combination of the code from my four previous blogs. Although the performance is not as good as tbray3.erl on my MacBook, I'm happy that this version is fully parallelized, from reading the file to scanning the words; it should scale better than all my previous versions.

The code: tbray4.erl

-module(tbray4).

-compile([native]).

-export([start/1,
         start/2]).

-include_lib("kernel/include/file.hrl").

start([FileName, ProcNum]) when is_list(ProcNum) -> 
    start(FileName, list_to_integer(ProcNum)).
start(FileName, ProcNum) ->
    Start = now(),

    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),

    pread_file(FileName, ProcNum, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.

pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2; %% last chunk: read all bytes left
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [raw, binary]),
                   {ok, Bin} = file:pread(File, ChunkSize * I, Length),
                   file:close(File),
                   {Data, Tail} = split_on_last_newline(Bin),
                   Collector ! {seq, I, Data, Tail}
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} = 
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.
    
count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ -> 
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset =< size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        _ ->
            match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.

>>> Updated Oct 16:
After testing my code on different machines, I found that disk/IO performance varies; for some very large files, reading the file in parallel may increase the elapsed time (typically on a non-server machine not equipped with fast disk/IO). So I wrote another version: tbray4b.erl; in this version only the file reading is not parallelized, all other code is the same. Here's a result for this version on a 940M file with 5 million lines, with ProcNum set to 200 and 400:

# On 2-core MacBook:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 200 -s erlang halt

real    0m50.498s
user    0m49.746s
sys     0m11.979s

# On 4-cpu linux box:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 400 -s erlang halt

real    1m2.136s
user    1m59.907s
sys     0m7.960s

The code: tbray4b.erl

-module(tbray4b).

-compile([native]).

-export([start/1,
         start/2]).

-include_lib("kernel/include/file.hrl").

start([FileName, ProcNum]) when is_list(ProcNum) -> 
    start(FileName, list_to_integer(ProcNum)).
start(FileName, ProcNum) ->
    Start = now(),

    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),

    read_file(FileName, ProcNum, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} -> 
            spawn(fun () ->
                          {Data, Tail} = split_on_last_newline(Bin),
                          Collector ! {seq, I, Data, Tail}
                  end),
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} = 
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.
    
count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ -> 
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset =< size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        _ ->
            match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.


========

The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)

Playing with Tim's Erlang Exercise is so much fun.

I've been coding in Erlang for about 6 months as a newbie; in most cases I parse strings (or lists, whatever) without needing regular expressions, since Erlang's pattern matching can usually solve most problems straightforwardly.

Tim's log file is also a good example for applying pattern matching the Erlang way. It's a continuous stream of data; after splitting it into line-bounded chunks for parallelization purposes, we can match the whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on a chunk, with no need to split into lines any more.

Thus came my third solution, which matches the whole

{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } 

pattern directly, using:

"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]

and then fetches

[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])

as the matched key, with no need to split the chunk into lines.

But yes, we still need to split each chunk on its last newline to keep the parallelized result exactly accurate.

On my 2-core 2 GHz MacBook, the best time I've got is 4.483 sec:

# smp enabled:
$ erlc -smp tbray3.erl
$ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt
8900    : <<"2006/09/29/Dynamic-IDE">>
2000    : <<"2006/07/28/Open-Data">>
1300    : <<"2003/07/25/NotGaming">>
800     : <<"2003/10/16/Debbie">>
800     : <<"2003/09/18/NXML">>
800     : <<"2006/01/31/Data-Protection">>
700     : <<"2003/06/23/SamsPie">>
600     : <<"2006/09/11/Making-Markup">>
600     : <<"2003/02/04/Construction">>
600     : <<"2005/11/03/Cars-and-Office-Suites">>
Time:    4142.83 ms

real    0m4.483s
user    0m5.804s
sys     0m0.615s

# no-smp:
$ erlc tbray3.erl
$ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt

real    0m7.050s
user    0m6.183s
sys     0m0.644s

The smp-enabled result is a speedup of about 57%.

On the 2.80GHz 4-CPU Xeon debian box that I mentioned in a previous blog, the best result is:

real    0m8.420s
user    0m11.637s
sys     0m0.452s

And I've noticed that adjusting BUFFER_SIZE can balance the time consumed by the parallelized and un-parallelized parts. That is, if the number of cores increases, we can also increase BUFFER_SIZE a bit, so the number of chunks decreases (less un-parallelized split_on_last_newline/1 and file reading), but with heavier work for the parallelized binary_to_list/1 and scan_chunk/1 on longer lists.

The best BUFFER_SIZE on my computer is 4096 * 5 bytes, with which the un-parallelized split_on_last_newline/1 took only about 0.226s in that case.

The code:

-module(tbray3).

-compile([native]).

-export([start/1]).
  
%% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
-define(BUFFER_SIZE, (4096 * 5)). 

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
 
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} -> 
            {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        []         -> {lists:reverse(List), []};
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest]   -> split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} -> 
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.
    
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_chunk(List) -> scan_chunk_1(List, dict:new()).
scan_chunk_1(List, Dict) ->
    case List of
        [] -> Dict;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
            case match_until_space_newline(Rest, []) of
                {Rest1, []} -> 
                    scan_chunk_1(Rest1, Dict);
                {Rest1, Word} -> 
                    Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]),
                    scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
            end;
        [_|Rest] -> scan_chunk_1(Rest, Dict)
    end.
    
match_until_space_newline(List, Word) ->
    case List of
        []     -> {[],   []};
        [10|_] -> {List, []};
        [$.|_] -> {List, []};
        [$ |_] -> {List, lists:reverse(Word)};
        [C|Rest] -> match_until_space_newline(Rest, [C | Word])
    end.

I also wrote a corresponding binary version, which is 2-3 times slower than the above list version on my machine, but the result may vary depending on how Erlang/OTP was compiled for your operating system. I will test it again when Erlang/OTP R12B is released, which is claimed to be optimized for binary match performance.

The code:

-module(tbray3_bin).

-compile([native]).

-export([start/1]).

-define(BUFFER_SIZE, (4096 * 10000)).

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    {ok, File} = file:open(FileName, [raw, binary]),    
    read_file(File, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.
    
collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
          
read_file(File, Collector) -> read_file_1(File, <<>>, 0, Collector).            
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof -> 
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} -> 
            {Data, NextTail} = split_on_last_newline(Bin),
            spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ -> 
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        _ ->
            match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.

Take a Break as Trader

Updated Dec 7: Well, I bought some stocks again today. Let's see what year 2008 brings.

I sold all my holdings on the Stock Exchange of China this morning, and will take a break until the end of this year. Wow, what a year.

Reading File in Parallel in Erlang (Was Tim Bray's Erlang Exercise - Round III)

My first solution for Tim's exercise tried to read the file in parallel, but I just realized, by reading the file module's source code, that file:open(FileName, Options) returns a process rather than an IO device. Well, this means a lot:

  • It's a process, so when you request more data from it, you actually send it a message. Since you only send two integers, the offset and the length, sending the message should be very fast. But then this process (File) will wait to receive the data from disk/IO. For one process, the receiving is sequential rather than parallelized. (See the shell check after this list.)
  • If we look at Erlang processes as ActiveObjects, which send/receive messages/data asynchronously, then since receiving is sequential within one process, requesting/waiting on one process (or object) is mostly safe for parallelized programming; you usually do not need to worry about lock/unlock etc. (except for the outside world).
  • We can open a lot of File processes to read data in parallel; the bound is the disk/IO and the OS's resource limits.
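
You can check the first point in the shell (the pid value below is illustrative; note that with the raw option you get a plain descriptor record instead of a process):

1> {ok, F} = file:open("o100k.ap", [read, binary]).
{ok,<0.37.0>}
2> is_pid(F).
true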

I wrote some code to test file reading in parallel. Setting aside disk-cache effects, on my 2-core MacBook, reading a file with two processes can be nearly twice as fast as with one process.

The code:


-module(file_pread).

-compile([native]).

-export([start/2]).

-include_lib("kernel/include/file.hrl").

start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].


start(FileName, ProcNum, Fun) ->
    Start = now(),  

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    Fun(FileName, ProcNum, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) 
    end.

collect_loop(Main) -> collect_loop_1(Main, undefined, 0).
collect_loop_1(Main, ChunkNum, ChunkNum) -> 
    Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} ->
            collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} -> 
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.


pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   %% if it's the last chunk, read all bytes left,
                   %% which will not exceed ChunkSize * 2
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2;
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [raw, binary]),
                   {ok, _Bin} = file:pread(File, ChunkSize * I, Length),
                   Collector ! {seq, I},
                   file:close(File)
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

pread_file/3 is parallelized; it opens a new File process for each reading process instead of sharing one opened File process across all of them. read_file/3 is non-parallelized.

To evaluate (run each test at least twice, to even out disk/IO caching):

$ erlc -smp file_pread.erl
$ erl -smp

1> file_pread:start("o100k.ap", 2).
time:     691.72 ms
time:      44.37 ms
[ok,ok]
2> file_pread:start("o100k.ap", 2).
time:      74.50 ms
time:      43.59 ms
[ok,ok]
3> file_pread:start("o1000k.ap", 2).
time:    1717.68 ms
time:     408.48 ms
[ok,ok]
4> file_pread:start("o1000k.ap", 2).
time:     766.00 ms
time:     393.71 ms
[ok,ok]
5> 

Let's compare the results for each file (picking the second test result of each); the speedups are:

  • o100k.ap, 20M: 74.50 / 43.59 - 1 = 70%
  • o1000k.ap, 200M: 766.00 / 393.71 - 1 = 95%

On another 4-CPU debian machine, with 4 processes, the best result I got was:

4> file_pread:start("o1000k.ap", 4).
time:     768.59 ms
time:     258.57 ms
[ok, ok]
5>

The parallelized reading speedup is 768.59 / 258.57 - 1 = 197%.

I've updated my first solution according to this testing, opening a new File process for each reading process instead of sharing the same File process. Of course, there are still the issues that I pointed out in Tim Bray's Erlang Exercise on Large Dataset Processing - Round II.

Although the above result can also be achieved in other languages, I find that coding parallelization in Erlang is a pleasure.

Tim Bray's Erlang Exercise on Large Dataset Processing - Round II

Updated Oct 09: Added more benchmark results under linux on other machines.
Updated Oct 07: More concise code.
Updated Oct 06: Fixed bugs: 1. Match "GET /ongoing/When/" instead of "/ongoing/When/"; 2. split_on_last_newline should not reverse Tail.

Back from a short vacation, sitting down in front of my computer, I'm thinking about Tim Bray's exercise again.

As I realized, the most expensive procedure is splitting the dataset into lines. To get the multiple-core benefit, we should parallelize this procedure, rather than only the file reading or the matching.

In my previous solution, there were at least two issues:

  • Since file reading is fast in Erlang, parallelizing the file reading does not help much.
  • The buffered_read can actually be merged with the buffered file reading.

And Per's solution parallelizes the process_match procedure only, based on a really fast divide_to_lines, but with hacked binary-matching syntax.

After a couple of hours' work, I finally got the second version of tbray.erl (with some code from Per's solution).

  • Read the file into small pieces of binary (about 4096 bytes per chunk), then convert each to a list.
  • Prepend the previous tail to each chunk, search the chunk from its end for the last newline mark, and split the chunk into a line-bounded data part and a tail part for the next chunk.
  • The above steps are difficult to parallelize. If we tried, there would be about 30 more LOC, and it would not be so readable.
  • Spawn a new process at once to split the line-bounded chunk into lines, run the match and update a dict.
  • Thus we can go on reading the file without stopping.
  • A collect_loop receives the dicts from each process and merges them.

What I like about this version is that it scales almost linearly on multiple cores! On my 2.0GHz 2-core MacBook, it took about 13.522 seconds without smp and 7.624 seconds with smp enabled (for a 200M data file, with about 50,000 processes spawned). The 2-core smp result is about 77% faster than the non-smp result. I'm not sure how it will do on an 8-core computer, but we'll eventually hit the limit set by the un-parallelized procedures.

The Erlang time results:

$ erlc tbray.erl
$ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null

real    0m13.522s
user    0m12.265s
sys     0m1.199s

$ erlc -smp tbray.erl
$ time erl -smp +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null

real    0m7.624s
user    0m13.302s
sys     0m1.602s

# For 5 million lines, 958.4M size:
$ time erl -smp +P 300000 -noshell -run tbray start o5000k.ap -s erlang halt > /dev/null

real    0m37.085s
user    1m5.605s
sys     0m7.554s

And the original Tim's Ruby version:

$ time ruby tbray.rb o1000k.ap > /dev/null

real    0m2.447s
user    0m2.123s
sys     0m0.306s

# For 5 million lines, 958.4M size:
$ time ruby tbray.rb o5000k.ap > /dev/null

real    0m12.115s
user    0m10.494s
sys     0m1.473s

Erlang time result on 2-core 1.86GHz CPU RedHat linux box, with kernel:
Linux version 2.6.18-1.2798.fc6 (brewbuilder@hs20-bc2-4.build.redhat.com) (gcc version 4.1.1 20061011 (Red Hat 4.1.1-30)) #1 SMP Mon Oct 16 14:37:32 EDT 2006
is 7.7 seconds.

Erlang time result on 2.80GHz 4-cpu xeon debian box, with kernel:
Linux version 2.6.15.4-big-smp-tidy (root@test) (gcc version 4.0.3 20060128 (prerelease) (Debian 4.0.2-8)) #1 SMP Sat Feb 25 21:24:23 CST 2006

The smp result on this 4-CPU computer is questionable. It sped up only about 50% over non-smp, even worse than my 2.0GHz 2-core MacBook. I also tested the Big Bang benchmark on this machine; it also sped up less than 50%.

$ erlc tbray.erl 
$ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null 

real 0m22.279s 
user 0m21.597s 
sys  0m0.676s 

$ erlc -smp tbray.erl 
$ time erl -smp +S 4 +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null 

real 0m14.765s 
user 0m28.722s 
sys  0m0.840s 

Notice:

  • All tests were run several times with the better result reported, so the disk/IO cache state should be similar across runs.
  • You may need to compile tbray.erl to two different BEAMs, one for the smp version and one for the non-smp version.
  • If you'd like to process a bigger file, you can use +P ProcessNum to allow more simultaneously alive Erlang processes. For BUFFER_SIZE=4096, you can set the +P argument to FileSize / 4096, or above. From Erlang's Efficiency Guide:
    Processes
    The maximum number of simultaneously alive Erlang processes is by default 32768. This limit can be raised up to at most 268435456 processes at startup (see documentation of the system flag +P in the erl(1) documentation). The maximum limit of 268435456 processes will at least on a 32-bit architecture be impossible to reach due to memory

To evaluate with smp enabled (Erlang/OTP R11B-5 for Windows may not support smp yet):

erl -smp +P 60000
> tbray:start("o1000k.ap").

The code: (pretty formatted by ErlyBird 0.15.1)

-module(tbray_blog).

-compile([native]).

-export([start/1]).

%% The best Bin Buffer Size is 4096
-define(BUFFER_SIZE, 4096). 

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} -> 
            {Data, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_lines(Data)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        []         -> {lists:reverse(List), []};
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest]   -> split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} -> 
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.
    
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~p\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_lines(List) -> scan_lines_1(List, [], dict:new()).
scan_lines_1(List, Line, Dict) -> 
    case List of
        [] -> match_and_update_dict(lists:reverse(Line), Dict);
        [$\n|Rest] ->
            scan_lines_1(Rest, [], match_and_update_dict(lists:reverse(Line), Dict));
        [C|Rest] ->
            scan_lines_1(Rest, [C | Line], Dict)
    end.

match_and_update_dict(Line, Dict) ->
    case process_match(Line) of
        false -> Dict;
        {true, Word} -> 
            dict:update_counter(Word, 1, Dict)
    end.
    
process_match(Line) ->
    case Line of
        [] -> false;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] -> 
            case match_until_space(Rest, []) of
                [] -> false;
                Word -> {true, [Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ Word}
            end;
        [_|Rest] -> 
            process_match(Rest)
    end.
    
match_until_space(List, Word) ->
    case List of
        [] -> [];
        [$.|_] -> [];
        [$ |_] -> lists:reverse(Word);
        [C|Rest] -> match_until_space(Rest, [C | Word])
    end.

Lessons learnt:

  • Split a large binary into properly sized chunks, then convert them to lists for further processing
  • Parallelize the most expensive part (of course)
  • We need a new or more complete Efficient Erlang