Friday Oct 05, 2007

Tim Bray's Erlang Exercise on Large Dataset Processing - Round II

Updated Oct 09: Added more benchmark results on other Linux machines.
Updated Oct 07: More concise code.
Updated Oct 06: Fixed bugs: 1. Match "GET /ongoing/When/" instead of "/ongoing/When/"; 2. split_on_last_newline should not reverse Tail.

Back from a short vacation and sitting down in front of my computer, I'm thinking about Tim Bray's exercise again.

As I've realized, the most expensive step is splitting the dataset into lines. To get the multi-core benefit, we should parallelize this step, rather than only the reading of the file into binaries or the matching.

My previous solution had at least two issues:

  • File reading is already fast in Erlang, so parallelizing it does not help much.
  • The buffered_read step can actually be merged into the buffered file reading.

Per's solution, on the other hand, parallelizes only the process_match step, building on a really fast divide_to_lines but relying on hacked binary matching syntax.

After a couple of hours of work, I finally have the second version of tbray.erl (with some code from Per's solution):

  • Read the file in small binary pieces (about 4096 bytes per chunk), then convert each piece to a list.
  • Prepend the previous tail to each chunk, search the chunk backwards from its end for the last newline, and split it into a line-bounded data part and a tail part for the next chunk.
  • The above steps are difficult to parallelize. Trying would add about 30 more LOC and make the code less readable.
  • Immediately spawn a new process to split the line-bounded chunk into lines, run the match, and update a dict.
  • Thus the file reading can continue without stopping.
  • A collect_loop receives the dicts from each process and merges them.
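The tail carry-over in the second step can be tried in isolation. This is a minimal sketch, not the author's code: the module chunk_lines_sketch and the function split_chunks/1 are hypothetical names, the chunks are given as a ready-made list of strings instead of being read from a file, and a chunk that yields no complete line is simply skipped.

```erlang
-module(chunk_lines_sketch).
-export([split_chunks/1]).

%% Hypothetical helper: given string chunks (as if read in fixed-size
%% pieces), return the line-bounded data parts in order. Each chunk's
%% unfinished last line is carried over to the next chunk, and the final
%% tail (text after the last newline) is returned if it is non-empty.
split_chunks(Chunks) -> split_chunks(Chunks, [], []).

split_chunks([], [], Acc) -> lists:reverse(Acc);
split_chunks([], Tail, Acc) -> lists:reverse([Tail | Acc]);
split_chunks([Chunk | More], PrevTail, Acc) ->
    case split_on_last_newline(PrevTail ++ Chunk) of
        %% No complete non-empty line yet: everything is still in NextTail.
        {[], NextTail}   -> split_chunks(More, NextTail, Acc);
        {Data, NextTail} -> split_chunks(More, NextTail, [Data | Acc])
    end.

%% Scan backwards for the last $\n; everything after it is the tail.
split_on_last_newline(List) -> split_last(lists:reverse(List), []).

split_last([], Tail)           -> {[], Tail};  % no newline: carry all over
split_last([$\n | Rest], Tail) -> {lists:reverse(Rest), Tail};
split_last([C | Rest], Tail)   -> split_last(Rest, [C | Tail]).
```

For example, split_chunks(["ab\ncd", "ef\ngh\n", "ij"]) returns ["ab", "cdef\ngh", "ij"]: "cd" is carried into the second chunk, and the final "ij", which has no trailing newline, is still returned.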

What I like about this version is that it scales almost linearly on multiple cores! On my 2.0GHz 2-core MacBook, it took 13.522 seconds with non-smp and 7.624 seconds with smp enabled (for a 200M data file, with about 50,000 processes spawned). The 2-core smp run is about 77% faster than the non-smp run (13.522 / 7.624 ≈ 1.77). I'm not sure how it will do on an 8-core computer, but eventually we'll hit a limit set by the un-parallelized steps.
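One idealized way to estimate that limit is to assume Amdahl's law with a fixed parallel fraction p (an assumption: GC, scheduling, memory bandwidth and I/O effects are ignored) and fit p to the two MacBook timings:

```latex
\begin{aligned}
S(n) &= \frac{1}{(1-p) + p/n}, \qquad
S(2) = \frac{13.522}{7.624} \approx 1.77
\;\Rightarrow\; p = 2\left(1 - \frac{1}{1.77}\right) \approx 0.87 \\
S(8) &\approx \frac{1}{0.13 + 0.87/8} \approx 4.2, \qquad
S(\infty) = \frac{1}{1-p} \approx 7.8
\end{aligned}
```

Under this model an 8-core run would be at best about 4.2 times as fast as non-smp, and no number of cores could push it past about 7.8 times; real machines may fall well short of these bounds.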

The Erlang time results:

$ erlc tbray.erl
$ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null

real    0m13.522s
user    0m12.265s
sys     0m1.199s

$ erlc -smp tbray.erl
$ time erl -smp +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null

real    0m7.624s
user    0m13.302s
sys     0m1.602s

# For 5 million lines, 958.4M size:
$ time erl -smp +P 300000 -noshell -run tbray start o5000k.ap -s erlang halt > /dev/null

real    0m37.085s
user    1m5.605s
sys     0m7.554s

And Tim's original Ruby version:

$ time ruby tbray.rb o1000k.ap > /dev/null

real    0m2.447s
user    0m2.123s
sys     0m0.306s

# For 5 million lines, 958.4M size:
$ time ruby tbray.rb o5000k.ap > /dev/null

real    0m12.115s
user    0m10.494s
sys     0m1.473s

The Erlang time on a 2-core 1.86GHz RedHat Linux box is 7.7 seconds. Kernel:
Linux version 2.6.18-1.2798.fc6 (brewbuilder@hs20-bc2-4.build.redhat.com) (gcc version 4.1.1 20061011 (Red Hat 4.1.1-30)) #1 SMP Mon Oct 16 14:37:32 EDT 2006

Erlang time results on a 2.80GHz 4-CPU Xeon Debian box, with kernel:
Linux version 2.6.15.4-big-smp-tidy (root@test) (gcc version 4.0.3 20060128 (prerelease) (Debian 4.0.2-8)) #1 SMP Sat Feb 25 21:24:23 CST 2006

The smp result on this 4-CPU machine is questionable: it is only about 50% faster than non-smp (22.279 s vs. 14.765 s), an even smaller gain than on my 2.0GHz 2-core MacBook. I also ran the Big Bang test on this machine, and its speedup was under 50% too.

$ erlc tbray.erl 
$ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null 

real 0m22.279s 
user 0m21.597s 
sys  0m0.676s 

$ erlc -smp tbray.erl 
$ time erl -smp +S 4 +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null 

real 0m14.765s 
user 0m28.722s 
sys  0m0.840s 

Notice:

  • All tests were run several times and the best result is reported, so the disk/IO cache state should be similar across runs.
  • You may need to compile tbray.erl into two different BEAMs, one for the smp emulator and one for the non-smp emulator.
  • To process a bigger file, use +P ProcessNum to allow more simultaneously alive Erlang processes. With BUFFER_SIZE=4096, set the +P argument to FileSize / 4096 or above. From Erlang's Efficiency Guide:
    Processes
    The maximum number of simultaneously alive Erlang processes is by default 32768. This limit can be raised up to at most 268435456 processes at startup (see documentation of the system flag +P in the erl(1) documentation). The maximum limit of 268435456 processes will at least on a 32-bit architecture be impossible to reach due to memory shortage.
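As a sketch of that rule of thumb (the module proc_limit_sketch and its functions are hypothetical), the +P value can be computed from the file size with filelib:file_size/1. It counts one worker per chunk, which is an upper bound on how many workers can be alive at once:

```erlang
-module(proc_limit_sketch).
-export([min_procs/2, min_procs_for_file/2]).

%% Number of chunks the file splits into, i.e. ceil(FileSize / BufferSize).
%% tbray.erl spawns one worker per chunk, so this bounds the workers alive
%% at the same time; round the +P value up generously from here.
min_procs(FileSize, BufferSize)
  when is_integer(FileSize), FileSize >= 0,
       is_integer(BufferSize), BufferSize > 0 ->
    (FileSize + BufferSize - 1) div BufferSize.

%% filelib:file_size/1 returns 0 if the file does not exist.
min_procs_for_file(FileName, BufferSize) ->
    min_procs(filelib:file_size(FileName), BufferSize).
```

For a 200M file this gives roughly 50,000, which is why +P 60000 is used above.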

To run it from the Erlang shell with smp enabled (Erlang/OTP R11B-5 for Windows may not support smp yet):

erl -smp +P 60000
> tbray:start("o1000k.ap").

The code: (pretty formatted by ErlyBird 0.15.1)

-module(tbray).

-compile([native]).

-export([start/1]).

%% The best Bin Buffer Size is 4096
-define(BUFFER_SIZE, 4096). 

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
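%% Read BUFFER_SIZE bytes at a time. PrevTail is the unfinished last line
%% of the previous chunk; I counts the worker processes spawned so far.
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            %% The file may not end with a newline, so scan the last tail too.
            N = case PrevTail of
                    [] -> I;
                    _  -> spawn(fun () -> Collector ! {dict, scan_lines(PrevTail)} end),
                          I + 1
                end,
            Collector ! {chunk_num, N},
            file:close(File);
        {ok, Bin} -> 
            {Data, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            %% Hand the line-bounded part to a new process and keep reading.
            spawn(fun () -> Collector ! {dict, scan_lines(Data)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.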

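%% Split List at its last newline into {LineBoundedData, Tail}, where Tail
%% is the unfinished last line, already in original order.
split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        []         -> {[], Tail};  % no newline at all: carry everything over
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest]   -> split_on_last_newline_1(Rest, [C | Tail])
    end.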

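%% ChunkNum stays undefined until read_file_1 reports how many worker
%% processes it spawned; once that many dicts have been merged, print
%% the top 10 and tell Main to stop.
collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).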
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} -> 
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.
    
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~p\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_lines(List) -> scan_lines_1(List, [], dict:new()).
scan_lines_1(List, Line, Dict) -> 
    case List of
        [] -> match_and_update_dict(lists:reverse(Line), Dict);
        [$\n|Rest] ->
            scan_lines_1(Rest, [], match_and_update_dict(lists:reverse(Line), Dict));
        [C|Rest] ->
            scan_lines_1(Rest, [C | Line], Dict)
    end.

match_and_update_dict(Line, Dict) ->
    case process_match(Line) of
        false -> Dict;
        {true, Word} -> 
            dict:update_counter(Word, 1, Dict)
    end.
    
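%% Search the line for "GET /ongoing/When/", three characters and an "x",
%% then YYYY/MM/DD/ and a word ending at a space. A word containing "."
%% or running to the end of the line is rejected.
process_match(Line) ->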
    case Line of
        [] -> false;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] -> 
            case match_until_space(Rest, []) of
                [] -> false;
                Word -> {true, [Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ Word}
            end;
        [_|Rest] -> 
            process_match(Rest)
    end.
    
match_until_space(List, Word) ->
    case List of
        [] -> [];
        [$.|_] -> [];
        [$ |_] -> lists:reverse(Word);
        [C|Rest] -> match_until_space(Rest, [C | Word])
    end.
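The fan-out/merge pattern behind read_file_1 and collect_loop can also be tried on its own. In this minimal sketch (hypothetical module fanout_sketch; each chunk is a plain list of words rather than log lines), the number of chunks is known up front, whereas tbray.erl only learns it at eof and therefore sends a separate {chunk_num, I} message:

```erlang
-module(fanout_sketch).
-export([count_words/1]).

%% Spawn one process per chunk; each sends back a dict of word counts,
%% and the caller merges the dicts as they arrive.
count_words(Chunks) ->
    Self = self(),
    lists:foreach(
      fun (Words) ->
              spawn(fun () -> Self ! {dict, count(Words)} end)
      end, Chunks),
    collect(length(Chunks), dict:new()).

%% Count occurrences of each word within one chunk.
count(Words) ->
    lists:foldl(fun (W, D) -> dict:update_counter(W, 1, D) end,
                dict:new(), Words).

%% Wait for exactly N replies, summing counts for keys present in both
%% dicts, then return the merged counts as a sorted list.
collect(0, Dict) -> lists:sort(dict:to_list(Dict));
collect(N, Dict) ->
    receive
        {dict, D} ->
            collect(N - 1, dict:merge(fun (_K, V1, V2) -> V1 + V2 end, Dict, D))
    end.
```

For example, count_words([["a","b"], ["a"], ["a","c"]]) returns [{"a",3},{"b",1},{"c",1}], whichever order the workers finish in.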

Lessons learnt:

  • Split a large binary into properly sized chunks, then convert them to lists for further processing
  • Parallelize the most expensive part (of course)
  • We need a new, or more complete, Efficiency Guide for Erlang
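A minimal sketch of the first lesson, assuming the whole file is already in one binary (for example via file:read_file/1) rather than read piece by piece with file:read/2 as tbray.erl does. The module and function names are hypothetical:

```erlang
-module(bin_chunks_sketch).
-export([chunks_to_lists/2]).

%% Split a binary into pieces of at most Size bytes using binary pattern
%% matching (the pieces are sub-binaries referencing the original), then
%% convert each piece to a list for list-based processing.
chunks_to_lists(Bin, Size) when is_binary(Bin), is_integer(Size), Size > 0 ->
    [binary_to_list(B) || B <- split(Bin, Size)].

split(<<>>, _Size) -> [];
split(Bin, Size) when byte_size(Bin) =< Size -> [Bin];
split(Bin, Size) ->
    <<Chunk:Size/binary, Rest/binary>> = Bin,
    [Chunk | split(Rest, Size)].
```

For example, chunks_to_lists(<<"abcdefg">>, 3) returns ["abc","def","g"]. The resulting lists still cut lines at arbitrary points, so they need the same tail carry-over before being split into lines.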

Comments:

Caoyuan: could you please re-submit your comment over on my blog? I rejected the first as you requested, and it's hard for me to un-reject comments.

Posted by Tim Bray on October 05, 2007 at 09:08 AM PDT

Caoyuan: I think binaries are really faster than lists.

Posted by Hynek (Pichi) Vychodil on October 06, 2007 at 03:26 PM PDT

Hi, Pichi

I actually have another binary version. I haven't posted it yet because I want to do more testing and then write something about lists vs. binaries in Erlang.

My binary version and yours are almost the same, and both took about 49 seconds for the same 200M file on my MacBook, so they were much slower than the list version (7.4 seconds).

You are right about my timer:tc test for lists. I also rewrote a travel_test and will post it too.

BR,
Caoyuan

Posted by Caoyuan Deng on October 06, 2007 at 10:58 PM PDT

It looks like the Erlang base modules on my Debian are not compiled with the native option, which is why I measured your solution as slower than mine while you measured mine as slower than yours (erlang-base-hipe.deb doesn't contain natively compiled modules).

It looks like you are right and lists are fast enough. Binaries are useful only for fast chunk division; the subsequent processing is better done on lists.

Posted by Hynek (Pichi) Vychodil on October 07, 2007 at 12:01 PM PDT

Pichi

Based on my testing, I think binaries are not yet suitable for fine-grained processing, at least in OTP R11B-5. I'm not sure about the coming R12B.

I agree that binaries are good for large datasets and chunk division, while lists are good for processing short data but not suitable for large datasets. Also, lists:reverse is efficient, so we don't need to worry about it.

I also have doubts about the performance of the binary version of the regexp implementation in current OTP versions.

Posted by Caoyuan Deng on October 08, 2007 at 05:30 AM PDT

Hi Caoyuan, I ran your code about 20 times on my 2.33 GHz 8-core Intel Xeon Linux box. The best time I saw with the full o1000k.ap dataset was 13.92 seconds.

Posted by Steve Vinoski on October 08, 2007 at 09:23 PM PDT

Hi Steve,

It seems OTP on Linux has a different efficiency story?

I ran my code in a Windows virtual machine hosted by Parallels. For o1000k.ap, non-smp and compiled with the [native] option, it took about 39 s, which is still better than the binary version.

I was hoping the 13.92 seconds was the total for all 20 runs on your box :-) Anyway, I'll do more testing on my friend's Linux box.

Posted by Caoyuan Deng on October 09, 2007 at 12:20 AM PDT

I just tested on a 2-core 1.86GHz RedHat Linux box; the result for o1000k.ap is 7.7 seconds.

Posted by Caoyuan Deng on October 09, 2007 at 12:42 AM PDT
