Reading File in Parallel in Erlang (Was Tim Bray's Erlang Exercise - Round III)

My first solution for Tim's exercise tried to read file in parallel, but I just realized by reading file module's source code, that file:open(FileName, Options) will return a process instead of IO device. Well, this means a lot:

  • It's a process, so, when you request more data on it, you actually send message to it. Since you only send 2 integer: the offset and length, sending message should be very fast. But then, this process (File) will wait for receiving data from disk/io. For one process, the receiving is sequential rather than parallelized.
  • If we look the processes in Erlang as ActiveObjects, which send/receive messages/data in async, since the receiving is sequential in one process, requesting/waiting around one process(or, object) is almost safe for parallelized programming, you usaully do not need to worry about lock/unlock etc. (except the outside world).
  • We can open a lot of File processes to read data in parallel, the bound is the disk/IO and the os' resources limit.

I wrote some code to test file reading in parallel, discardng the disk cache, on my 2-core MacBook, reading file with two processes can speedup near 200% to one process.

The code:


-module(file_pread).

-compile([native]).

-export([start/2]).

-include_lib("kernel/include/file.hrl").

start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].


start(FileName, ProcNum, Fun) ->
    Start = now(),  

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    Fun(FileName, ProcNum, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) 
    end.

collect_loop(Main) -> collect_loop_1(Main, undefined, 0).
collect_loop_1(Main, ChunkNum, ChunkNum) -> 
    Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} ->
            collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} -> 
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.


pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   %% if it's the lastest chuck, read all bytes left, 
                   %% which will not exceed ChunkSize * 2
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2;
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [raw, binary]),
                   {ok, _Bin} = file:pread(File, ChunkSize * I, Length),
                   Collector ! {seq, I},
                   file:close(File)
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

The pread_file/3 is parallelized, it always opens new File process for each reading process instead of sharing one opened File process during all reading processes. The read_file/3 is non-parallelized.

To evaulate: (run at least two-time for each test to average disk/IO caches.)

$ erlc -smp file_pread.erl
$ erl -smp

1> file_pread:start("o100k.ap", 2).
time:     691.72 ms
time:      44.37 ms
[ok,ok]
2> file_pread:start("o100k.ap", 2).
time:      74.50 ms
time:      43.59 ms
[ok,ok]
3> file_pread:start("o1000k.ap", 2).
time:    1717.68 ms
time:     408.48 ms
[ok,ok]
4> file_pread:start("o1000k.ap", 2).
time:     766.00 ms
time:     393.71 ms
[ok,ok]
5> 

Let's compare the results for each file (we pick the second testing result of each), the speedup:

  • o100k.ap, 20M, 74.50 / 43.59 - 1= 70%
  • o1000k.ap, 200M, 766.00 / 393.71 - 1 = 95%

On another 4-CPU debian machine, with 4 processes, the best result I got:

4> file_pread:start("o1000k.ap", 4).
time:     768.59 ms
time:     258.57 ms
[ok, ok]
5>

The parallelized reading speedup 768.59 / 258.57 -1 = 197%

I've updated my first solution according to this testing, opening new File process for each reading process instead of sharing the same File process. Of cource, there are still issues that I pointed in Tim Bray's Erlang Exercise on Large Dataset Processing - Round II

Although the above result can also be achieved in other Languages, but I find that coding parallelization in Erlang is a pleasure.

Comments

1. igwan -- 2007-10-10 09:00

How does it compare with file:read_file (whole file at once) or raw file:read (in chunks but limited to one process accessing the file) ?

2. Caoyuan -- 2007-10-10 09:00

igwan,

file:read_file for the same o1000k.ap (200M size) took about 750ms, it's almost same as file_pread:read_file/3.

The [raw, binary] option affects very few in this case, it may speedup in some cases, but I'm not sure.

And file module has pread/2 in form of pread(IoDevice?, LocNums?), where LocNums? = [{Location, Number}], which acts according to the doc: "Performs a sequence of pread/3 in one operation, which is more efficient than calling them one at a time." This may work for a lot of pieces chunks to be read with known position/length pairs, but it reads via the same File process, so, it acts still in sequence instead of in parallel.