Posts for the month of January 2009

Scala Plugin 0.15.1 for NetBeans Released

I'm pleased to announce the availability of Scala plugin 0.15.1 for NetBeans 6.5.

  • Bundled with Scala 2.7.3
  • Partial support for mixed Java/Scala projects: building such a project works (you need to create a new Scala project), and Java sources are visible in the Scala editor.
  • Various bug fixes

To download, please go to: https://sourceforge.net/project/showfiles.php?group_id=192439&package_id=256544&release_id=654650

For more information, please see http://wiki.netbeans.org/Scala

Bug reports are welcome.

=== Important Notice ===

If you had a previous version of the Scala plugin installed, you need to clean up the old Scala runtime after installing the new plugins. First locate the NetBeans user profile:

  • On Windows, the default location is: "C:\Documents and Settings\<username>\.netbeans\<version-number>\"
  • On Unix/Linux/Mac OS: "~/.netbeans/<version-number>/"

There is a "scala" directory under the above location; delete the whole sub-folder "scala-2.7.2.final" and leave only "scala-2.7.3.final".

=====================

Async or Sync Log in Erlang - Limit the Load of Singleton Process

In a previous blog, A Case Study of Scalability Related "Out of memory" Crash in Erlang, I described a scalability issue that stemmed from a singleton async logger process. A singleton Erlang process cannot benefit from multi-core scalability.

I then did some testing on disk_log, which fortunately has sync log functions: log/2 and blog/2. Whenever a process calls disk_log:blog/2, the requesting process monitors the logger process until it gets back a result or a failure. The code (inside disk_log) looks like:

monitor_request(Pid, Req) ->
    Ref = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive 
        {'DOWN', Ref, process, Pid, _Info} ->
            {error, no_such_log};
        {disk_log, Pid, Reply} ->
            erlang:demonitor(Ref),
            receive 
                {'DOWN', Ref, process, Pid, _Reason} ->
                    Reply
            after 0 ->
                    Reply
            end
    end.

Where Pid is the logger's process id.

This piece of code shows how to interact synchronously between two processes.

Under sync mode there may be many simultaneous processes requesting the logger at once, but each requesting process waits for the logger to finish its work before sending the next log request, i.e. each requesting process logs synchronously. With this sync mode, we can guarantee that the logger's message queue length will not exceed the number of simultaneous requesting processes.
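The contrast between the two disk_log calls can be sketched like this (the log name and file are made up for the example):

```erlang
-module(mode_demo).
-export([run/0]).

%% Sketch with an assumed log name/file: blog/2 blocks the caller until
%% the logger replies, so each caller has at most one outstanding request;
%% balog/2 returns immediately, so the logger's mailbox can grow unbounded.
run() ->
    {ok, demo} = disk_log:open([{name, demo},
                                {file, "demo.log"},
                                {format, external}]),
    ok = disk_log:blog(demo, <<"sync: caller waited for this write\n">>),
    ok = disk_log:balog(demo, <<"async: caller did not wait\n">>),
    ok = disk_log:close(demo).
```

With one caller the two look alike; the difference only shows up under many concurrent callers, which is what the test module below measures.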

I wrote some testing code: dlogger.erl

-module(dlogger).

-export([sync/2,
         async/2,
         proc_loop/4
        ]).

-define(LogName, blog).
-define(LogFile, "b.log").

% 100000, 10
sync(N_Msg, N_Procs) ->
   test(N_Msg, N_Procs, fun disk_log:blog/2).

async(N_Msg, N_Procs) ->
   test(N_Msg, N_Procs, fun disk_log:balog/2).


test(N_Msg, N_Procs, FunLog) ->
   MsgPerProc = round(N_Msg / N_Procs),
   Collector = init(N_Procs),
   LogPid = logger_pid(?LogName),
   io:format("logger pid: ~p~n", [LogPid]),
   Workers = [spawn(?MODULE, proc_loop, [Collector, MsgPerProc, LogPid, FunLog]) || _I <- lists:seq(1, N_Procs)],
   Start = now(),
   [Worker ! start || Worker <- Workers],
   %% don't terminate, wait here, until all tasks done.
   receive
      {sent_done, MaxMQLen, MaxMem} ->
         probe_logger(LogPid, MaxMQLen, MaxMem),
         [exit(Worker, kill) || Worker <- Workers],
         io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
   end.

init(N_Procs) ->
   disk_log:close(?LogName),
   disk_log:open([{name, ?LogName},
                  {file, ?LogFile},
                  {format, external}]),
   MainPid = self(),
   Collector = spawn(fun() -> collect(MainPid, N_Procs, 0, 0, 0, 0) end),
   Collector.

collect(MainPid, N_Procs, N_Finished, _N_Msg, MaxMQLen, MaxMem) when N_Procs == N_Finished ->
   MainPid ! {sent_done, MaxMQLen, MaxMem};
collect(MainPid, N_Procs, N_Finished, N_Msg, MaxMQLen, MaxMem) ->
   receive
      {Pid, sent_done, MQLen, _Mem} ->
         io:format("==== QLen ~p. Proc ~p finished, total finished: ~p ====~n", [MQLen, Pid, N_Finished + 1]),
         collect(MainPid, N_Procs, N_Finished + 1, N_Msg, MaxMQLen, MaxMem);
      {Pid, I, MQLen, Mem} ->
         %io:format("Processed/Qlen ~p/~p msgs. Logger mem is ~p. proc ~p: No.~p msgs sent~n", [N_Msg + 1, MQLen, Mem, Pid, I]),
         MaxMQLen1 = if MQLen > MaxMQLen -> MQLen; true -> MaxMQLen end,
         MaxMem1 = if Mem > MaxMem -> Mem; true -> MaxMem end,
         collect(MainPid, N_Procs, N_Finished, N_Msg + 1, MaxMQLen1, MaxMem1)
   end.

proc_loop(Collector, N_Msg, LogPid, LogFun) ->
   receive
      start ->
         do_proc_work(Collector, N_Msg, LogPid, LogFun, do_log)
   end.

do_proc_work(Collector, I, LogPid, LogFun, WorkType) ->
   Date = httpd_util:rfc1123_date(calendar:local_time()),
   MQLen = logger_mqlen(LogPid),
   Mem = logger_mem(LogPid),
   Msg = io_lib:format("logged in ~p, logger qlen is ~p, total mem is ~p\n",
                       [self(), MQLen, Mem]),
   Msg1 = list_to_binary([<<"=INFO REPORT==== ">>, Date, <<" ===\n">>, Msg, <<"\n">>]),

   WorkType1 = if WorkType == do_log ->
                     LogFun(?LogName, Msg1),
                      io:format("", []), % sync the io with the collector, if any
                     Collector ! {self(), I, MQLen, Mem},
                     io:format("sent one msg, qlen:~p, mem:~p~n", [MQLen, Mem]),
                     if I =< 1 ->
                           Collector ! {self(), sent_done, MQLen, Mem},
                           io:format("~p sent done, qlen:~p, mem:~p~n", [self(), MQLen, Mem]),
                           keep_live;
                        true -> do_log
                     end;
                  true -> keep_live
               end,
   do_proc_work(Collector, I - 1, LogPid, LogFun, WorkType1).

probe_logger(Pid, MaxMQLen, MaxMem) ->
   MQLen = logger_mqlen(Pid),
   Mem = logger_mem(Pid),
   MaxMQLen1 = if MQLen > MaxMQLen -> MQLen; true -> MaxMQLen end,
   MaxMem1 = if Mem > MaxMem -> Mem; true -> MaxMem end,
   io:format("qlen is ~p, max qlen is ~p, max mem is ~p~n", [MQLen, MaxMQLen1, MaxMem1]),
   if MQLen == 0 -> done;
      true ->
         timer:sleep(10),
         probe_logger(Pid, MaxMQLen1, MaxMem1)
   end.

%% === helper ===
logger_pid(Log) ->
   case disk_log_server:get_log_pids(Log) of
      undefined ->
         undefined;
      {local, Pid} ->
         Pid;
      {distributed, [Pid|_Pids]} ->
         Pid
   end.

logger_mqlen(undefined) -> 0;
logger_mqlen(Pid) ->
   case process_info(Pid, message_queue_len) of
      {message_queue_len, Val} when is_integer(Val) -> Val;
      _ -> 0
   end.

logger_mem(undefined) -> 0;
logger_mem(Pid) ->
   case process_info(Pid, memory) of
      {memory, Val} when is_integer(Val) -> Val;
      _ -> 0
   end.

You can always use process_info/2 or process_info/1 to probe information about a process. In the code above, we probe the logger process's message queue length and memory via logger_mqlen/1 and logger_mem/1 when necessary.
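As a minimal sketch of that probing pattern (the module and function names are my own):

```erlang
-module(probe_demo).
-export([snapshot/1]).

%% Return {QueueLen, MemBytes} for Pid, falling back to {0, 0} if the
%% process has already exited (process_info/2 then returns undefined) --
%% the same fallback logger_mqlen/1 and logger_mem/1 use above.
snapshot(Pid) ->
    case {process_info(Pid, message_queue_len), process_info(Pid, memory)} of
        {{message_queue_len, QLen}, {memory, Mem}} -> {QLen, Mem};
        _ -> {0, 0}
    end.
```

For example, probe_demo:snapshot(self()) returns the current queue length and memory footprint of the calling process.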

I wrote the code this way so that I can create thousands of processes first; each process then repeatedly requests to log a message MsgPerProc times, and stays alive after all its messages have been sent.

And, to evaluate the actual task time, when all requesting processes have finished sending log messages, the probe_logger/3 function confirms that all messages in the logger's queue have been processed.

Here's the result:

> dlogger:sync(1000000, 1000) % 1 million log messages under 1000 requesting processes:
qlen is 0, max qlen is 999, max mem is 690,400
Time:  861286.24 ms
> dlogger:async(1000000, 1000) % 1 million log messages under 1000 requesting processes:
qlen is 0, max qlen is 68487, max mem is 75,830,616
Time: 2156351.45 ms

The performance in async mode is much worse than in sync mode. Under async mode, not only did the elapsed time grow a lot, the max queue length also reached 68487, and the memory of the logger process reached about 72MB. In fact, after all messages had been sent, only a few of them had been processed by the logger process. Since I kept 1000 processes alive, the logger process got only a very small share of the CPU cycles, so the message processing procedure (logging to the disk buffer here) became very slow, which caused the worse elapsed time. The same applies to any singleton process.

On the other hand, under sync mode the max queue length did not exceed the number of simultaneous requesting processes, here 999 in the case of 1000 simultaneous processes, and the max memory of the logger process stayed at a reasonable ~674KB.

I'd like to emphasize these points:

  • A singleton process in Erlang cannot benefit from multi-core scalability.
  • Move as much code/work as possible out of the singleton process into the concurrent requesting processes; for example, do all the pre-formatting/computing before sending the message to the singleton process.
  • Sometimes you need to make the interaction between processes synchronous, to limit the singleton process's work load.
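The second and third points can be sketched together as follows (the module and function names are my own invention): the calling process does the expensive formatting itself, so the singleton logger only has to append a ready-made binary, and blog/2 makes the caller wait, bounding the logger's queue:

```erlang
-module(prelog).
-export([log/2]).

%% Pre-formatting happens in the calling process: build the date header
%% and the final binary here, not in the singleton logger.
format_entry(Msg) ->
    Date = httpd_util:rfc1123_date(calendar:local_time()),
    list_to_binary([<<"=INFO REPORT==== ">>, Date, <<" ===\n">>, Msg, <<"\n">>]).

%% The singleton logger process only appends the finished binary; using
%% the sync blog/2 also makes the caller wait for the write to be accepted.
log(Log, Msg) ->
    disk_log:blog(Log, format_entry(Msg)).
```

Compare this with do_proc_work/5 above, which likewise builds Msg1 in the worker process before handing it to the logger.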