Monday Jan 12, 2009

Async or Sync Log in Erlang - Limit the Load of Singleton Process

In a previous blog: A Case Study of Scalability Related "Out of memory" Crash in Erlang, I described a scalability related issue, which was coming from a singleton async logger process. A singleton Erlang process can not benefit from multiple-core scalability.

I then did some testing on disk_log, which fortunately has synchronous log functions: log/2 and blog/2. Whenever a process calls disk_log:blog/2, the requesting process monitors the logger process until the logger returns a result or dies. The relevant code looks like this:

%% Send Req to the logger process Pid as {self(), Req} and wait for its
%% {disk_log, Pid, Reply} answer, returning Reply.
%% A monitor guards the wait: if Pid is already dead, or dies before it
%% replies, the 'DOWN' message arrives instead and {error, no_such_log} is
%% returned rather than blocking forever. After a reply, the monitor is
%% removed and any 'DOWN' message already queued for it is flushed.
monitor_request(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive
        {disk_log, Pid, Reply} ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, _Info} ->
            {error, no_such_log}
    end.

Where Pid is the logger's process id.

This piece of code shows how to interact synchronously between processes.

Under sync mode, many requesting processes may still send log requests to the logger process at the same time, but each requesting process waits for the logger to finish its request before sending the next one, i.e. each requesting process logs synchronously. In this sync mode, we can guarantee that the logger's message queue length won't exceed the number of simultaneous requesting processes.

I wrote some testing code: dlogger.erl

-module(dlogger).

-export([sync/2,
         async/2,
         proc_loop/4
        ]).

-define(LogName, blog).
-define(LogFile, "b.log").

%% Synchronous benchmark: every worker logs through disk_log:blog/2, which
%% waits for the logger to handle each request before returning.
%% Example: dlogger:sync(100000, 10).
sync(N_Msg, N_Procs) ->
   FunLog = fun disk_log:blog/2,
   test(N_Msg, N_Procs, FunLog).

%% Asynchronous benchmark: every worker logs through disk_log:balog/2, which
%% returns without waiting for the logger to handle the request.
async(N_Msg, N_Procs) ->
   FunLog = fun disk_log:balog/2,
   test(N_Msg, N_Procs, FunLog).


%% Run one benchmark round: N_Procs workers each log round(N_Msg / N_Procs)
%% entries via FunLog(?LogName, Bin).
%% Once the collector reports that every worker has sent its share, this
%% waits for the logger's queue to drain (probe_logger/3), kills the workers,
%% and prints the elapsed time in milliseconds.
test(N_Msg, N_Procs, FunLog) ->
   PerWorker = round(N_Msg / N_Procs),
   Collector = init(N_Procs),
   LogPid = logger_pid(?LogName),
   io:format("logger pid: ~p~n", [LogPid]),
   WorkerArgs = [Collector, PerWorker, LogPid, FunLog],
   Workers = [spawn(?MODULE, proc_loop, WorkerArgs) || _ <- lists:seq(1, N_Procs)],
   T0 = now(),
   lists:foreach(fun(W) -> W ! start end, Workers),
   %% Deliberately no 'after': block until all workers have finished sending.
   receive
      {sent_done, PeakQLen, PeakMem} ->
         probe_logger(LogPid, PeakQLen, PeakMem),
         lists:foreach(fun(W) -> exit(W, kill) end, Workers),
         ElapsedMs = timer:now_diff(now(), T0) / 1000,
         io:format("Time: ~10.2f ms~n", [ElapsedMs])
   end.

%% Reset and (re)open the benchmark log, then spawn the collector process
%% that gathers worker reports on behalf of the calling process.
%% Returns the collector's pid.
%% Crashes with badmatch if the log cannot be opened. Otherwise every later
%% blog/balog call would just return {error, no_such_log}, and the printed
%% timings would measure nothing.
init(N_Procs) ->
   %% Best effort: on a first run the log is not open, so close/1 fails.
   _ = disk_log:close(?LogName),
   {ok, _Log} = disk_log:open([{name, ?LogName},
                               {file, ?LogFile},
                               {format, external}]),
   MainPid = self(),
   spawn(fun() -> collect(MainPid, N_Procs, 0, 0, 0, 0) end).

%% Collector loop. It counts per-message reports {Worker, I, QLen, Mem} from
%% the workers and keeps the largest logger queue length and memory seen in
%% them. Once N_Procs workers have reported {Worker, sent_done, _, _}, it
%% sends {sent_done, MaxMQLen, MaxMem} to MainPid and returns.
collect(MainPid, N_Procs, N_Finished, _N_Msg, MaxMQLen, MaxMem) when N_Finished == N_Procs ->
   MainPid ! {sent_done, MaxMQLen, MaxMem};
collect(MainPid, N_Procs, N_Finished, N_Msg, MaxMQLen, MaxMem) ->
   receive
      {Worker, sent_done, QLen, _} ->
         Done = N_Finished + 1,
         io:format("==== QLen ~p. Proc ~p finished, total finished: ~p ====~n", [QLen, Worker, Done]),
         collect(MainPid, N_Procs, Done, N_Msg, MaxMQLen, MaxMem);
      {_Worker, _Seq, QLen, Mem} ->
         collect(MainPid, N_Procs, N_Finished, N_Msg + 1,
                 max(QLen, MaxMQLen), max(Mem, MaxMem))
   end.

%% Worker entry point (exported so test/3 can spawn/3 it). It blocks until
%% the coordinator sends 'start', then runs do_proc_work/5, which never
%% returns.
proc_loop(Collector, N_Msg, LogPid, LogFun) ->
   receive start -> ok end,
   do_proc_work(Collector, N_Msg, LogPid, LogFun, do_log).

%% Worker loop, run once per round, forever.
%% Each round builds an INFO-report binary stamped with the current date and
%% the logger's current queue length and memory. While WorkType is do_log,
%% the entry is handed to LogFun and the round is reported to Collector; the
%% I =< 1 round also reports sent_done and switches to keep_live. In
%% keep_live the entry is still built but discarded, so the process stays
%% busy (competing with the logger for CPU) until it is killed.
do_proc_work(Collector, I, LogPid, LogFun, WorkType) ->
   Stamp = httpd_util:rfc1123_date(calendar:local_time()),
   QLen = logger_mqlen(LogPid),
   MemBytes = logger_mem(LogPid),
   Body = io_lib:format("logged in ~p, logger qlen is ~p, total mem is ~p\n",
                        [self(), QLen, MemBytes]),
   Entry = list_to_binary([<<"=INFO REPORT==== ">>, Stamp, <<" ===\n">>, Body, <<"\n">>]),
   NextType = case WorkType of
                 do_log -> log_and_report(Collector, I, LogFun, Entry, QLen, MemBytes);
                 _ -> keep_live
              end,
   do_proc_work(Collector, I - 1, LogPid, LogFun, NextType).

%% Log Entry via LogFun, report round I to Collector, and return the work
%% type for the next round (keep_live after the last round, else do_log).
log_and_report(Collector, I, LogFun, Entry, QLen, MemBytes) ->
   LogFun(?LogName, Entry),
   %% Intentionally empty write; meant to keep io in step with the collector's output.
   io:format("", []),
   Collector ! {self(), I, QLen, MemBytes},
   io:format("sent one msg, qlen:~p, mem:~p~n", [QLen, MemBytes]),
   case I =< 1 of
      true ->
         Collector ! {self(), sent_done, QLen, MemBytes},
         io:format("~p sent done, qlen:~p, mem:~p~n", [self(), QLen, MemBytes]),
         keep_live;
      false ->
         do_log
   end.

%% Poll the logger every 10 ms until its mailbox is empty, printing the
%% current queue length and the running maxima on each round.
%% The maxima start from the collector's values and are carried forward with
%% every sample, so the final line reports the true peak seen while draining.
%% Returns done.
probe_logger(Pid, MaxMQLen, MaxMem) ->
   MQLen = logger_mqlen(Pid),
   Mem = logger_mem(Pid),
   MaxMQLen1 = max(MQLen, MaxMQLen),
   MaxMem1 = max(Mem, MaxMem),
   io:format("qlen is ~p, max qlen is ~p, max mem is ~p~n", [MQLen, MaxMQLen1, MaxMem1]),
   case MQLen of
      0 -> done;
      _ ->
         timer:sleep(10),
         %% Pass the updated maxima on; passing the old ones drops the peak.
         probe_logger(Pid, MaxMQLen1, MaxMem1)
   end.

%% === helper ===
%% Resolve the pid of the process that serves disk log Log, or undefined
%% when the log is not open. For a distributed log, the first listed pid is
%% used.
%% NOTE(review): disk_log_server:get_log_pids/1 is an internal OTP function;
%% confirm it still exists with this return shape on the target OTP release.
logger_pid(Log) ->
   LogPids = disk_log_server:get_log_pids(Log),
   case LogPids of
      {local, LocalPid} -> LocalPid;
      {distributed, [FirstPid | _]} -> FirstPid;
      undefined -> undefined
   end.

%% Message-queue length of the logger process; 0 when there is no logger
%% (undefined) or the process is no longer alive.
logger_mqlen(undefined) -> 0;
logger_mqlen(Pid) -> logger_int_info(Pid, message_queue_len).

%% Memory (bytes, as reported by process_info/2) of the logger process; 0
%% when there is no logger (undefined) or the process is no longer alive.
logger_mem(undefined) -> 0;
logger_mem(Pid) -> logger_int_info(Pid, memory).

%% Integer value of process_info(Pid, Item), or 0 when it is unavailable.
logger_int_info(Pid, Item) ->
   case process_info(Pid, Item) of
      {Item, Value} when is_integer(Value) -> Value;
      _ -> 0
   end.

You can always use process_info/2 or process_info/1 to probe the information of a process. In above code, we will probe the logger process's message queue length and memory via logger_mqlen/1 and logger_mem/1 when necessary.

I wrote the code this way so that I can first create thousands of processes. Each process then requests MsgPerProc log messages in turn, and stays alive (still busy) after all of its messages have been sent.

To measure the actual task time, once all requesting processes have finished sending log messages, the probe_logger/3 function polls the logger until all messages in its queue have been processed.

Here's the result:

> dlogger:sync(1000000, 1000) % 1 million log messages under 1000 requesting processes:
qlen is 0, max qlen is 999, max mem is 690,400
Time:  861286.24 ms

> dlogger:async(1000000, 1000) % 1 million log messages under 1000 requesting processes:
qlen is 0, max qlen is 68487, max mem is 75,830,616
Time: 2156351.45 ms

The performance in async mode is much worse than in sync mode. Under async mode, not only did the elapsed time grow a lot, but the max queue length also reached 68487, and the memory of the logger process reached about 72M. In fact, after all messages had been sent, only a few of them had been processed by the logger process. Since I kept 1000 processes alive, the logger process received only a very small share of the CPU cycles. The message processing (logging to the disk buffer here) therefore became very slow, which caused the worse elapsed time. This applies to any singleton process.

On the other hand, under sync mode the max queue length did not exceed the number of simultaneous requesting processes (999 here, with 1000 simultaneous processes). The max memory of the logger process stayed at a reasonable 674K or so.

I'd like to emphasize the points:

  • Singleton process in Erlang can not benefit from multiple-core scalability.
  • Move code/jobs out of the singleton process into the concurrent requesting processes as much as possible. For example, do all pre-formatting/computation before sending the message to the singleton process.
  • Sometimes you need to synchronize the interaction between processes to limit the singleton process's workload.

Thursday Jan 01, 2009

A Case Study of Scalability Related "Out of memory" Crash in Erlang

We are building a platform for message switching, in Erlang. Everything looks OK on stability and features. It has actually run for more than half a year with zero downtime. We tested its performance on our 2-core CPU machine before and got about 140 transactions/second, which is good enough.

Then, several weeks ago, we got an 8-core CPU machine and ran the same performance tests on it to check scalability. Since Erlang is almost perfect on scalability, you can imagine the result: yes, about 700 transactions/second now, scaling almost linearly. That is, until it crashed with "out of memory" after about a million hits had been processed.

It left a very big "erl_crash.dump" file, and I had to dig into the issue. My first guess was this: were some remote requests (db access, remote web service access, etc.) timing out while the process itself had not timed out yet, causing more and more processes to be kept in the VM?

A quick grep "=proc:" erl_crash.dump showed that the total number of processes was about 980, which was reasonable for our case.

So, which process ate so much memory? A quick grep "Stack+heap" erl_crash.dump showed that there was indeed a process with a Stack+heap size of 285082125.

Following this clue, I caught this process:

=proc:<0.4.0>
State: Garbing
Name: error_logger
Spawned as: proc_lib:init_p/5
Last scheduled in for: io_lib_format:pad_char/2
Spawned by: <0.1.0>
Started: Sun Apr  1 01:21:50 2012
Message queue length: 2086029
Number of heap fragments: 1234053
Heap fragment data: 281266956
Link list: [<0.27.0>, <0.0.0>, {from,<0.42.0>,#Ref<0.0.0.88>}]
Reductions: 72745575
Stack+heap: 285082125
OldHeap: 47828850
Heap unused: 121777661
OldHeap unused: 47828850
Program counter: 0x0764c66c (io_lib_format:pad_char/2 + 4)
CP: 0x0764c1b4 (io_lib_format:collect_cseq/2 + 124)

This process was error_logger, which is from OTP/Erlang standard lib: error_logger, writing received messages to log file or tty. The typical usage is:

error_logger:info_msg("~p:~p " ++ Format, [?MODULE, ?LINE] ++ Data)

Which will format Data to a String according to the Format string, and write it to tty or log file.

The above dump shows that the message queue length of the "error_logger" process had reached 2086029, and its Stack+heap was 285082125, about 272M in size.

So the cause may be that the message queue could not be processed in time: messages piled up in error_logger's process and finally caused "out of memory". The bottleneck was that when error_logger tried to format the messages into strings, the Erlang VM was slow at processing them, which seemed to need a lot of CPU cycles.

In my previous blog, I talked about Erlang is bad on massive text processing. Erlang processes String/Text via List, which is obvious bottle-neck in Erlang now, with Erlang is getting much and much popular and more and more Erlang applications are written.

But, why this did not happen on our 2-core CPU machine? It's an interesting scalability related problem:

The "error_logger" module registers one and only one process to receive and handle all log messages. But the Erlang VM's scheduler cannot spread ONE process across multiple CPUs. On our 2-core machine, the total capacity is about 140 transactions/second, and the single "error_logger" process just happened to have enough power to handle the corresponding log messages in time. On the 8-core machine, our platform scales to handle 700 transactions/second, but there is still only one "error_logger" process. It cannot use the 8 cores' capacity at all, and it finally failed.

Erlang treats every process fairly (although you can change the priority manually), we can do a simple/quick evaluation:

1. 2-Core machine, keeping hits at 140 trans/second:
The number of simultaneous processes will be about 200, each process shares the CPU cycles: 1/200 * 2 Core = 1%

2. 8-Core machine, keeping hits at 700 trans/second:
The number of simultaneous processes will be about 980, each process shares the CPU cycles: 1/980 * 8 Core = 0.82%

So the share of CPU cycles that the error_logger process gets does not actually increase (it even decreases slightly).

BTW, I think error_logger should trim its message queue when it cannot process messages in time (disk IO may also be slower than the rate of incoming messages).

Sunday Dec 28, 2008

CN Erlounge III

I attended CN Erlounge III last weekend, it was a 2-day conference. I did a presentation about Scala vs Erlang.

I met Jackyz who is one of the translators of Chinese version "Programming Erlang". And Aimin who is writing a Delphi module to support Erlang c-node and c-driver in Delphi.

There is a commercial network monitoring product using Erlang from a major telecom company in China. And our Mobile-Banking platform (in Erlang) is scheduled to launch at middle of January too.

I talked with Yeka and Diuera from Broadview, a leading publisher in IT in China, they are really interested in importing "Programming in Scala" to mainland China.

And many thanks to Shiwei Xu, who works hard for the Erlang community in China and took on organizing this conference.

I gave some encouragements to younger developers on learning Erlang and reading "Programming Erlang", since I'm the oldest one in attendees :-). Erlang is one of the best pragmatic and clear languages to learn concurrent/parallel and functional programming, and the book, is a very thoughtful and philosophic one on these perceptions.

And I'd like to see "Programming in Scala" also appear in China soon. Scala is another pragmatic language for solving real-world problems, and the book is also a thoughtful and philosophical one about our real world in terms of types, OO and FP.

Of course, choosing Scala or Erlang for your real world project should depend on the requirements.

I may be back to Vancouver next month for a while. Oh, it will be the beginning of new year.

CN Erlounge III photos by krzycube

Wednesday Nov 26, 2008

Erlang Plugin for NetBeans - 0.17.0 Released

I'm pleased to announce Erlang plugin for NetBeans (ErlyBird) 0.17.0 is released.

This is a bug-fix release, and from now on, will be in form of NetBeans plugin.

NetBeans 6.5 is a requirement.

To download, please go to: https://sourceforge.net/project/showfiles.php?group_id=192439&package_id=226387&release_id=642911

To install:

  1. Open NetBeans, go to "Tools" -> "Plugins", click on "Downloaded" tab title, click on "Add Plugins..." button, choose the directory where the Erlang plugin are unzipped, select all listed *.nbm files, following the instructions. Restart IDE.
  2. Check/set your OTP path. From [Tools]->[Options], click on 'Miscellanous', then expand 'Erlang Installation', fill in the full path of your 'erl.exe' or 'erl' file. For instance: "C:/erl/bin/erl.exe"

When you open/create an Erlang project first time, the OTP libs will be indexed. The indexing time varies from 30 to 60 minutes depending on your computer.

Feedback and bug reports are welcome.

Tuesday Oct 28, 2008

RPC Server for Erlang, In Scala

There is Java code in my previous blog post, RPC Server for Erlang, In Java; I'm now trying to rewrite it in Scala. With pattern matching, which I've become familiar with in Erlang, writing the Scala version is really a pleasure. You can compare it with the Java version.

I haven't tried Scala's actor library yet, maybe later.

And also, I should port Erlang's jinterface to Scala, since OtpErlangTuple, OtpErlangList should be written in Scala's Tuple and List.

The code is auto-formatted by NetBeans' Scala plugin, and the syntax highlighting is the same as in NetBeans, oh, not exactly.

/*
 * RpcMsg.scala
 *
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpErlangAtom, OtpErlangList, OtpErlangObject, OtpErlangPid, OtpErlangRef, OtpErlangTuple}

/**
 * A decoded `rpc:call` request received from an Erlang node:
 * `{'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}}`.
 * `to` and `tag` identify where and how the reply must be sent.
 */
class RpcMsg(val call :OtpErlangAtom,
             val mod  :OtpErlangAtom,
             val fun  :OtpErlangAtom,
             val args :OtpErlangList,
             val user :OtpErlangPid,
             val to   :OtpErlangPid,
             val tag  :OtpErlangRef)

object RpcMsg {

   /**
    * Decodes `msg` if it has the shape
    * `{'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}}`; otherwise None.
    */
   def apply(msg:OtpErlangObject) : Option[RpcMsg] = msg match {
      case tMsg:OtpErlangTuple =>
         tMsg.elements() match {
            case Array(head:OtpErlangAtom, from:OtpErlangTuple, request:OtpErlangTuple)
                 if head.atomValue == "$gen_call" =>
               (from.elements, request.elements) match {
                  case (Array(to:OtpErlangPid, tag:OtpErlangRef),
                        Array(call:OtpErlangAtom, mod:OtpErlangAtom, fun:OtpErlangAtom,
                              args:OtpErlangList, user:OtpErlangPid))
                       if call.atomValue == "call" =>
                     Some(new RpcMsg(call, mod, fun, args, user, to, tag))
                  case _ => None
               }
            case _ => None
         }
      case _ => None
   }
}

/*
 * RpcNode.scala
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpAuthException, OtpConnection, OtpErlangAtom, OtpErlangExit, OtpErlangObject, OtpErlangString, OtpErlangTuple, OtpSelf}
import java.io.IOException
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.logging.Level
import java.util.logging.Logger


/** Reply atoms shared by RPC nodes, plus a pool-size constant (100). */
trait Cons {
   val OK               : OtpErlangAtom = new OtpErlangAtom("ok")
   val ERROR            : OtpErlangAtom = new OtpErlangAtom("error")
   val STOPED           : OtpErlangAtom = new OtpErlangAtom("stoped")
   val THREAD_POOL_SIZE : Int           = 100
}

/**
 * Base class for a JVM node that serves `rpc:call` requests from Erlang.
 *
 * Construction publishes the node to epmd, blocks until one Erlang node
 * connects, and then runs the receive loop on the constructing thread. Each
 * received message is handled on a fixed-size thread pool by processRpcCall.
 *
 * NOTE(review): because the loop runs inside this constructor, a subclass's
 * own fields are not yet initialized when processRpcCall first runs.
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, xnode, parse, []).
 *
 * @author Caoyuan Deng
 */
abstract class RpcNode(xnodeName:String, cookie:String, threadPoolSize:Int) extends Cons {

   // Self-constructor arguments may not refer to instance members such as
   // THREAD_POOL_SIZE, so the default pool size is spelled out here.
   def this(xnodeName:String, cookie:String) = this(xnodeName, cookie, 100)

   private var xSelf:OtpSelf = _
   private var sConnection:OtpConnection = _
   private var execService:ExecutorService = Executors.newFixedThreadPool(threadPoolSize)
   // Latest status returned by processRpcCall; -1 asks the receive loop to stop.
   // Written by pool threads and read by the loop thread, hence @volatile.
   @volatile private var lastStatus = 0

   startServerConnection(xnodeName, cookie)
   loop

   /**
    * Registers this node with epmd and blocks until a remote node connects.
    * On failure sConnection stays null; the cause is printed.
    */
   def startServerConnection(xnodeName:String, cookie:String ) = {
      try {
         xSelf = new OtpSelf(xnodeName, cookie);
         // The node then publishes its port to the Erlang Port Mapper Daemon.
         // This registers the node name and port, making it available to a remote client process.
         // When the port is published it is important to immediately invoke the accept method.
         // Forgetting to accept a connection after publishing the port would be the programmatic
         // equivalent of false advertising
         val registered = xSelf.publishPort();
         if (registered) {
            System.out.println(xSelf.node() + " is ready.");
            /**
             * Accept an incoming connection from a remote node. A call to this
             * method will block until an incoming connection is at least
             * attempted.
             */
            sConnection = xSelf.accept();
         } else {
            System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
         }
      } catch {
         case ex:IOException => ex.printStackTrace()
         case ex:OtpAuthException => ex.printStackTrace()
      }
   }

   /**
    * Receive loop: takes one message at a time from the connection and hands
    * it to the thread pool. The handler status is checked right after each
    * submission, so a -1 from one call is noticed once a later message has been
    * received. The loop also ends when the peer exits, on an authentication
    * error, or when the connection is lost. The thread pool is shut down on
    * the way out. Iterative, so the stack does not grow with each message.
    */
   def loop : Unit = {
      // sConnection is null when startServerConnection failed.
      var running = sConnection != null
      while (running) {
         try {
            val msg = sConnection.receive

            val task = new Runnable() {
               override
               def run = RpcMsg(msg) match {
                  case None =>
                     try {
                        sConnection.send(sConnection.peer.node, new OtpErlangString("unknown request"));
                     } catch {
                        case ex:IOException =>
                     }
                  case Some(call) =>
                     val t0 = System.currentTimeMillis

                     lastStatus = processRpcCall(call)

                     System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0)
               }
            }

            execService.execute(task)

            if (lastStatus == -1) {
               System.out.println("Exited")
               running = false
            }
         } catch {
            // Retry a failed receive only while the link is still up;
            // otherwise the loop would spin forever on a dead connection.
            case ex:IOException => running = sConnection.isConnected
            case ex:OtpErlangExit => running = false
            case ex:OtpAuthException => running = false
         }
      }
      execService.shutdown()
   }

   /**
    * Replies `{Tag, {head, result}}` to the caller's pid, using a 10 MB
    * output buffer so large results fit.
    * @throws IOException
    */
   def sendRpcResult(call:RpcMsg, head:OtpErlangAtom, result:OtpErlangObject) = {
      val tResult = new OtpErlangTuple(Array(head, result))

      // Should specify call.tag here
      val msg = new OtpErlangTuple(Array(call.tag, tResult))
      // Should specify call.to here
      sConnection.send(call.to, msg, 1024 * 1024 * 10)
   }

   /** @abstract Handles one decoded call on a pool thread; returning -1 asks the node to stop. */
   def processRpcCall(call:RpcMsg) : Int
}

/** Helpers for naming the local host when building Erlang node names. */
object RpcCall {
   /** Local host name with everything from the first '.' removed. */
   def getShortLocalHost : String = getLocalHost(false)

   /** Local host name as reported by the JVM. */
   def getLongLocalHost : String = getLocalHost(true)

   /**
    * Local host name, or "localhost" when it cannot be resolved. Unless
    * `longName` is set, the name is cut at its first '.'.
    */
   def getLocalHost(longName:Boolean) : String = {
      val hostName =
         try {
            InetAddress.getLocalHost.getHostName
         } catch {
            case ex:UnknownHostException => "localhost"
         }

      if (longName) hostName
      else hostName.indexOf(".") match {
         case -1  => hostName
         case dot => hostName.substring(0, dot)
      }
   }
}

Wednesday Oct 15, 2008

An Example Syntax in Haskell, Erlang and Scala

>>> Updated Oct 16:
I found some conventions of coding style make code more readable for Scala. For example, use

{ x => something } instead of (x => dosomething)

for anonymous functions; use x, y, z as the names of the arguments of anonymous functions; put all modifiers on the preceding line, etc. That lets me pick out these functions by eye quickly.
======

It's actually my first time writing real Scala code. Sounds strange? Before writing any Scala code, I wrote a Scala IDE first, so I am a bit familiar with Scala syntax now. And I've got about 1.5 years of experience with Erlang, which began after I wrote ErlyBird.

Now it's time to write some real world Scala code, I choose to port Paul R. Brown's perpubplat blog engine, which is written in Haskell. And I have also some curiosities on how the syntax looks in Erlang, so I tried some Erlang code too.

Here's some code piece of entry module in Haskell, Erlang and Scala:

Original Haskell code piece

-- | The model of a blog with no items.
empty :: Model
empty = Model M.empty M.empty M.empty [] 0

-- | Build the model from all items. It holds lookup maps by permatitle and
-- by internal id, the parent -> children map, the items sorted by
-- creation time (descending), and the next free internal id (max id + 1).
build_model :: [Item] -> Model
build_model [] = empty
build_model items =
    Model byTitle byId (build_child_map sorted) sorted (maxId + 1)
  where
    sorted  = sort_by_created_reverse items
    byTitle = map_by permatitle sorted
    byId    = map_by internal_id sorted
    maxId   = fst (M.findMax byId)

-- | Map every item's id to the ids of its direct children. Every id starts
-- with an empty list, so leaf items are present too.
build_child_map :: [Item] -> M.Map Int [Int]
build_child_map items = build_child_map_ noChildren items
  where
    noChildren = M.fromList [ (internal_id x, []) | x <- items ]

-- Constructed to take advantage of the input being in sorted order.
-- Each child id is prepended to its parent's list.
build_child_map_ :: M.Map Int [Int] -> [Item] -> M.Map Int [Int]
build_child_map_ m [] = m
build_child_map_ m (i:is) =
    case parent i of
      Nothing -> build_child_map_ m is
      Just _  -> build_child_map_ (M.insertWith (++) (unwrap $ parent i) [internal_id i] m) is

-- | Sort items by creation time, descending.
sort_by_created_reverse :: [Item] -> [Item]
sort_by_created_reverse items = sortBy created_sort_reverse items

-- | Comparison on creation time with the arguments swapped, for 'sortBy'.
created_sort_reverse :: Item -> Item -> Ordering
created_sort_reverse a b = created b `compare` created a

In Erlang:

%% @spec empty() -> #model{}
%% The model of a blog with no items: every field keeps its record default.
empty() ->
    #model{}.

%% @spec build_model([#item{}]) -> #model{}
%% Build the model from all items. It holds lookup dicts by permatitle and by
%% internal id, the parent -> children dict, the items sorted by creation time
%% (descending), and the next free internal id (max id + 1).
build_model([]) ->
    empty();
build_model(Items) ->
    Sorted = sort_by_created_reverse(Items),
    ById = dict:from_list([{It#item.internal_id, It} || It <- Sorted]),
    ByTitle = dict:from_list([{It#item.permatitle, It} || It <- Sorted]),
    MaxId = lists:max(dict:fetch_keys(ById)),
    #model{by_permatitle = ByTitle,
           by_int_id = ById,
           child_map = build_child_map(Sorted),
           all_items = Sorted,
           next_id = MaxId + 1}.


%% @spec build_child_map([#item{}]) -> dict()
%% Map every item's internal id to the ids of its direct children. Every id
%% starts with an empty child list, so leaf items are present too.
build_child_map(Is) ->
    NoChildren = dict:from_list([{X#item.internal_id, []} || X <- Is]),
    build_child_map_(NoChildren, Is).

%% Constructed to take advantage of the input being in sorted order.
%% @spec build_child_map_(dict(), [#item{}]) -> dict()
%% Fold the items into D. Each child id is prepended to its parent's list
%% (inserting the parent if it is missing), mirroring Haskell's
%% `M.insertWith (++) parent [id]`. dict:append/3 would copy the list and add
%% at the tail, giving the opposite child order.
build_child_map_(D, []) -> D;
build_child_map_(D, [I|Is]) ->
    case I#item.parent of
        undefined ->
            build_child_map_(D, Is);
        P_Id ->
            Id = I#item.internal_id,
            D1 = dict:update(unwrap(P_Id), fun(Ids) -> [Id | Ids] end, [Id], D),
            build_child_map_(D1, Is)
    end.

%% @spec sort_by_created_reverse([#item{}]) -> [#item{}]
%% Sort items with created_sort_reverse/2 as the ordering fun.
sort_by_created_reverse(Is) ->
    Order = fun(A, B) -> created_sort_reverse(A, B) end,
    lists:sort(Order, Is).

%% @spec created_sort_reverse(#item{}, #item{}) -> boolean()
%% Ordering fun for lists:sort/2, which expects a boolean "A may precede B",
%% not a Haskell-style Ordering. Returning true when A was created at or
%% after B sorts by creation time, descending, matching the Haskell
%% `compare (created b) (created a)`.
%% NOTE(review): assumes `created` values order correctly under Erlang term
%% order (e.g. calendar datetime tuples) - confirm.
created_sort_reverse(A, B) -> A#item.created >= B#item.created.

In Scala

object Entry {
    /** The model of a blog with no items. */
    def empty = new Model()

    /**
     * Builds the model. It holds lookup maps by permatitle and by internal
     * id, the parent -> children map, the items sorted by creation time
     * (descending), and the next free internal id (largest id + 1).
     */
    def build_model(is:List[Item]) = is match {
        case Nil => empty
        case _ =>
            val sortedIs = sortByCreatedReverse(is)
            val bid = Map() ++ sortedIs.map{ x => (x.internalId -> x) }
            val n = bid.keys.toList.sort{ (x, y) => x > y }.head // max

            new Model(Map() ++ sortedIs.map{ x => (x.permatitle -> x) },
                      bid,
                      buildChildMap(sortedIs),
                      sortedIs,
                      n + 1)
    }

    /** Maps every item id to the ids of its direct children. */
    def buildChildMap(is:List[Item]) = buildChildMap_(Map() ++ is.map{ x => (x.internalId -> Nil) }, is)

    /**
     * Prepends each item's id to its parent's child list (like Haskell's
     * `M.insertWith (++) parent [id]`). It folds so every step sees the list
     * built so far; reading the original `map` instead would keep only the
     * last child of each parent.
     */
    private
    def buildChildMap_(map:Map[Int, List[Int]], is:List[Item]) =
        is.foldLeft(map){ (acc, i) =>
            i.parent match {
                case Some(pid) => acc + (pid -> (i.internalId :: acc.getOrElse(pid, Nil)))
                case None => acc
            }
        }

    /** Sorts by creation time, newest first: x precedes y when y was created before x. */
    def sortByCreatedReverse(is:List[Item]) = is.sort{ (x, y) => y.created before x.created }
}

>>> Updated Oct 16: Per Martin's suggestion, the above code can be written more Scala style (the reasons are in the comments). Thanks, Martin.

object Entry {
   def empty = new Model()

   def build_model(is:List[Item]) = is match {
       case Nil => empty
       case _ =>
           val sortedIs = sortByCreatedReverse(is)
           val bid = Map() ++ sortedIs.map{ x => (x.internalId -> x) }
           // use predefined max in Iterable
           val n = Iterable.max(bid.keys.toList)

           new Model(Map() ++ sortedIs.map{ x => (x.permatitle -> x) },
                     bid,
                     buildChildMap(sortedIs),
                     sortedIs,
                     n + 1)
   }

   // you can use a wildcard anonymous function here
   def buildChildMap(is:List[Item]) = buildChildMap_(Map() ++ is.map(_.internalId -> Nil), is)

   // Fold so that each step sees the child list built so far; `map ++ (for ...)`
   // reads the original map and keeps only the last child of each parent.
   // Prepending matches Haskell's `M.insertWith (++) parent [id]`.
   private
   def buildChildMap_(map:Map[Int, List[Int]], is:List[Item]) =
       is.foldLeft(map) { (acc, i) =>
           i.parent match {
               case Some(pid) => acc + (pid -> (i.internalId :: acc.getOrElse(pid, Nil)))
               case None => acc
           }
       }

   // Newest first: x precedes y when y was created before x. The arguments are
   // named because a placeholder lambda cannot swap their order.
   def sortByCreatedReverse(is:List[Item]) = is.sort{ (x, y) => y.created before x.created }
}
======

I use ErlyBird for Erlang coding and Scala for NetBeans for Scala coding. My experience is that the IDE is much more aware of Scala, and I can type Scala code a bit faster than Erlang code.

If you are not familiar with all these 3 languages, which one looks more understandable?

Saturday Oct 11, 2008

RPC Server for Erlang, In Java

We are using Erlang to do some serious things, one of them is indeed part of a banking system. Erlang is a perfect language in concurrent and syntax (yes, I like its syntax), but lacks static typing (I hope new added -spec and -type attributes may be a bit helping), and, is not suitable for processing massive data (performance, memory etc). I tried parsing a 10M size XML file with xmerl, the lib for XML in OTP/Erlang, which causes terrible memory disk-swap and I can never get the parsed tree out.

There is a real need to process massive data in other languages, for example C or Java. That's why I tried to write an RPC server for Erlang, in Java.

There is a jinterface lib with OTP/Erlang, which is for communication between Erlang and Java. And there are docs for how to get it to work. But, for a RPC server that is called from Erlang, there are still some tips for real world:

1. When you send the result back to the caller, you need to wrap it in a tuple with the caller's tag Ref as the first element, and the destination should be the caller's Pid. It's something like:

OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
sConnection.send(call.to, msg); 

where, call.tag is a OtpErlangRef, and tResult can be any OtpErlangObject, call.to is a OtpErlangPid.

2. If you need to send massive data back to the caller, the default buffer size of OtpErlangOutputStream is not big enough; I set it to 1024 * 1024 * 10.

3. Since there may be a lot of concurrent callers calling your RPC server, you have to consider your server's concurrent performance; I chose to use a thread pool here.

The RPC server in Java has two classes, RpcNode.java and RpcMsg.java:

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangTuple;

/**
 * A decoded Erlang {@code rpc:call} request as it arrives at a Java node:
 * {@code {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}}}.
 * {@code to} and {@code tag} say where and how the reply must be sent.
 *
 * @author Caoyuan Deng
 */
public class RpcMsg {

    public OtpErlangAtom call;
    public OtpErlangAtom mod;
    public OtpErlangAtom fun;
    public OtpErlangList args;
    public OtpErlangPid user;
    public OtpErlangPid to;
    public OtpErlangRef tag;

    /**
     * Decodes the {@code From} tuple ({@code {To, Tag}}) and the request tuple
     * ({@code {call, Mod, Fun, Args, User}}) of a gen_call.
     *
     * @throws IllegalArgumentException if either tuple does not have that shape
     */
    public RpcMsg(OtpErlangTuple from, OtpErlangTuple request) throws IllegalArgumentException {
        if (request.arity() != 5) {
            throw new IllegalArgumentException("Not a rpc call");
        }

        OtpErlangObject head = request.elementAt(0);
        // Evaluated left to right with short-circuiting, so `from` is only
        // inspected once the request itself looks like {call, ...}.
        boolean wellFormed = head instanceof OtpErlangAtom
                && ((OtpErlangAtom) head).atomValue().equals("call")
                && request.elementAt(1) instanceof OtpErlangAtom
                && request.elementAt(2) instanceof OtpErlangAtom
                && request.elementAt(3) instanceof OtpErlangList
                && request.elementAt(4) instanceof OtpErlangPid
                && from.elementAt(0) instanceof OtpErlangPid
                && from.elementAt(1) instanceof OtpErlangRef;

        if (!wellFormed) {
            throw new IllegalArgumentException("Not a rpc call.");
        }

        call = (OtpErlangAtom) head;
        mod = (OtpErlangAtom) request.elementAt(1);
        fun = (OtpErlangAtom) request.elementAt(2);
        args = (OtpErlangList) request.elementAt(3);
        user = (OtpErlangPid) request.elementAt(4);
        to = (OtpErlangPid) from.elementAt(0);
        tag = (OtpErlangRef) from.elementAt(1);
    }

    /**
     * Decodes {@code msg} if it is {@code {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}}}.
     *
     * @return the decoded call, or {@code null} if {@code msg} has any other shape
     */
    public static RpcMsg tryToResolveRcpCall(OtpErlangObject msg) {
        if (!(msg instanceof OtpErlangTuple)) {
            return null;
        }
        OtpErlangTuple tMsg = (OtpErlangTuple) msg;
        if (tMsg.arity() != 3) {
            return null;
        }

        OtpErlangObject[] parts = tMsg.elements();
        boolean isGenCall = parts[0] instanceof OtpErlangAtom
                && ((OtpErlangAtom) parts[0]).atomValue().equals("$gen_call");
        boolean hasFrom = parts[1] instanceof OtpErlangTuple
                && ((OtpErlangTuple) parts[1]).arity() == 2;
        boolean hasRequest = parts[2] instanceof OtpErlangTuple
                && ((OtpErlangTuple) parts[2]).arity() == 5;
        if (!(isGenCall && hasFrom && hasRequest)) {
            return null;
        }

        try {
            return new RpcMsg((OtpErlangTuple) parts[1], (OtpErlangTuple) parts[2]);
        } catch (IllegalArgumentException ex) {
            ex.printStackTrace();
            return null;
        }
    }
}

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpSelf;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Base class for a Java node that serves Erlang {@code rpc:call/4} requests
 * over a single JInterface connection.
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
 *
 * Subclasses implement {@link #processRpcCall(RpcMsg)}. Every received message
 * is handled on a thread of a fixed-size pool; returning -1 from
 * processRpcCall asks the node to stop receiving.
 *
 * NOTE: the constructor does not return until the node stops, and
 * processRpcCall already runs while it is executing. Fields that a subclass
 * initialises in its own constructor are therefore not yet set when
 * processRpcCall is first called.
 *
 * @author Caoyuan Deng
 */
public abstract class RpcNode {

    public static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    public static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    public static final OtpErlangAtom STOPED = new OtpErlangAtom("stoped");
    private static final int THREAD_POOL_SIZE = 100;
    /** Upper bound on how long one receive waits before the loop re-checks {@link #stopped}. */
    private static final long RECEIVE_POLL_MILLIS = 1000;
    private OtpSelf xSelf;
    private OtpConnection sConnection;
    private ExecutorService execService;
    /**
     * Written by a pool thread when processRpcCall returns -1 and read by the
     * receive loop; volatile so the write is visible across threads.
     */
    private volatile boolean stopped = false;

    public RpcNode(String xnodeName, String cookie) {
        this(xnodeName, cookie, THREAD_POOL_SIZE);
    }

    /**
     * Publishes the node, accepts one incoming connection and serves requests
     * on it. Blocks until the receive loop ends (stop request or broken
     * connection).
     */
    public RpcNode(String xnodeName, String cookie, int threadPoolSize) {
        execService = Executors.newFixedThreadPool(threadPoolSize);

        startServerConnection(xnodeName, cookie);
        if (sConnection != null) {
            loop();
        } else {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE,
                    "No connection was established; not serving requests.");
        }
        // Let already-submitted tasks finish but accept no new ones, so the
        // pool threads do not keep the JVM alive after the node has stopped.
        execService.shutdown();
    }

    /** Publishes this node via epmd and blocks until a peer connects. */
    private void startServerConnection(String xnodeName, String cookie) {
        try {
            xSelf = new OtpSelf(xnodeName, cookie);
            boolean registered = xSelf.publishPort();
            if (registered) {
                System.out.println(xSelf.node() + " is ready.");
                /**
                 * Accept an incoming connection from a remote node. A call to this
                 * method will block until an incoming connection is at least
                 * attempted.
                 */
                sConnection = xSelf.accept();
            } else {
                System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
            }
        } catch (IOException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        } catch (OtpAuthException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Receives messages until a stop has been requested or the connection
     * fails, handing each message to the thread pool.
     */
    private void loop() {
        final Logger logger = Logger.getLogger(RpcNode.class.getName());
        while (!stopped) {
            // Any term may arrive; decoding is left to RpcMsg, so no cast here.
            final OtpErlangObject msg;
            try {
                msg = sConnection.receive(RECEIVE_POLL_MILLIS);
            } catch (InterruptedException ex) {
                // No message within the poll interval: go round and re-check stopped.
                continue;
            } catch (OtpErlangExit ex) {
                logger.log(Level.SEVERE, null, ex);
                continue;
            } catch (OtpAuthException ex) {
                logger.log(Level.SEVERE, null, ex);
                continue;
            } catch (IOException ex) {
                // The connection is unusable; retrying would fail again immediately,
                // so leave the loop instead of spinning.
                logger.log(Level.SEVERE, null, ex);
                break;
            }

            execService.execute(new Runnable() {

                public void run() {
                    handleMessage(msg);
                }
            });
        }
        System.out.println("Exited");
    }

    /** Runs on a pool thread: decodes one message and dispatches it. */
    private void handleMessage(OtpErlangObject msg) {
        RpcMsg call = RpcMsg.tryToResolveRcpCall(msg);

        if (call == null) {
            try {
                sConnection.send(sConnection.peer().node(), new OtpErlangString("unknown request"));
            } catch (IOException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            }
            return;
        }

        long t0 = System.currentTimeMillis();
        int status = processRpcCall(call);
        System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0);

        if (status == -1) {
            stopped = true;
        }
    }

    /**
     * Replies {call.tag, {head, result}} to the calling process call.to.
     *
     * @throws IOException if the reply cannot be sent
     */
    protected void sendRpcResult(RpcMsg call, OtpErlangAtom head, OtpErlangObject result) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, result});

        // Should specify call.tag here
        OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[]{call.tag, tResult});
        // Should specify call.to here
        sConnection.send(call.to, msg, 1024 * 1024 * 10);
    }

    /**
     * Handles one decoded rpc call; runs on a pool thread.
     *
     * @return -1 to ask the node to stop receiving, any other value to continue
     */
    public abstract int processRpcCall(RpcMsg call);


    // ------ helper
    public static String getShortLocalHost() {
        return getLocalHost(false);
    }

    public static String getLongLocalHost() {
        return getLocalHost(true);
    }

    /** Local host name, truncated at the first '.' unless longName; "localhost" on lookup failure. */
    private static String getLocalHost(boolean longName) {
        String localHost;
        try {
            localHost = InetAddress.getLocalHost().getHostName();
            if (!longName) {
                /* Make sure it's a short name, i.e. strip of everything after first '.' */
                int dot = localHost.indexOf(".");
                if (dot != -1) {
                    localHost = localHost.substring(0, dot);
                }
            }
        } catch (UnknownHostException e) {
            localHost = "localhost";
        }

        return localHost;
    }
}

As you can see, RpcNode is an abstract class: by implementing int processRpcCall(RpcMsg call), you can provide whatever features you want. For example:

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package net.lightpole.xmlnode;

import basexnode.Main;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import java.io.IOException;
import net.lightpole.rpcnode.RpcMsg;
import net.lightpole.rpcnode.RpcNode;

/**
 *
 * @author dcaoyuan
 */
public class MyNode extends RpcNode {

    public MyNode(String xnodeName, String cookie, int threadPoolSize) {
        super(xnodeName, cookie, threadPoolSize);
    }

    @Override
    public int processRpcCall(RpcMsg call) {
        final String modStr = call.mod.atomValue();
        final String funStr = call.fun.atomValue();
        final OtpErlangList args = call.args;

        try {
            OtpErlangAtom head = ERROR;
            OtpErlangObject result = null;

            if (modStr.equals("xnode") && funStr.equals("stop")) {
                head = OK;
                sendRpcResult(call, head, STOPED);
                return -1;
            }

            if (modStr.equals("System") && funStr.equals("currentTimeMillis")) {
                head = OK;
                long t = System.currentTimeMillis();
                result = new OtpErlangLong(t);
            } else {
                result = new OtpErlangString("{undef,{" + modStr + "," + funStr + "}}");
            }

            if (result == null) {
                result = new OtpErlangAtom("undefined");
            }

            sendRpcResult(call, head, result);
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
        }

        return 0;
    }
}

I tested MyNode by:

$ erl -sname clientnode -setcookie mycookie
...
(clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

And you can try to test its concurrent performance by:

%% Concurrency smoke test for the Java RPC node.
%%
%% $ erl -sname clientnode -setcookie mycookie
%% > xnode_test:test(10000)

-module(xnode_test).

-export([test/1, test/2]).

%% Node used by test/1; matches the example session above.
-define(DEFAULT_NODE, xnodename@cmac).

%% @doc Runs ProcN concurrent rpc calls against ?DEFAULT_NODE.
test(ProcN) ->
    test(ProcN, ?DEFAULT_NODE).

%% @doc Runs ProcN concurrent 'System':currentTimeMillis() rpc calls against
%% Node and returns their results in spawn order. If a worker process dies
%% abnormally, this call exits with that worker's reason.
test(ProcN, Node) ->
    Parent = self(),
    Workers = [spawn_worker(Parent, fun rpc_parse/1, Node)
               || _ <- lists:seq(0, ProcN - 1)],
    [wait_result(Worker) || Worker <- Workers].

%% One rpc call; the result (or {badrpc, Reason}) is returned as is.
rpc_parse(Node) ->
    rpc:call(Node, 'System', currentTimeMillis, []).

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

%% The worker sends its result before it exits, so after a 'normal' DOWN the
%% result message is already in the mailbox.
wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.

I spawned 10000 concurrent calls to it, and it ran smoothly.

I'm also considering writing a more general-purpose RPC server in Java, which can dynamically call any existing method of a Java class.

Sunday Feb 24, 2008

Erlang for NetBeans (ErlyBird) Recent Updates

Erlang for NetBeans (ErlyBird) has been put on the NetBeans' mercurial trunk. I added a changelog page on http://wiki.netbeans.org/ErlangChangelog, where you can learn the latest progressing.

Here is a summary of recent updates:

20080223

  • Instant rename, or refactoring for local vars and functions (put caret on var or function name, press CTRL+R)
  • Fixed: syntax broken for packaged import attribute
  • Fixed: syntax broken for wild attribute
  • Completion suggestions will search on project's own paths only
  • Track GSF changes: reindex performance was improved a lot; Can live with other GSF based language support now (Ruby, Groovy etc)

ErlyBird 0.16.0 will be available soon.

Sunday Jan 20, 2008

On Travel

I'm in Vancouver Airport right now, using the new free Wi-Fi service here. After a four-day trip to San Francisco, I'll fly to China for the traditional Spring Festival.

I met friends in San Francisco; as I mentioned before, we are developing something in Erlang. What we are building is a kind of "switch" for content: we've successfully "switched" many different content sources to standard Atom/RSS etc., using a newly designed template language (based on Erlang). It's a pleasure to write this switching code (parse, map, mashup) in Erlang, with its pattern-match syntax and the xmerl lib.

Tuesday Nov 13, 2007

Regexp Syntax in Pattern Match?

Pattern match in Erlang is very useful, but it has some limits, for example, to match something like:

\d\d\dx/\d\d\d\d/\d\d/\d\d/

I have to write the code as:

%% Every captured byte must be an ASCII digit: 47 < B < 58, i.e. $0..$9.
<<_:S/binary, C1,C2,C3,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,_/binary>> when
     C1 > 47, C1 < 58, C2 > 47, C2 < 58, C3 > 47, C3 < 58,
     Y1 > 47, Y1 < 58, Y2 > 47, Y2 < 58, Y3 > 47, Y3 < 58, Y4 > 47, Y4 < 58,
     M1 > 47, M1 < 58, M2 > 47, M2 < 58, D1 > 47, D1 < 58, D2 > 47, D2 < 58

But life can be simple, by using parse_transform, we can write above code to:

<<_:S/binary,d,d,d,$x,$/,d,d,d,d,$/,d,d,$/,d,d,$/,_/binary>>

where d stands for a decimal digit. A parse_transform can rewrite the AST to generate the real code.

And more, since current erl_parse supports any atom in binary skeleton, we can write pattern match as:

<<_:S/binary,'[a-z]','[^abc]','var[0-9]',$x,$/,d,d,d,d,$/,d,d,$/,d,d,$/,_/binary>>

Will somebody write such a parse_transform?

Tuesday Nov 06, 2007

Tim Bray's Erlang Exercise "WideFinder" - After Find Wide

===>>> Updated Nov 7:
A new version tbray9a.erl which uses ets instead of dict, with 116 LoC, took about 3.8 sec on 5 million lines log file, 0.93 sec on 1 million lines file on my 4-core linux box.

* Results on T5120, an 8-core 1.4 GHz machine with 2 integer instruction threads per core and support for 8 thread contexts per core. Solaris thinks it sees 64 CPUs:

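| Schedulers | Elapsed (s) | User (s) | System (s) | (User+System)/Elapsed |
|-----------:|------------:|---------:|-----------:|----------------------:|
| 1          | 37.58       | 35.51    | 7.82       | 1.15                  |
| 2          | 20.14       | 35.31    | 8.28       | 2.16                  |
| 4          | 11.81       | 35.37    | 8.25       | 3.69                  |
| 8          | 7.63        | 35.28    | 8.33       | 5.72                  |
| 16         | 5.60        | 36.08    | 8.27       | 7.92                  |
| 32         | 5.29        | 36.64    | 8.11       | 8.46                  |
| 64         | 5.45        | 36.79    | 8.23       | 8.26                  |
| 128        | 5.26        | 36.75    | 8.39       | 8.58                  |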

With 16 schedulers, (User+System)/Elapsed was 7.92 and the elapsed time reached 5.60 sec (close to the best). So the 8-core computing capacity and the 2 x 8 = 16 integer-instruction threads were almost fully used. The 8 x 8 thread contexts seemed to add little further performance.

It seems that T5120 is a box with 8-core parallel computing ability and 16-thread for scheduler?

On my 4-core linux box, the slowest time (1 scheduler) vs the fastest time (64 schedulers) was 6.627/3.763 = 1.76. On this T5120 it was 37.58/5.26 = 7.14 (1 vs 128 schedulers). So, with its 8-core and 16-integer-instruction-thread combination, the T5120 is pretty good at parallel computing.

An interesting point is that, as the number of schedulers increased, the elapsed time dropped while User time and System time stayed almost constant. This may be because I separated the parallelized and sequential parts of my code completely.

* Results on 2.80Ghz 4-core Intel xeon linux box (5 million lines log file):

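| Schedulers | Elapsed (s) | User (s) | System (s) | (User+System)/Elapsed |
|-----------:|------------:|---------:|-----------:|----------------------:|
| 1          | 6.627       | 5.356    | 4.248      | 1.45                  |
| 2          | 4.486       | 6.176    | 3.936      | 2.25                  |
| 4          | 4.299       | 8.989    | 4.156      | 3.06                  |
| 8          | 3.960       | 9.629    | 3.644      | 3.35                  |
| 16         | 3.826       | 9.101    | 3.696      | 3.34                  |
| 32         | 3.858       | 9.029    | 3.840      | 3.34                  |
| 64         | 3.763       | 8.801    | 3.820      | 3.35                  |
| 128        | 3.920       | 9.137    | 3.980      | 3.35                  |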


========

I'm a widefinder these days, and after weeks of finding wide, I wrote another concise and fast widefinder, tbray9.erl. It is based on Steve's, Anders' and my previous code, with Boyer-Moore searching (it seems Python's findall uses this algorithm) and parallelized file reading. It's 120 LoC (a bit shorter than Fredrik Lundh's wf-6.py) and took about 1 sec for a 1-million-line log file and 5.2 sec for 5 million lines on my 4-core linux box. Tim's testing on the T5120 gave 5.29 sec.

To evaluate:

erlc -smp tbray9.erl
erl -smp +A 1024 +h 10240 -noshell -run tbray9 start o1000k.ap

BTW since I use parallelized io heavily, by adding flag +A 1024, the code can get 4.3 sec for 5 million lines log file on my 4-core linux box.

Binary efficiency in Erlang is an interesting topic. Besides the tips I talked about in a previous blog, it also seems to depend on binary size, memory size, etc. The best buffer size for my code seems to be around 20000k to 80000k, which is the best range on both my 4-core linux box and the T5120, but it may vary for different code.

Note: In current Erlang, binary pattern matching has a maximum element size of 2^27 - 1 bytes (about 131072k). This is consistent with storing the size in a 32-bit word, of which 4 bits are used for a type identifier and 1 bit for the sign. For this topic, please see Philip Robinson's blog. So the buffer size cannot be greater than 131072k.

I. Boyer-Moore searching

Thanks to Steve and Anders, they've given out a concise Boyer-Moore searching algorithm in Erlang, I can modify it a bit to get a general BM searching module for ASCII encoded binary:

%% Boyer-Moore style searching (bad-character shift table only) on
%% ASCII/Latin-1 encoded binaries.
-module(bmsearch).
-export([compile/1, match/3]).

%% pat: the pattern reversed, so it can be compared right-to-left;
%% len: pattern length;
%% tab: shift table, element(Byte + 1, Tab) is the shift for Byte (0..255).
-record(bmCtx, {pat, len, tab}).

%% @spec compile(string()) -> #bmCtx{}
%% Str is expected to contain byte values (0..255). Bytes that do not occur in
%% Str shift by the full pattern length.
compile(Str) ->
    Len = length(Str),
    %% Cover every byte value including 0; a NUL byte in the searched binary
    %% would otherwise index element(0, Tab) and crash with badarg.
    Default = dict:from_list([{C, Len} || C <- lists:seq(0, 255)]),
    Dict = set_shifts(Str, Len, 1, Default),
    Tab = list_to_tuple([Pos || {_, Pos} <- lists:sort(dict:to_list(Dict))]),
    #bmCtx{pat = lists:reverse(Str), len = Len, tab = Tab}.

%% Records StrLen - Pos for each character of the pattern (1-based Pos); a
%% later occurrence overwrites an earlier one.
set_shifts([], _, _, Dict) -> Dict;
set_shifts([C|T], StrLen, Pos, Dict) ->
    set_shifts(T, StrLen, Pos + 1, dict:store(C, StrLen - Pos, Dict)).

%% @spec match(Bin, Start, #bmCtx) -> {true, Len} | {false, SkipLen}
%% Compares the pattern against Bin at byte offset Start, right-to-left. The
%% caller must keep Start + Len =< byte_size(Bin); otherwise a badmatch occurs.
match(Bin, S, #bmCtx{pat=Pat, len=Len, tab=Tab}) ->
    match_1(Bin, S + Len - 1, Pat, Len, Tab, 0).

%% Count is the number of pattern characters already matched.
match_1(Bin, S, [C|T], Len, Tab, Count) ->
    <<_:S/binary, C1, _/binary>> = Bin,
    case C1 of
        C ->
            match_1(Bin, S - 1, T, Len, Tab, Count + 1);
        _ ->
            case element(C1 + 1, Tab) of
                Len -> {false, Len};
                Shift when Shift =< Count -> {false, 1};
                Shift -> {false, Shift - Count}
            end
    end;
match_1(_, _, [], Len, _, _) -> {true, Len}.

Usage:

> Pattern = bmsearch:compile("is a").
{bmCtx,"a si",4,{4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,...}}
> Bin = <<"this is a test">>.
<<"this is a test">>
> bmsearch:match(Bin, 1, Pattern).
{false,1}
> bmsearch:match(Bin, 5, Pattern).
{true, 4}
> bmsearch:match(Bin, 7, Pattern).
{false, 4}

II. Reading file in parallel and scan

To read a file in parallel, we open a new file handle in each process. To deal with lines that cross chunk boundaries, we split each chunk into its first line (Head), the line-bounded Data, and its last line (Tail). We tag Head and Tail with serial numbers, {I * 10, Head} and {I * 10 + 1, Tail}, so that all pending segments (the Head and Tail of each chunk) can later be joined in the proper order.

%% Worker body: reads chunk I (Size bytes starting at byte Size * I) of
%% FileName through its own file handle. It scans the newline-bounded middle
%% of the chunk and returns the first and last partial lines tagged with
%% sequence numbers I * 10 and I * 10 + 1, so they can be joined in file
%% order later.
scan_file({FileName, Size, I, BmCtx}) ->
    {ok, Fd} = file:open(FileName, [raw, binary]),
    {ok, Chunk} = file:pread(Fd, Size * I, Size),
    file:close(Fd),
    FirstLen = split_on_first_newline(Chunk),
    MiddleLen = split_on_last_newline(Chunk) - FirstLen,
    <<First:FirstLen/binary, Middle:MiddleLen/binary, Last/binary>> = Chunk,
    HeadSeq = I * 10,
    {scan_chunk({Middle, BmCtx}), {HeadSeq, First}, {HeadSeq + 1, Last}}.

III. Spawn workers

Luke's spawn_worker are two small functions, they are very useful, stable and good abstract on processing workers:

%% Starts a monitored process that computes F(A) and sends
%% {WorkerPid, Result} to Parent; returns {Pid, MonitorRef}.
spawn_worker(Parent, F, A) ->
    Job = fun() -> Parent ! {self(), F(A)} end,
    erlang:spawn_monitor(Job).

%% Blocks until the monitored worker dies. On a normal exit, returns the
%% {Pid, Result} payload it sent; otherwise exits with the worker's reason.
wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, ExitReason} ->
            case ExitReason of
                normal -> receive {Pid, Reply} -> Reply end;
                _ -> exit(ExitReason)
            end
    end.

So, I can start a series of workers simply by:

%% Spawns one scan_file/1 worker per chunk index 0 .. ProcN - 1 and returns
%% the {Pid, MonitorRef} pairs in index order.
read_file(FileName, Size, ProcN, BmCtx) ->
    Parent = self(),
    lists:map(fun(Index) ->
                      spawn_worker(Parent, fun scan_file/1,
                                   {FileName, Size, Index, BmCtx})
              end,
              lists:seq(0, ProcN - 1)).

And then collect the results by:

    %% Spawn one scan_file/1 worker per chunk, then block on each in order.
    Results = [wait_result(Worker) || Worker <- read_file(FileName, ?BUFFER_SIZE, ProcN1, BmCtx)],

In this case, returning Results is a list of {Dict, {Seq1, Head}, {Seq2, Tail}}

IV. Concat segments to a binary and scan it

Each chunk is split into Head (first line), line-bounded Data, and Tail (last line). The Head and Tail segments are left for later processing. After all workers have finished scanning their Data (each producing a Dict), we sort these pending segments by SeqNum, concatenate them and scan them in the main process.

Unzip the results to Dicts and Segs, sort Segs by SeqNum:

    %% Split each worker result into its Dict and its two {SeqNum, Segment}
    %% pairs, then order all segments by SeqNum and drop the tags.
    {Dicts, Segs} = lists:foldl(fun ({Dict, Head, Tail}, {Dicts, Segs}) ->
                                        {[Dict | Dicts], [Head, Tail | Segs]}
                                end, {[], []}, Results),
    Segs1 = [Seg || {_, Seg} <- lists:keysort(1, Segs)],    

The sorted segments form a list of binaries, and list_to_binary/1 can concatenate them into one binary efficiently. You do not need to care whether it is a deep list; it will be flattened automatically:

    %% Join the ordered boundary segments into one binary and scan it here.
    Dict = scan_chunk({list_to_binary(Segs1), BmCtx}),

Conclusion

Thinking in parallel is fairly simple in Erlang, right? Most of the code can run in one process, and if you want to spawn it in parallel, you do not need to modify the code much. In this case, scan_file/1 is sequential and simply returns everything you want. You can spawn many workers that each run scan_file/1, then collect the results later. That's all.

Wednesday Oct 24, 2007

Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)

>>> Updated Nov 1:
Tim tested tbray5.erl on T5120, for his 971,538,252 bytes of data in 4,625,236 lines log file, got:

real    0m20.74s
user    3m51.33s
sys     0m8.00s

The result was what I guessed, since the elapsed time of my code was 3 times of Anders' on my machine. I'm glad that Erlang performs linearly on different machines/os.

My code is not the fastest. I did not apply Boyer-Moore searching, so scan_chunk_1/4 has to test and skip the binary one byte at a time when there is no exact match. Still, this code shows how to code binaries efficiently and demonstrates the performance of traversing a binary byte by byte (the performance is not so bad now, right?). It is also what I wanted: a balance between simplicity, readability and speed.

Another approach for lazy man is something binary pattern match hacking, we can modify scan_chunk_1/4 to:

%% Drop-in variant of tbray5's scan_chunk_1/4 that tries ten start positions
%% (S .. S + 9) per step instead of one. Offset is the position, relative to S,
%% just past "GET /ongoing/When/dddx/YYYY/MM/DD/" (34 bytes + start shift).
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    Offset =
      case Bin of
          <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              34;
          <<_:S/binary,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              35;
          <<_:S/binary,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              36;
          <<_:S/binary,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              37;
          <<_:S/binary,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              38;
          <<_:S/binary,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              39;
          <<_:S/binary,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              40;
          <<_:S/binary,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              41;
          <<_:S/binary,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              42;
          <<_:S/binary,_,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              43;
          _ -> undefined
      end,
    case Offset of
        %% None of the ten positions matched: advance past all of them.
        undefined -> scan_chunk_1(Bin, DataL, S + 10, Dict);
        _ ->
            case match_until_space_newline(Bin, S + Offset) of
                {true, E} ->
                    %% "GET /ongoing/When/" (18) + "dddx/" (5): the key starts at
                    %% YYYY, 11 bytes before Offset (S + 23 for Offset 34, as
                    %% in tbray5). Using 12 would include the preceding '/'.
                    Skip = S + Offset - 11, L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.

The elapsed time immediately dropped to 1.424 sec from 2.792 sec, a speedup of about 100%, on my 4-CPU linux box.

If you are patient, you can copy-paste 100... such lines :-) (in this case, I'd rather to pick Boyer-Moore), and the elapsed time will drop a little bit more, but not much after 10 lines or so.
========

>>> Updated Oct 29:
Pihis updated his WideFinder, applied guideline II, and beat Ruby on his one-core box. He also modified Anders's code by removing all unnecessary remaining binary bindings (which cause unnecessary sub-binary splitting). Steve then tested the refined code and got 0.567s on his famous 8-CPU linux box. Now we may have reached the real disk/IO bound; should we try parallelized file reading? But I've grown tired of more widefinders. BTW, may we have regexp pattern matching at the syntax level? The End.
========

>>> Updated Oct 26:
Code cleanup
========

Binary is usually more efficient than List in Erlang.

The memory size of Binary is 3 to 6 words plus Data itself, the Data can be allocated / deallocated in global heap, so the Data can be shared over function calls, shared over processes when do message passing (on the same node), without copying. That's why heap size affects a lot on binary.

The memory size of List is 1 word per element + the size of each element, and List is always copying between function calls, on message passing.

In my previous blogs about Tim's exercise, I suspected the performance of Binary traverse. But, with more advices, experience, it seems binary can work very efficient as an ideal Dataset processing struct.

But, there are some guidelines for efficient binary in Erlang, I'll try to give out here, which I learned from the exercise and experts.

I. Don't split a binary unless the split binaries are what you exactly want

Splitting/combining binaries is expensive, so when you want to get values from a binary at some offsets:

Do

%% C is an 8-bit integer segment, so it is bound to the byte value itself.
<<_:Offset/binary, C, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).

Do Not *

%% C is bound to a 1-byte sub-binary instead of an integer. A binary segment
%% without an explicit size is only allowed last in a pattern, hence C:1/binary.
<<_:Offset/binary, C:1/binary, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).

* This may be good in R12B

And when you want to split a binary to get Head or Tail only:

Do

%% Only the prefix is bound; the remainder is matched by _ and not bound.
<<Head:Offset/binary,_/binary>> = Bin.

Do Not

%% split_binary/2 returns both parts; the second one is simply discarded here.
{Head, _} = split_binary(Bin, Offset).

II. Calculate the final offsets first, then split it when you've got the exactly offsets

When you traverse binary to test the bytes/bits, calculate and collect the final offsets first, don't split binary (bind named Var to sub-binary) at that time. When you've got all the exactly offsets, split what you want finally:

Do

%% Prints the Nth (1-based) space-separated word of Bin. Word offsets are
%% computed first and Bin is sliced only once, at the end. Every word except
%% the last includes its trailing space.
get_nth_word(Bin, N) ->
    Offsets = calc_word_offsets(Bin, 0, [0]),
    S = element(N, Offsets),
    E = element(N + 1, Offsets),
    L = E - S,
    <<_:S/binary,Word:L/binary,_/binary>> = Bin,
    io:format("nth Word: ~p", [Word]).

%% Returns a tuple of word start offsets followed by byte_size(Bin) as an end
%% sentinel, so element(N + 1, Offsets) also exists for the last word.
calc_word_offsets(Bin, Offset, Acc) when Offset < byte_size(Bin) ->
    case Bin of
        <<_:Offset/binary,$ ,_/binary>> ->
            calc_word_offsets(Bin, Offset + 1, [Offset + 1 | Acc]);
        _ ->
            calc_word_offsets(Bin, Offset + 1, Acc)
    end;
calc_word_offsets(_, End, Acc) -> list_to_tuple(lists:reverse([End | Acc])).

Bin = <<"This is a binary test">>,
get_nth_word(Bin, 4). %  prints: nth Word: <<"binary ">>

Do Not

%% Anti-pattern kept for contrast: split_words/3 binds a Word sub-binary and a
%% Rest sub-binary at every space, then restarts the scan on Rest at offset 0.
get_nth_word_bad(Bin, N) ->
    Words = split_words(Bin, 0, []),
    Word = element(N, Words),
    io:format("nth Word: ~p", [Word]).

%% Returns a tuple of all words without their separating spaces; whatever
%% follows the last space becomes the final word.
split_words(Bin, Offset, Acc) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            split_words(Rest, 0, [Word | Acc]);
        <<_:Offset/binary,_,_/binary>> ->
            split_words(Bin, Offset + 1, Acc);
        _ -> list_to_tuple(lists:reverse([Bin | Acc]))
    end.

Bin = <<"This is a binary test">>,
get_nth_word_bad(Bin, 4). %  prints: nth Word: <<"binary">>

III. Use "+h Size" option or [{min_heap_size, Size}] with spawn_opt

This is very important for binary performance. It's somehow a Key Number for binary performance. With this option set properly, the binary performs very well, otherwise, worse.

IV. Others

  • Don't forget to compile to native by adding "-compile([native])." in your code.
  • Maybe "+A Size" to set the number of threads in async thread pool also helps a bit when do IO.

Practice

Steve and Anders have pushed widefinder in Erlang to 1.1 sec on 8-CPU linux box. Their code took about 1.9 sec on my 4-CPU box. Then, how about a concise version?

Following the guidelines above, and building on my previous code, Per's dict code and Luke's spawn_worker, I rewrote a concise and straightforward tbray5.erl (less than 80 LOC) for Tim's exercise, without any extra C-driver modules. It took about 2.972 sec for a 1-million-line log file and 15.695 sec for 5 million lines. Non-parallelized Ruby took 4.161 sec and 20.768 sec on the same 2.80Ghz 4-CPU Intel Xeon linux box:

BTW, using ets instead of dict is almost the same.

$ erlc -smp tbray5.erl
$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m2.972s
user    0m9.685s
sys     0m0.748s

$ time erl +h 8192 -smp -noshell -run tbray5 start o5000k.ap -s erlang halt

real    0m15.695s
user    0m53.551s
sys     0m4.268s

On 2.0GHz 2-core MacBook (Ruby code took 2.447 sec):

$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m3.034s
user    0m4.853s
sys     0m0.872s

The Code: tbray5.erl

-module(tbray5).
-compile([native]).
-export([start/1]).

-define(BUFFER_SIZE, (1024 * 10000)).

%% Counts "GET /ongoing/When/dddx/YYYY/MM/DD/slug " hits per key in FileName
%% and prints the 10 most frequent keys. Returns the io:format results.
start(FileName) ->
    Dicts = [wait_result(Worker) || Worker <- read_file(FileName)],
    print_result(merge_dicts(Dicts)).

%% Reads FileName in ?BUFFER_SIZE chunks and spawns one scanner per chunk.
read_file(FileName) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, 0, []).

read_file_1(File, Offset, Workers) ->
    case file:pread(File, Offset, ?BUFFER_SIZE) of
        eof ->
            file:close(File),
            Workers;
        {ok, Bin} when byte_size(Bin) < ?BUFFER_SIZE ->
            %% Short read: this is the rest of the file. Scan all of it, including
            %% a final line without a trailing newline. Trimming at the last
            %% newline here would skip that line and then advance one byte per
            %% worker until eof.
            DataL = byte_size(Bin),
            Worker = spawn_worker(self(), fun scan_chunk/1, {Bin, DataL}),
            read_file_1(File, Offset + DataL, [Worker | Workers]);
        {ok, Bin} ->
            %% Full chunk: scan up to the last newline; the next read restarts
            %% right after it.
            DataL = split_on_last_newline(Bin),
            Worker = spawn_worker(self(), fun scan_chunk/1, {Bin, DataL}),
            read_file_1(File, Offset + DataL + 1, [Worker | Workers])
    end.

%% Position of the last $\n in Bin (positions below 1 are not checked; 0 is
%% returned when none is found).
split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, byte_size(Bin)).
split_on_last_newline_1(Bin, S) when S > 0 ->
    case Bin of
        <<_:S/binary,$\n,_/binary>> -> S;
        _ -> split_on_last_newline_1(Bin, S - 1)
    end;
split_on_last_newline_1(_, S) -> S.

scan_chunk({Bin, DataL}) -> scan_chunk_1(Bin, DataL, 0, dict:new()).
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    case Bin of
        <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
            case match_until_space_newline(Bin, S + 34) of
                {true, E} ->
                    %% The key starts at YYYY: 18 bytes of prefix + "dddx/".
                    Skip = S + 23, L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end;
        _ -> scan_chunk_1(Bin, DataL, S + 1, Dict)
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.

%% {true, SpacePos} when a space follows a run without newline or '.',
%% otherwise {false, StopPos}.
match_until_space_newline(Bin, S) when S < byte_size(Bin) ->
    case Bin of
        <<_:S/binary,10,_/binary>> -> {false, S};
        <<_:S/binary,$.,_/binary>> -> {false, S};
        <<_:S/binary,_,$ ,_/binary>> -> {true, S + 1};
        _ -> match_until_space_newline(Bin, S + 1)
    end;
match_until_space_newline(_, S) -> {false, S}.

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.

%% An empty file produces no workers, hence no dicts; merge to an empty dict.
merge_dicts([]) -> dict:new();
merge_dicts([D1,D2|Rest]) ->
    merge_dicts([dict:merge(fun(_, V1, V2) -> V1 + V2 end, D1, D2) | Rest]);
merge_dicts([D]) -> D.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

Sunday Oct 21, 2007

Tim's Erlang Exercise - Summary

>>> Updated Nov 1:
Tim tested my last attempt tbray5.erl, which was described on Learning Coding Binary (Was Tim's Erlang Exercise - Round VI), got for his 971,538,252 bytes of data in 4,625,236 lines log file:

real    0m20.74s
user    3m51.33s
sys     0m8.00s

It's not the fastest, since I did not apply Boyer-Moore searching. But it's what I want: a balance between simple, readable and speed.
========

>>> Updated Oct 24:
The Erlang code can be faster than un-parallelized Ruby, a new version run 2.97 sec on the 4-CPU box: Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
========

>>> Updated Oct 22:
As Bjorn's suggestion, I added "+h 4096" option for 'erl', which means "sets the default heap size of processes to the size 4096", the elapsed time dropped from 7.7s to 5.5s immediately:

time erl +h 4096 -smp -noshell -run tbray4 start o1000k.ap 10 -s erlang halt

The +h option seems to affect the binary version a lot, but the list version only a little. This may be because a list is always copied, while a binary may be left on the heap and passed by pointer?

The default heap size for each process is 233 words. This number suits a large number of concurrent processes, because it avoids exhausting memory. But for some parallelization tasks, with fewer processes or with enough memory, the heap size can be set a bit larger.

Anyway, I think Erlang/OTP has been very good there for Concurrency, but there may be still room to optimize for Parallelization.

BTW, with +h option, and some tips for efficient binary, the most concise binary version tbray5.erl can run into 3 sec now.
========

This is a performance summary on Tim's Erlang exercise on large dataset processing, I only compare the results on a 4-CPU Intel Xeon 2.80G linux box:

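| Log file        | Time | Erlang (1 proc) | Erlang (many procs) | Erlang (many procs, +h 4096) | Ruby    |
|-----------------|------|----------------:|--------------------:|-----------------------------:|--------:|
| 1 million lines | real | 22.088s         | 7.700s              | 5.475s                       | 4.161s  |
|                 | user | 21.161s         | 25.750s             | 18.785s                      | 3.592s  |
|                 | sys  | 0.924s          | 3.552s              | 1.352s                       | 0.568s  |
| 5 million lines | real | 195.570s        | 37.669s             | 27.911s                      | 20.768s |
|                 | user | 192.496s        | 126.296s            | 98.162s                      | 19.009s |
|                 | sys  | 3.480s          | 17.789s             | 7.344s                       | 3.116s  |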

Notice:

  • The Erlang code is tbray4.erl, which can be found in previous blog, the Ruby code is from Tim's blog.
  • Erlang code is parallelized, Ruby code not.
  • The Erlang code is much longer, but parallelization is not a free lunch.
  • With an 8-CPU box, Erlang's version should exceed or near to non-parallelized Ruby version*.
  • Although we are talking about multiple-core era, but I'm not sure if disk/io is also ready.

* Per Steve's testing.

Saturday Oct 20, 2007

CN Erlounge II

Last weekend, the two-day CN Erlounge II was held in Zhuhai, China, covering Erlang and FP topics:

  • Why I choose Erlang? - by xushiwei
  • Erlang emulator implementation - by mryufeng
  • Port & driver - by codeplayer
  • Py 2 Erl  - by Zoom.Quiet
  • STM: Lock free concurrent Overview - by Albert Lee
  • mnesia - by mryufeng

And a new logo of "ERL.CHINA" was born as:


(Picture source - http://www.haokanbu.com/story/1104/)

More information about CN Erlounge II, along with some pictures, can be found here.

I wasn't able to fit this meeting into my schedule; maybe next time.

Tuesday Oct 16, 2007

Learning Coding Parallelization (Was Tim's Erlang Exercise - Round V)

>>> Updated Oct 20:
After cleaning up my 4-CPU linux box, the result for the 1M-record file is about 7.7 sec, and for the 5M-record file about 38 sec.
========

>>> Updated Oct 16:
After testing my code on different machines, I found that disk I/O performance varied. For some very large files, reading the file in parallel may take longer (typically on non-server machines, which are not equipped for fast disk I/O). So I added another version, tbray4b.erl. In this version, only the file reading is not parallelized; all other code is the same. If you'd like to test it on your machine, please try both.
========

Well, I think I've learned a lot from doing Tim's exercise: not only about lists vs. binaries in Erlang, but also about computing in parallel. Coding for concurrency is fairly easy in Erlang, but coding for parallelization is not only about the language; it is a real problem in its own right.

I wrote tbray3.erl in The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV) and got my best result so far on my 2-core MacBook. But things are always a bit more complex. As Steve pointed out in the comments, when he tried tbray3.erl on his 8-core linux box:

"I ran it in a loop 10 times, and the best time I saw was 13.872 sec, and user/CPU time was only 16.150 sec, so it’s apparently not using the multiple cores very well."

I also encountered this issue on my 4-CPU Intel Xeon 2.80GHz debian box, where it runs even slower (8.420s) than on my 2-core MacBook (4.483s).

I thought about my code for a while and found that it seems to spawn too many processes for scan_chunk. Since scan_chunk's performance has improved a lot, each process finishes its task very quickly, faster than the file can be read. The extra CPUs therefore get little chance to help. The cycle of 'reading' followed by 'spawning a scan process' is now almost sequential, with very few scanning processes alive at the same time. I think I've finally hit the file-reading bound.

But wait. As I claimed before, reading a file into memory is very fast in Erlang: for a 200M log file, it takes less than 800ms. The elapsed time for tbray3.erl is about 4900ms, far from 800ms. So why do I say file reading is now the bound?

The problem is this: since I suspected the performance of traversing a binary byte by byte, I chose to convert binaries to lists to scan the words. Per my testing results, lists are faster than binaries when they are not too long, in many cases no longer than several KBytes. And to keep the code clear and readable, I also chose to split the big binary while reading the file, so I have to read the file in pieces of no more than n KBytes. For a very big file, the reading procedure is broken into tens of thousands of steps, which makes the whole file read take rather long. That's bad.

So I decided to write another version. It reads the file in parallel (Round III), splits each chunk on its last newline (Round II), and scans the words using pattern matching (Round IV). And yes, I use binaries instead of lists this time, trying to compensate for the poorer performance of binary traversal by running in parallel on multiple cores.

The result is interesting. It's the first time I've achieved around 10 sec on my 2-core MacBook using binary matching only. It's also the first time my modest 4-CPU Intel Xeon 2.80GHz debian box got a better result (7.700 sec) than my MacBook.

>>> Updated Oct 15:
Steve ran the code on his 8-core 2.33 GHz Intel Xeon Linux box; the best time was 4.920 sec:

"the best time I saw for your newest version was 4.920 sec on my 8-core Linux box. Fast! However, user time was only 14.751 sec, so I’m not sure it’s using all the cores that well. Perhaps you’re getting down to where I/O is becoming a more significant factor."

Please see Steve's One More Erlang Wide Finder and his widefinder attempts.
========

Result on 2.0GHz 2-core MacBook:

$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt
8900    : 2006/09/29/Dynamic-IDE
2000    : 2006/07/28/Open-Data
1300    : 2003/07/25/NotGaming
800     : 2003/10/16/Debbie
800     : 2003/09/18/NXML
800     : 2006/01/31/Data-Protection
700     : 2003/06/23/SamsPie
600     : 2006/09/11/Making-Markup
600     : 2003/02/04/Construction
600     : 2005/11/03/Cars-and-Office-Suites
Time:   10527.50 ms

real    0m10.910s
user    0m13.927s
sys     0m6.413s

Result on the 4-CPU Intel Xeon 2.80GHz debian box:

# When process number is set to 20:
time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt 

real    0m7.700s
user    0m25.750s
sys     0m3.552s

# When process number is set to 1:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 1 -s erlang halt

real    0m22.035s
user    0m21.525s
sys     0m0.512s

# On a 940M 5 million lines log file:
time erl -smp -noshell -run tbray4_bin start o5000k.ap 100 -s erlang halt
44500   : 2006/09/29/Dynamic-IDE
10000   : 2006/07/28/Open-Data
6500    : 2003/07/25/NotGaming
4000    : 2003/10/16/Debbie
4000    : 2003/09/18/NXML
4000    : 2006/01/31/Data-Protection
3500    : 2003/06/23/SamsPie
3000    : 2006/09/11/Making-Markup
3000    : 2003/02/04/Construction
3000    : 2005/11/03/Cars-and-Office-Suites
Time:   37512.76 ms

real    0m37.669s
user    2m6.296s
sys     0m17.789s

On the 4-CPU linux box, I compared ProcNum = 20 with ProcNum = 1. The elapsed time of the parallelized run was only 35% of the un-parallelized one (7.700s vs. 22.035s), a speedup of about 185% (roughly 2.86 times as fast). The ratio was almost the same as in my pread_file.erl test on the same machine.

It's actually a combination of code from my four previous blogs. Although the performance is not as good as tbray3.erl on my MacBook, I'm happy that this version is fully parallelized, from reading the file to scanning words. It should therefore scale better than all my previous versions.

The code: tbray4.erl

-module(tbray4).

-compile([native]).

-export([start/1,
         start/2]).

-include_lib("kernel/include/file.hrl").

%% Entry point for `erl -run tbray4 start File ProcNum`: command-line
%% arguments arrive as a list of strings, so ProcNum is converted first.
start([FileName, ProcNum]) when is_list(ProcNum) ->
    start(FileName, list_to_integer(ProcNum)).

%% Count and print the top-10 article fetches in FileName using ProcNum
%% parallel readers, then print the elapsed time in milliseconds.
%%
%% Counter and Collector are linked to the caller, so a crash in either one
%% terminates the run instead of leaving the caller blocked forever in the
%% final receive.
start(FileName, ProcNum) ->
    %% Monotonic time is the right clock for measuring a duration;
    %% erlang:now/0 is deprecated.
    Start = erlang:monotonic_time(microsecond),

    Main = self(),
    Counter = spawn_link(fun () -> count_loop(Main) end),
    Collector = spawn_link(fun () -> collect_loop(Counter) end),

    pread_file(FileName, ProcNum, Collector),

    %% Don't terminate; wait here until the counter reports that every
    %% chunk has been merged and the result printed.
    receive
        stop ->
            ElapsedUs = erlang:monotonic_time(microsecond) - Start,
            io:format("Time: ~10.2f ms~n", [ElapsedUs / 1000])
    end.

%% Read FileName as ProcNum chunks in parallel, with one reader process per
%% chunk. Each reader splits its chunk at the last newline and sends
%% {seq, I, Data, Tail} to Collector. Collector is then told how many chunks
%% to expect with {chunk_num, ProcNum}.
pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    FileSize = filelib:file_size(FileName),
    pread_file_1(FileName, ChunkSize, FileSize, ProcNum, Collector).

%% Readers are linked to the caller so that a crashing reader stops the run
%% instead of leaving it waiting forever for chunk I.
pread_file_1(FileName, ChunkSize, FileSize, ProcNum, Collector) ->
    [spawn_link(fun () ->
                        Offset = ChunkSize * I,
                        %% The last chunk reads everything up to EOF. A fixed
                        %% length of 2 * ChunkSize dropped trailing bytes
                        %% whenever FileSize rem ProcNum > ChunkSize.
                        Length = if
                                     I == ProcNum - 1 -> max(FileSize - Offset, 0);
                                     true -> ChunkSize
                                 end,
                        Bin = pread_chunk(FileName, Offset, Length),
                        {Data, Tail} = split_on_last_newline(Bin),
                        Collector ! {seq, I, Data, Tail}
                end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

%% Read Length bytes of FileName at Offset using a private raw handle.
%% A chunk starting at or past EOF (possible for files smaller than ProcNum
%% bytes) is returned as <<>> instead of crashing the reader.
pread_chunk(FileName, Offset, Length) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    try file:pread(File, Offset, Length) of
        {ok, Bin} -> Bin;
        eof -> <<>>
    after
        file:close(File)
    end.

%% Reassemble chunks in sequence order. Chunks may arrive in any order.
%% A chunk is only handed on once every earlier chunk has been dispatched,
%% because its Data must be prefixed with the previous chunk's Tail (the
%% partial line after that chunk's last newline). The {chunk_num, N}
%% announcement is forwarded to Counter unchanged.
collect_loop(Counter) ->
    collect_loop_1([], <<>>, -1, Counter).

%% Pending: chunks received out of order; CarryOver: Tail of the last
%% dispatched chunk; Dispatched: sequence number of that chunk (-1 = none).
collect_loop_1(Pending, CarryOver, Dispatched, Counter) ->
    receive
        {chunk_num, _} = Announcement ->
            Counter ! Announcement,
            collect_loop_1(Pending, CarryOver, Dispatched, Counter);
        {seq, Seq, Data, Tail} ->
            Ordered = lists:keysort(1, [{Seq, Data, Tail} | Pending]),
            {StillPending, NextCarryOver, NextDispatched} =
                process_chunks(Ordered, [], CarryOver, Dispatched, Counter),
            collect_loop_1(StillPending, NextCarryOver, NextDispatched, Counter)
    end.
    
%% Merge the per-chunk word-count dicts sent by the scanners. The total
%% number of chunks is unknown (undefined) until {chunk_num, N} arrives.
%% Once N dicts have been merged, print the top 10 and send stop to Main.
count_loop(Main) ->
    count_loop_1(Main, dict:new(), undefined, 0).

count_loop_1(Main, Totals, Expected, Expected) ->
    %% Every expected chunk has been merged.
    print_result(Totals),
    Main ! stop;
count_loop_1(Main, Totals, Expected, Merged) ->
    receive
        {chunk_num, Announced} ->
            count_loop_1(Main, Totals, Announced, Merged);
        {dict, Partial} ->
            AddCounts = fun (_Key, A, B) -> A + B end,
            NewTotals = dict:merge(AddCounts, Totals, Partial),
            count_loop_1(Main, NewTotals, Expected, Merged + 1)
    end.

%% Walk the chunks, sorted by sequence number. A chunk numbered exactly
%% LastSeq + 1 is scanned in a fresh process, prefixed with the carried-over
%% PrevTail; its own Tail becomes the new carry-over. Any other chunk is
%% buffered until its predecessor has been dispatched.
%% Returns {Buffered, NewPrevTail, NewLastSeq}.
process_chunks([], ChunkBuf, PrevTail, LastSeq, _Counter) ->
    {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail} | Rest], ChunkBuf, PrevTail, LastSeq, Counter)
  when I =:= LastSeq + 1 ->
    spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
    process_chunks(Rest, ChunkBuf, Tail, I, Counter);
process_chunks([{_, _, _} = Early | Rest], ChunkBuf, PrevTail, LastSeq, Counter) ->
    process_chunks(Rest, [Early | ChunkBuf], PrevTail, LastSeq, Counter).

%% Print the ten most frequent keys as "Count\t: Key", highest count first.
%% Returns the list of io:format/2 results.
print_result(Dict) ->
    Ascending = lists:keysort(2, dict:to_list(Dict)),
    Top10 = lists:sublist(lists:reverse(Ascending), 10),
    [io:format("~b\t: ~s~n", [Count, Key]) || {Key, Count} <- Top10].

%% Size in bytes of each of the ProcNum chunks FileName is divided into.
%% Never returns 0. For files smaller than ProcNum bytes (including empty
%% files), a zero chunk size would make every read zero-length, so no file
%% content would ever be consumed.
get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size = Size}} = file:read_file_info(FileName),
    max(Size div ProcNum, 1).

%% Split Bin at its last newline into {Data, Tail}. Data is everything
%% before that newline and Tail everything after it; the newline itself is
%% dropped. A binary containing no newline is returned as {Bin, <<>>}.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, byte_size(Bin) - 1).

%% Probe candidate newline positions from the end towards the start.
%% Offset 0 must be tested too: when the only newline is the first byte,
%% Data is <<>> and the rest of the chunk is Tail.
split_on_last_newline_1(Bin, Offset) when Offset >= 0 ->
    case Bin of
        <<Data:Offset/binary, $\n, Tail/binary>> ->
            {Data, Tail};
        _ ->
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) ->
    {Bin, <<>>}.
    
%% Count article fetches in Bin and return them as a dict.
%% Each "GET /ongoing/When/???x/YYYY/MM/DD/Word " occurrence (Word ended by
%% a space) increments the key <<"YYYY/MM/DD/Word">>. A candidate whose
%% word runs into a '.' or a newline first is skipped.
scan_chunk(Bin) ->
    scan_chunk_1(Bin, 0, dict:new()).

%% 34 is the byte length of the fixed part of the pattern:
%% "GET /ongoing/When/" (18) + "???x/" (5) + "YYYY/MM/DD/" (11).
%% No match can therefore start after byte_size(Bin) - 34.
scan_chunk_1(Bin, Offset, Counts) when Offset =< byte_size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Rest/binary>> ->
            %% Continue from offset 0 of whatever follows the word.
            {After, Word} = match_until_space_newline(Rest, 0),
            Date = <<Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/>>,
            scan_chunk_1(After, 0, bump_count(Date, Word, Counts));
        _NoMatchHere ->
            scan_chunk_1(Bin, Offset + 1, Counts)
    end;
scan_chunk_1(_Bin, _Offset, Counts) ->
    Counts.

%% Increment the counter for Date ++ Word. An empty Word means the candidate
%% was rejected, so Counts is returned unchanged.
bump_count(_Date, <<>>, Counts) ->
    Counts;
bump_count(Date, Word, Counts) ->
    dict:update_counter(<<Date/binary, Word/binary>>, 1, Counts).

%% Scan Bin from Offset for the first space, '.' or newline. Returns:
%% - {RestAfterDelimiter, Word} when a space comes first (Word is everything
%%   before it);
%% - {RestAfterDelimiter, <<>>} when a '.' or newline comes first;
%% - {<<>>, <<>>} when no delimiter remains.
match_until_space_newline(Bin, Offset) when Offset < byte_size(Bin) ->
    <<Prefix:Offset/binary, Delim, Rest/binary>> = Bin,
    case Delim of
        $\s -> {Rest, Prefix};
        $.  -> {Rest, <<>>};
        $\n -> {Rest, <<>>};
        _   -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_Bin, _Offset) ->
    {<<>>, <<>>}.

>>> Updated Oct 16:
After testing my code on different machines, I found that disk I/O performance varied. For some very large files, reading the file in parallel may take longer (typically on non-server machines, which are not equipped for fast disk I/O). So I wrote another version, tbray4b.erl. In this version, only the file reading is not parallelized; all other code is the same. Here are the results for this version on a 940M file with 5 million lines, with ProcNum set to 200 and 400:

# On 2-core MacBook:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 200 -s erlang halt

real    0m50.498s
user    0m49.746s
sys     0m11.979s

# On 4-cpu linux box:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 400 -s erlang halt

real    1m2.136s
user    1m59.907s
sys     0m7.960s

The code: tbray4b.erl

-module(tbray4b).

-compile([native]).

-export([start/1,
         start/2]).

-include_lib("kernel/include/file.hrl").

%% Entry point for `erl -run tbray4b start File ProcNum`: command-line
%% arguments arrive as a list of strings, so ProcNum is converted first.
start([FileName, ProcNum]) when is_list(ProcNum) ->
    start(FileName, list_to_integer(ProcNum)).

%% Count and print the top-10 article fetches in FileName, read sequentially
%% in about ProcNum chunks, then print the elapsed time in milliseconds.
%%
%% Counter and Collector are linked to the caller, so a crash in either one
%% terminates the run instead of leaving the caller blocked forever in the
%% final receive.
start(FileName, ProcNum) ->
    %% Monotonic time is the right clock for measuring a duration;
    %% erlang:now/0 is deprecated.
    Start = erlang:monotonic_time(microsecond),

    Main = self(),
    Counter = spawn_link(fun () -> count_loop(Main) end),
    Collector = spawn_link(fun () -> collect_loop(Counter) end),

    read_file(FileName, ProcNum, Collector),

    %% Don't terminate; wait here until the counter reports that every
    %% chunk has been merged and the result printed.
    receive
        stop ->
            ElapsedUs = erlang:monotonic_time(microsecond) - Start,
            io:format("Time: ~10.2f ms~n", [ElapsedUs / 1000])
    end.

%% Read FileName sequentially in chunks of ChunkSize bytes: ProcNum chunks,
%% plus one more for any remainder. Each chunk is handed to a linked worker
%% that splits it at its last newline and sends {seq, I, Data, Tail} to
%% Collector. At EOF the file is closed and Collector is told the actual
%% chunk count with {chunk_num, N}.
read_file(FileName, ProcNum, Collector) ->
    %% A zero chunk size (file smaller than ProcNum bytes, or empty) would
    %% issue zero-length reads that never advance through the file.
    ChunkSize = max(get_chunk_size(FileName, ProcNum), 1),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).

read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} ->
            %% Linked so that a crashing worker stops the run instead of
            %% leaving it blocked forever waiting for chunk I.
            spawn_link(fun () ->
                               {Data, Tail} = split_on_last_newline(Bin),
                               Collector ! {seq, I, Data, Tail}
                       end),
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

%% Reassemble chunks in sequence order. Chunks may arrive in any order.
%% A chunk is only handed on once every earlier chunk has been dispatched,
%% because its Data must be prefixed with the previous chunk's Tail (the
%% partial line after that chunk's last newline). The {chunk_num, N}
%% announcement is forwarded to Counter unchanged.
collect_loop(Counter) ->
    collect_loop_1([], <<>>, -1, Counter).

%% Pending: chunks received out of order; CarryOver: Tail of the last
%% dispatched chunk; Dispatched: sequence number of that chunk (-1 = none).
collect_loop_1(Pending, CarryOver, Dispatched, Counter) ->
    receive
        {chunk_num, _} = Announcement ->
            Counter ! Announcement,
            collect_loop_1(Pending, CarryOver, Dispatched, Counter);
        {seq, Seq, Data, Tail} ->
            Ordered = lists:keysort(1, [{Seq, Data, Tail} | Pending]),
            {StillPending, NextCarryOver, NextDispatched} =
                process_chunks(Ordered, [], CarryOver, Dispatched, Counter),
            collect_loop_1(StillPending, NextCarryOver, NextDispatched, Counter)
    end.
    
%% Merge the per-chunk word-count dicts sent by the scanners. The total
%% number of chunks is unknown (undefined) until {chunk_num, N} arrives.
%% Once N dicts have been merged, print the top 10 and send stop to Main.
count_loop(Main) ->
    count_loop_1(Main, dict:new(), undefined, 0).

count_loop_1(Main, Totals, Expected, Expected) ->
    %% Every expected chunk has been merged.
    print_result(Totals),
    Main ! stop;
count_loop_1(Main, Totals, Expected, Merged) ->
    receive
        {chunk_num, Announced} ->
            count_loop_1(Main, Totals, Announced, Merged);
        {dict, Partial} ->
            AddCounts = fun (_Key, A, B) -> A + B end,
            NewTotals = dict:merge(AddCounts, Totals, Partial),
            count_loop_1(Main, NewTotals, Expected, Merged + 1)
    end.

%% Walk the chunks, sorted by sequence number. A chunk numbered exactly
%% LastSeq + 1 is scanned in a fresh process, prefixed with the carried-over
%% PrevTail; its own Tail becomes the new carry-over. Any other chunk is
%% buffered until its predecessor has been dispatched.
%% Returns {Buffered, NewPrevTail, NewLastSeq}.
process_chunks([], ChunkBuf, PrevTail, LastSeq, _Counter) ->
    {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail} | Rest], ChunkBuf, PrevTail, LastSeq, Counter)
  when I =:= LastSeq + 1 ->
    spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
    process_chunks(Rest, ChunkBuf, Tail, I, Counter);
process_chunks([{_, _, _} = Early | Rest], ChunkBuf, PrevTail, LastSeq, Counter) ->
    process_chunks(Rest, [Early | ChunkBuf], PrevTail, LastSeq, Counter).

%% Print the ten most frequent keys as "Count\t: Key", highest count first.
%% Returns the list of io:format/2 results.
print_result(Dict) ->
    Ascending = lists:keysort(2, dict:to_list(Dict)),
    Top10 = lists:sublist(lists:reverse(Ascending), 10),
    [io:format("~b\t: ~s~n", [Count, Key]) || {Key, Count} <- Top10].

%% Size in bytes of each of the ProcNum chunks FileName is divided into.
%% Never returns 0. For files smaller than ProcNum bytes (including empty
%% files), a zero chunk size would make every read zero-length, so no file
%% content would ever be consumed.
get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size = Size}} = file:read_file_info(FileName),
    max(Size div ProcNum, 1).

%% Split Bin at its last newline into {Data, Tail}. Data is everything
%% before that newline and Tail everything after it; the newline itself is
%% dropped. A binary containing no newline is returned as {Bin, <<>>}.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, byte_size(Bin) - 1).

%% Probe candidate newline positions from the end towards the start.
%% Offset 0 must be tested too: when the only newline is the first byte,
%% Data is <<>> and the rest of the chunk is Tail.
split_on_last_newline_1(Bin, Offset) when Offset >= 0 ->
    case Bin of
        <<Data:Offset/binary, $\n, Tail/binary>> ->
            {Data, Tail};
        _ ->
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) ->
    {Bin, <<>>}.
    
%% Count article fetches in Bin and return them as a dict.
%% Each "GET /ongoing/When/???x/YYYY/MM/DD/Word " occurrence (Word ended by
%% a space) increments the key <<"YYYY/MM/DD/Word">>. A candidate whose
%% word runs into a '.' or a newline first is skipped.
scan_chunk(Bin) ->
    scan_chunk_1(Bin, 0, dict:new()).

%% 34 is the byte length of the fixed part of the pattern:
%% "GET /ongoing/When/" (18) + "???x/" (5) + "YYYY/MM/DD/" (11).
%% No match can therefore start after byte_size(Bin) - 34.
scan_chunk_1(Bin, Offset, Counts) when Offset =< byte_size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Rest/binary>> ->
            %% Continue from offset 0 of whatever follows the word.
            {After, Word} = match_until_space_newline(Rest, 0),
            Date = <<Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/>>,
            scan_chunk_1(After, 0, bump_count(Date, Word, Counts));
        _NoMatchHere ->
            scan_chunk_1(Bin, Offset + 1, Counts)
    end;
scan_chunk_1(_Bin, _Offset, Counts) ->
    Counts.

%% Increment the counter for Date ++ Word. An empty Word means the candidate
%% was rejected, so Counts is returned unchanged.
bump_count(_Date, <<>>, Counts) ->
    Counts;
bump_count(Date, Word, Counts) ->
    dict:update_counter(<<Date/binary, Word/binary>>, 1, Counts).

%% Scan Bin from Offset for the first space, '.' or newline. Returns:
%% - {RestAfterDelimiter, Word} when a space comes first (Word is everything
%%   before it);
%% - {RestAfterDelimiter, <<>>} when a '.' or newline comes first;
%% - {<<>>, <<>>} when no delimiter remains.
match_until_space_newline(Bin, Offset) when Offset < byte_size(Bin) ->
    <<Prefix:Offset/binary, Delim, Rest/binary>> = Bin,
    case Delim of
        $\s -> {Rest, Prefix};
        $.  -> {Rest, <<>>};
        $\n -> {Rest, <<>>};
        _   -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_Bin, _Offset) ->
    {<<>>, <<>>}.


=======