Reading File in Parallel in Erlang (Was Tim Bray's Erlang Exercise - Round III)
My first solution for Tim's exercise tried to read file in parallel, but I just realized by reading file module's source code, that file:open(FileName, Options) will return a process instead of IO device. Well, this means a lot:
- It's a process, so, when you request more data on it, you actually send message to it. Since you only send 2 integer: the offset and length, sending message should be very fast. But then, this process (File) will wait for receiving data from disk/io. For one process, the receiving is sequential rather than parallelized.
- If we look the processes in Erlang as ActiveObjects, which send/receive messages/data in async, since the receiving is sequential in one process, requesting/waiting around one process(or, object) is almost safe for parallelized programming, you usaully do not need to worry about lock/unlock etc. (except the outside world).
- We can open a lot of File processes to read data in parallel, the bound is the disk/IO and the os' resources limit.
I wrote some code to test file reading in parallel, discardng the disk cache, on my 2-core MacBook, reading file with two processes can speedup near 200% to one process.
The code:
-module(file_pread). -compile([native]). -export([start/2]). -include_lib("kernel/include/file.hrl"). start(FileName, ProcNum) -> [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]]. start(FileName, ProcNum, Fun) -> Start = now(), Main = self(), Collector = spawn(fun () -> collect_loop(Main) end), Fun(FileName, ProcNum, Collector), %% don't terminate, wait here, until all tasks done. receive stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) end. collect_loop(Main) -> collect_loop_1(Main, undefined, 0). collect_loop_1(Main, ChunkNum, ChunkNum) -> Main ! stop; collect_loop_1(Main, ChunkNum, ProcessedNum) -> receive {chunk_num, ChunkNumX} -> collect_loop_1(Main, ChunkNumX, ProcessedNum); {seq, _Seq} -> collect_loop_1(Main, ChunkNum, ProcessedNum + 1) end. get_chunk_size(FileName, ProcNum) -> {ok, #file_info{size=Size}} = file:read_file_info(FileName), Size div ProcNum. read_file(FileName, ProcNum, Collector) -> ChunkSize = get_chunk_size(FileName, ProcNum), {ok, File} = file:open(FileName, [raw, binary]), read_file_1(File, ChunkSize, 0, Collector). read_file_1(File, ChunkSize, I, Collector) -> case file:read(File, ChunkSize) of eof -> file:close(File), Collector ! {chunk_num, I}; {ok, _Bin} -> Collector ! {seq, I}, read_file_1(File, ChunkSize, I + 1, Collector) end. pread_file(FileName, ProcNum, Collector) -> ChunkSize = get_chunk_size(FileName, ProcNum), pread_file_1(FileName, ChunkSize, ProcNum, Collector). pread_file_1(FileName, ChunkSize, ProcNum, Collector) -> [spawn(fun () -> %% if it's the lastest chuck, read all bytes left, %% which will not exceed ChunkSize * 2 Length = if I == ProcNum - 1 -> ChunkSize * 2; true -> ChunkSize end, {ok, File} = file:open(FileName, [raw, binary]), {ok, _Bin} = file:pread(File, ChunkSize * I, Length), Collector ! {seq, I}, file:close(File) end) || I <- lists:seq(0, ProcNum - 1)], Collector ! {chunk_num, ProcNum}.
The pread_file/3 is parallelized, it always opens new File process for each reading process instead of sharing one opened File process during all reading processes. The read_file/3 is non-parallelized.
To evaulate: (run at least two-time for each test to average disk/IO caches.)
$ erlc -smp file_pread.erl
$ erl -smp
1> file_pread:start("o100k.ap", 2).
time: 691.72 ms
time: 44.37 ms
[ok,ok]
2> file_pread:start("o100k.ap", 2).
time: 74.50 ms
time: 43.59 ms
[ok,ok]
3> file_pread:start("o1000k.ap", 2).
time: 1717.68 ms
time: 408.48 ms
[ok,ok]
4> file_pread:start("o1000k.ap", 2).
time: 766.00 ms
time: 393.71 ms
[ok,ok]
5>
Let's compare the results for each file (we pick the second testing result of each), the speedup:
- o100k.ap, 20M, 74.50 / 43.59 - 1= 70%
- o1000k.ap, 200M, 766.00 / 393.71 - 1 = 95%
On another 4-CPU debian machine, with 4 processes, the best result I got:
4> file_pread:start("o1000k.ap", 4).
time: 768.59 ms
time: 258.57 ms
[ok, ok]
5>
The parallelized reading speedup 768.59 / 258.57 -1 = 197%
I've updated my first solution according to this testing, opening new File process for each reading process instead of sharing the same File process. Of cource, there are still issues that I pointed in Tim Bray's Erlang Exercise on Large Dataset Processing - Round II
Although the above result can also be achieved in other Languages, but I find that coding parallelization in Erlang is a pleasure.
Posted at 12:49AM Oct 11, 2007 by dcaoyuan in Erlang | Comments[2]

How does it compare with file:read_file (whole file at once) or raw file:read (in chunks but limited to one process accessing the file) ?
Posted by igwan on October 11, 2007 at 04:45 AM PDT #
igwan,
file:read_file for the same o1000k.ap (200M size) took about 750ms, it's almost same as file_pread:read_file/3.
The [raw, binary] option affects very few in this case, it may speedup in some cases, but I'm not sure.
And file module has pread/2 in form of pread(IoDevice, LocNums), where LocNums = [{Location, Number}], which acts according to the doc:
"Performs a sequence of pread/3 in one operation, which is more efficient than calling them one at a time."
This may work for a lot of pieces chunks to be read with known position/length pairs, but it reads via the same File process, so, it acts still in sequence instead of in parallel.
Posted by Caoyuan on October 11, 2007 at 05:19 AM PDT #