Erlang Plugin for NetBeans in Scala #1: A Scala Enumeration That Implements a Java Interface
ErlyBird (the previous plugin) was written in Java, based on NetBeans' Generic Languages Framework (Project Schliemann). As NetBeans 7.0 introduces a new Parsing API, I'm going to migrate the Erlang plugin for NetBeans to it, and rewrite it in Scala.
Defining a Scala/Java mixed module project in NetBeans' trunk tree is a bit tricky, but since it's a plain Ant-based project, you can always take it over by writing/modifying build.xml. The real challenges come from integrating Scala's class/object/trait with an already existing framework. I met this kind of challenge quickly when I tried to implement an enum-like class which should implement the Java interface org.netbeans.api.lexer.TokenId.
The signature of TokenId is:
public interface TokenId {
    String name();
    int ordinal();
    String primaryCategory();
}
From the signature, you can get the hint to implement it as a Java enum. That's true and straightforward in Java. But how about in Scala?
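For comparison, here is what the straightforward Java version might look like. This is a hypothetical sketch: the interface is re-declared inline so the snippet is self-contained (the real one lives in org.netbeans.api.lexer), and the token names are invented. A Java enum gets name() and ordinal() for free, so only primaryCategory() needs any code:

```java
// Hypothetical sketch: TokenId re-declared locally for self-containment;
// the real interface is org.netbeans.api.lexer.TokenId.
interface TokenId {
    String name();
    int ordinal();
    String primaryCategory();
}

// A Java enum's built-in final name() and ordinal() methods satisfy the
// interface automatically; we only add the category field.
enum SimpleErlangTokenId implements TokenId {
    WS("whitespace"),
    ATOM("identifier"),
    KEYWORD("keyword");

    private final String primaryCategory;

    SimpleErlangTokenId(String primaryCategory) {
        this.primaryCategory = primaryCategory;
    }

    @Override
    public String primaryCategory() {
        return primaryCategory;
    }
}
```

The valueOf(String) lookup and the name/ordinal plumbing all come from java.lang.Enum, which is exactly the convenience the Scala version has to rebuild by hand.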
There is an abstract class Enumeration in Scala's library. The example code shows how to use it by extending it as a singleton object. But in my case, I have to implement the name() and ordinal() methods of TokenId at the same time, which makes it inconvenient to get the name() method implemented automatically/magically for each enumeration value.
A quick Google search turned up some discussions about it; for example, method reflection and invocation may be a choice to get name() implemented simply. I tried it, but finally dropped it, because you cannot guarantee the state of the call stack when name() is invoked: it may happen to be called inside this object's own methods, which may then bring infinite recursive calls.
My final code is like:
package org.netbeans.modules.erlang.editor.lexer

object ErlangTokenId extends Enumeration {
    type ErlangTokenId = V

    // Extends Enumeration's inner class Val to get custom enumeration values
    class V(val name:String, val fixedText:String, val primaryCategory:String) extends Val(name) with TokenId {
        override def ordinal = id
    }
    object V {
        def apply(name:String, fixedText:String, primaryCategory:String) =
            new V(name, fixedText, primaryCategory)
    }

    val IGNORED = V("IGNORED", null, "ignore")
    val Error = V("Error", null, "error")

    // --- Spaces and comments
    val Ws = V("Ws", null, "whitespace")
    val Nl = V("Nl", null, "whitespace")
    val LineComment = V("LineComment", null, "comment")
    val CommentTag = V("CommentTag", null, "comment")
    val CommentData = V("CommentData", null, "comment")

    // --- Literals
    val IntegerLiteral = V("IntegerLiteral", null, "number")
    val FloatingPointLiteral = V("FloatingPointLiteral", null, "number")
    val CharacterLiteral = V("CharacterLiteral", null, "char")
    val StringLiteral = V("StringLiteral", null, "string")

    // --- Keywords
    val Andalso = V("Andalso", "andalso", "keyword")
    val After = V("After", "after", "keyword")
    val And = V("And", "and", "keyword")
    val Band = V("Band", "band", "keyword")
    val Begin = V("Begin", "begin", "keyword")
    val Bnot = V("Bnot", "bnot", "keyword")
    val Bor = V("Bor", "bor", "keyword")
    val Bsr = V("Bsr", "bsr", "keyword")
    val Bxor = V("Bxor", "bxor", "keyword")
    val Case = V("Case", "case", "keyword")
    val Catch = V("Catch", "catch", "keyword")
    val Cond = V("Cond", "cond", "keyword")
    val Div = V("Div", "div", "keyword")
    val End = V("End", "end", "keyword")
    val Fun = V("Fun", "fun", "keyword")
    val If = V("If", "if", "keyword")
    val Not = V("Not", "not", "keyword")
    val Of = V("Of", "of", "keyword")
    val Orelse = V("Orelse", "orelse", "keyword")
    val Or = V("Or", "or", "keyword")
    val Query = V("Query", "query", "keyword")
    val Receive = V("Receive", "receive", "keyword")
    val Rem = V("Rem", "rem", "keyword")
    val Try = V("Try", "try", "keyword")
    val When = V("When", "when", "keyword")
    val Xor = V("Xor", "xor", "keyword")

    // --- Identifiers
    val Atom = V("Atom", null, "identifier")
    val Var = V("Var", null, "identifier")

    // --- Symbols
    val LParen = V("LParen", "(", "separator")
    val RParen = V("RParen", ")", "separator")
    val LBrace = V("LBrace", "{", "separator")
    val RBrace = V("RBrace", "}", "separator")
    val LBracket = V("LBracket", "[", "separator")
    val RBracket = V("RBracket", "]", "separator")
    val Comma = V("Comma", ",", "separator")
    val Dot = V("Dot", ".", "separator")
    val Semicolon = V("Semicolon", ";", "separator")
    val DBar = V("DBar", "||", "separator")
    val Bar = V("Bar", "|", "separator")
    val Question = V("Question", "?", "separator")
    val DLt = V("DLt", "<<", "separator")
    val LArrow = V("LArrow", "<-", "separator")
    val Lt = V("Lt", "<", "separator")
    val DGt = V("DGt", ">>", "separator")
    val Ge = V("Ge", ">=", "separator")
    val Gt = V("Gt", ">", "separator")
    val ColonMinus = V("ColonMinus", ":-", "separator")
    val DColon = V("DColon", "::", "separator")
    val Colon = V("Colon", ":", "separator")
    val Hash = V("Hash", "#", "separator")
    val DPlus = V("DPlus", "++", "separator")
    val Plus = V("Plus", "+", "separator")
    val DMinus = V("DMinus", "--", "separator")
    val RArrow = V("RArrow", "->", "separator")
    val Minus = V("Minus", "-", "separator")
    val Star = V("Star", "*", "separator")
    val Ne = V("Ne", "/=", "separator")
    val Slash = V("Slash", "/", "separator")
    val EEq = V("EEq", "=:=", "separator")
    val ENe = V("ENe", "=/=", "separator")
    val DEq = V("DEq", "==", "separator")
    val Le = V("Le", "=<", "separator")
    val Eq = V("Eq", "=", "separator")
    val Exclamation = V("Exclamation", "!", "separator")
}
First, I defined a class V which extends Enumeration.Val and implements TokenId, with an extra field: fixedText.
Then, I have to explicitly put the value's name into this class and pass it to Enumeration.Val's constructor, so the function ErlangTokenId.valueOf(String) will work like Java's enum types do.
With type ErlangTokenId = V, the type ErlangTokenId.V is now aliased as ErlangTokenId, so you can use ErlangTokenId instead of ErlangTokenId.V everywhere, which gets exactly the effect of one of the behaviors of Java's enums: an enum's values have the same type as the enum itself.
Async or Sync Log in Erlang - Limiting the Load of a Singleton Process
In a previous blog post, A Case Study of Scalability Related "Out of memory" Crash in Erlang, I described a scalability-related issue, which came from a singleton async logger process. A singleton Erlang process cannot benefit from multi-core scalability.
I then did some testing on disk_log, which fortunately has sync log functions: log/2 and blog/2. Whenever a process calls disk_log:blog/2, the requesting process will monitor the logger process until it returns a result or a failure. The relevant piece of code looks like:
monitor_request(Pid, Req) ->
    Ref = erlang:monitor(process, Pid),
    Pid ! {self(), Req},
    receive
        {'DOWN', Ref, process, Pid, _Info} ->
            {error, no_such_log};
        {disk_log, Pid, Reply} ->
            erlang:demonitor(Ref),
            receive
                {'DOWN', Ref, process, Pid, _Reason} -> Reply
            after 0 -> Reply
            end
    end.
Where Pid is the logger's process id.
This piece of code shows how to interact synchronously between processes.
Under sync mode, there may be a lot of simultaneous requesting processes asking the logger process to log messages, but each requesting process will wait for the logger's work to be done before it requests the next log; i.e., each requesting process asks the logger to log a message synchronously. Under this sync mode, we can guarantee that the logger's message queue length won't exceed the number of simultaneous requesting processes.
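That bound can be demonstrated with a small model. The sketch below is hypothetical Java, not the disk_log code: N producers each submit a request and block until the consumer acknowledges it, so the consumer's queue can never hold more than N outstanding requests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of sync-mode logging: each producer waits for an
// acknowledgement before sending its next message, bounding the queue.
public class SyncLogModel {
    public static int maxQueueLength(int producers, int msgsPerProducer) throws Exception {
        BlockingQueue<CompletableFuture<Void>> queue = new LinkedBlockingQueue<>();
        AtomicInteger maxSeen = new AtomicInteger(0);

        Thread logger = new Thread(() -> {
            try {
                while (true) {
                    CompletableFuture<Void> req = queue.take();
                    // +1 counts the request just taken; outstanding requests
                    // can never exceed the number of producers.
                    maxSeen.accumulateAndGet(queue.size() + 1, Math::max);
                    req.complete(null); // acknowledge: the producer may continue
                }
            } catch (InterruptedException e) {
                // all producers done
            }
        });
        logger.start();

        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            pool.execute(() -> {
                for (int i = 0; i < msgsPerProducer; i++) {
                    CompletableFuture<Void> ack = new CompletableFuture<>();
                    queue.add(ack);
                    ack.join(); // sync mode: wait before sending the next message
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(60, TimeUnit.SECONDS);
        logger.interrupt();
        return maxSeen.get();
    }
}
```

Dropping the ack.join() (fire-and-forget) turns this into the async mode, where nothing bounds the queue but the producers' sending speed.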
I wrote some testing code: dlogger.erl
-module(dlogger).
-export([sync/2, async/2, proc_loop/4]).

-define(LogName, blog).
-define(LogFile, "b.log").

% 100000, 10
sync(N_Msg, N_Procs) ->
    test(N_Msg, N_Procs, fun disk_log:blog/2).

async(N_Msg, N_Procs) ->
    test(N_Msg, N_Procs, fun disk_log:balog/2).

test(N_Msg, N_Procs, FunLog) ->
    MsgPerProc = round(N_Msg / N_Procs),
    Collector = init(N_Procs),
    LogPid = logger_pid(?LogName),
    io:format("logger pid: ~p~n", [LogPid]),
    Workers = [spawn(?MODULE, proc_loop, [Collector, MsgPerProc, LogPid, FunLog])
               || _I <- lists:seq(1, N_Procs)],
    Start = now(),
    [Worker ! start || Worker <- Workers],
    %% don't terminate, wait here, until all tasks done.
    receive
        {sent_done, MaxMQLen, MaxMem} ->
            probe_logger(LogPid, MaxMQLen, MaxMem),
            [exit(Worker, kill) || Worker <- Workers],
            io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

init(N_Procs) ->
    disk_log:close(?LogName),
    disk_log:open([{name, ?LogName}, {file, ?LogFile}, {format, external}]),
    MainPid = self(),
    Collector = spawn(fun () -> collect(MainPid, N_Procs, 0, 0, 0, 0) end),
    Collector.

collect(MainPid, N_Procs, N_Finished, _N_Msg, MaxMQLen, MaxMem) when N_Procs == N_Finished ->
    MainPid ! {sent_done, MaxMQLen, MaxMem};
collect(MainPid, N_Procs, N_Finished, N_Msg, MaxMQLen, MaxMem) ->
    receive
        {Pid, sent_done, MQLen, _Mem} ->
            io:format("==== QLen ~p. Proc ~p finished, total finished: ~p ====~n",
                      [MQLen, Pid, N_Finished + 1]),
            collect(MainPid, N_Procs, N_Finished + 1, N_Msg, MaxMQLen, MaxMem);
        {Pid, I, MQLen, Mem} ->
            %io:format("Processed/Qlen ~p/~p msgs. Logger mem is ~p. proc ~p: No.~p msgs sent~n",
            %          [N_Msg + 1, MQLen, Mem, Pid, I]),
            MaxMQLen1 = if MQLen > MaxMQLen -> MQLen; true -> MaxMQLen end,
            MaxMem1 = if Mem > MaxMem -> Mem; true -> MaxMem end,
            collect(MainPid, N_Procs, N_Finished, N_Msg + 1, MaxMQLen1, MaxMem1)
    end.

proc_loop(Collector, N_Msg, LogPid, LogFun) ->
    receive
        start ->
            do_proc_work(Collector, N_Msg, LogPid, LogFun, do_log)
    end.

do_proc_work(Collector, I, LogPid, LogFun, WorkType) ->
    Date = httpd_util:rfc1123_date(calendar:local_time()),
    MQLen = logger_mqlen(LogPid),
    Mem = logger_mem(LogPid),
    Msg = io_lib:format("logged in ~p, logger qlen is ~p, total mem is ~p\n",
                        [self(), MQLen, Mem]),
    Msg1 = list_to_binary([<<"=INFO REPORT==== ">>, Date, <<" ===\n">>, Msg, <<"\n">>]),
    WorkType1 =
        if WorkType == do_log ->
                LogFun(?LogName, Msg1),
                io:format("", []), % sync the io with the collector if any
                Collector ! {self(), I, MQLen, Mem},
                io:format("sent one msg, qlen:~p, mem:~p~n", [MQLen, Mem]),
                if I =< 1 ->
                        Collector ! {self(), sent_done, MQLen, Mem},
                        io:format("~p sent done, qlen:~p, mem:~p~n", [self(), MQLen, Mem]),
                        keep_live;
                   true ->
                        do_log
                end;
           true ->
                keep_live
        end,
    do_proc_work(Collector, I - 1, LogPid, LogFun, WorkType1).

probe_logger(Pid, MaxMQLen, MaxMem) ->
    MQLen = logger_mqlen(Pid),
    Mem = logger_mem(Pid),
    MaxMQLen1 = if MQLen > MaxMQLen -> MQLen; true -> MaxMQLen end,
    MaxMem1 = if Mem > MaxMem -> Mem; true -> MaxMem end,
    io:format("qlen is ~p, max qlen is ~p, max mem is ~p~n", [MQLen, MaxMQLen1, MaxMem1]),
    if MQLen == 0 -> done;
       true -> timer:sleep(10), probe_logger(Pid, MaxMQLen1, MaxMem1) % carry the maxima forward
    end.

%% === helpers ===
logger_pid(Log) ->
    case disk_log_server:get_log_pids(Log) of
        undefined -> undefined;
        {local, Pid} -> Pid;
        {distributed, [Pid|_Pids]} -> Pid
    end.

logger_mqlen(undefined) -> 0;
logger_mqlen(Pid) ->
    case process_info(Pid, message_queue_len) of
        {message_queue_len, Val} when is_integer(Val) -> Val;
        _ -> 0
    end.

logger_mem(undefined) -> 0;
logger_mem(Pid) ->
    case process_info(Pid, memory) of
        {memory, Val} when is_integer(Val) -> Val;
        _ -> 0
    end.
You can always use process_info/2 or process_info/1 to probe the information of a process. In the above code, we probe the logger process's message queue length and memory via logger_mqlen/1 and logger_mem/1 when necessary.
I wrote the code this way so that I can create thousands of processes first; each process then repeatedly requests to log a message MsgPerProc times, and keeps alive after all its messages have been sent.
And, to evaluate the actual task time, when all requesting processes have finished sending log messages, the probe_logger/3 function confirms that all messages in the logger's queue have been processed.
Here's the result:
> dlogger:sync(1000000, 1000). % 1 million log messages under 1000 requesting processes
qlen is 0, max qlen is 999, max mem is 690,400
Time: 861286.24 ms

> dlogger:async(1000000, 1000). % 1 million log messages under 1000 requesting processes
qlen is 0, max qlen is 68487, max mem is 75,830,616
Time: 2156351.45 ms
The performance in async mode is much worse compared to sync mode. Under async mode, not only did the elapsed time grow a lot, but the max queue length reached 68487, and the memory of the logger process reached about 72MB. Actually, after all messages had been sent, only a few of them had been processed by the logger process. Since I kept 1000 processes alive, the logger process shared only a very poor proportion of CPU cycles, so the message processing procedure (logging to the disk buffer here) became very slow, which caused the worse elapsed time. This applies to any singleton process.
On the other side, under sync mode, the max queue length did not exceed the number of simultaneous requesting processes (999 in the case of 1000 simultaneous processes), and the max memory of the logger process stayed at a reasonable ~674KB.
I'd like to emphasize the points:
- A singleton process in Erlang cannot benefit from multi-core scalability.
- Move code/work out of the singleton process into the concurrent processes as much as possible; for example, do all the pre-formatting/computing work before sending the message to the singleton process.
- Sometimes you need to make the interaction between processes synchronous, to limit the singleton process's working load.
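The second point can be sketched like this; a hypothetical Java model, not Erlang code: callers pay the formatting cost on their own (parallel) threads, so the singleton consumer is left with nothing but appending ready-made bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: pre-format log lines on the caller's side so the
// singleton consumer does no per-message string work.
public class PreformattedLog {
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

    // Runs on the caller's thread: all formatting cost is paid here,
    // spread across many concurrent callers.
    public void log(String module, int line, String msg) {
        String formatted = String.format("=INFO REPORT==== %s:%d %s%n", module, line, msg);
        queue.offer(formatted.getBytes(StandardCharsets.UTF_8));
    }

    // The singleton consumer's whole job: drain ready-made bytes to disk.
    public byte[] next() throws InterruptedException {
        return queue.take();
    }
}
```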
A Case Study of Scalability Related "Out of memory" Crash in Erlang
We are building a platform for message switching, in Erlang. Everything looks OK regarding stability and features; it has actually run for more than half a year with zero downtime. We tested its performance on our 2-core machine before, and got about 140 transactions/second, which was good enough.
Then we got an 8-core machine several weeks ago, and we ran the same performance test on it to see the scalability. Since Erlang is almost perfect on scalability, you can imagine the result: yes, about 700 transactions/second now, scaling almost linearly. Until it crashed with "out of memory" after a million hits were processed.
It left a very big "erl_crash.dump" file there, so I had to dig into the issue. My first guess was: were some remote requests (DB access, remote web service access, etc.) timing out while the requesting process itself had not timed out yet, causing more and more processes to be kept in the VM?
A quick grep "=proc:" erl_crash.dump showed that the total number of processes was about 980, which was reasonable for our case.
So, which process ate so much memory? A quick grep "Stack+heap" erl_crash.dump showed that there was indeed a process with a Stack+heap of size 285082125 there.
Following this clue, I caught this process:
=proc:<0.4.0>
State: Garbing
Name: error_logger
Spawned as: proc_lib:init_p/5
Last scheduled in for: io_lib_format:pad_char/2
Spawned by: <0.1.0>
Started: Sun Apr 1 01:21:50 2012
Message queue length: 2086029
Number of heap fragments: 1234053
Heap fragment data: 281266956
Link list: [<0.27.0>, <0.0.0>, {from,<0.42.0>,#Ref<0.0.0.88>}]
Reductions: 72745575
Stack+heap: 285082125
OldHeap: 47828850
Heap unused: 121777661
OldHeap unused: 47828850
Program counter: 0x0764c66c (io_lib_format:pad_char/2 + 4)
CP: 0x0764c1b4 (io_lib_format:collect_cseq/2 + 124)
This process was error_logger, which is registered by the OTP/Erlang standard lib error_logger and writes received messages to a log file or tty. The typical usage is:
error_logger:info_msg("~p:~p " ++ Format, [?MODULE, ?LINE] ++ Data))
which formats Data to a string according to the Format string, and writes it to the tty or log file.
The above dump showed that the message queue length of the "error_logger" process had reached 2086029, and the Stack+heap was 285082125, about 272MB.
So the cause may be that the message queue could not be processed in time; the messages crowded into error_logger's mailbox and finally caused "out of memory". The bottleneck was that when error_logger tried to format the messages into strings, the Erlang VM was weak at processing them, which seemed to need a lot of CPU cycles. In a previous blog post, I talked about Erlang being bad at massive text processing. Erlang processes strings/text via lists, which is an obvious bottleneck in Erlang now that Erlang is getting more and more popular and more and more Erlang applications are being written.
But why did this not happen on our 2-core machine? It's an interesting scalability-related problem:
"error_logger" module will registered one and only one process to receive and handle all log messages. But Erlang VM's scheduler can not distribute ONE process to use multiple CPUs' computing ability. In our 2-core machine, the whole ability is about 140 transactions/second, the one process of "error_logger" just happened to have the power to handle corresponding log messages in time. Under 8-core CPUs machine, our platform scales to handle 700 transactions/second, but there is still only one process of "error_logger", which can not use 8-core CPUs' ability at all, and finally fail on it.
Erlang treats every process fairly (although you can change the priority manually), so we can do a simple/quick evaluation:
- 2-Core machine, keeping hits at 140 trans/second:
The number of simultaneous processes will be about 200; each process shares the CPU cycles: 1/200 * 2 cores = 1%
- 8-Core machine, keeping hits at 700 trans/second:
The number of simultaneous processes will be about 980; each process shares the CPU cycles: 1/980 * 8 cores = 0.82%
So the CPU cycles shared by the error_logger process did not actually increase. BTW, I think error_logger should cut its message queue when it cannot process the messages in time (disk IO may also be slower than receiving messages).
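Both estimates come from the same trivial formula, share = cores / number of simultaneous processes; in code (the numbers are the ones from the text):

```java
// Per-process CPU share when the scheduler treats all processes fairly:
// share (in percent) = 100 * cores / number_of_simultaneous_processes.
public class CpuShare {
    public static double sharePercent(int cores, int processes) {
        return 100.0 * cores / processes;
    }
}
```

So going from 2 cores to 8 doesn't help a singleton process at all: its fair share actually shrinks from 1% to about 0.82%.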
CN Erlounge III
I met Jackyz, who is one of the translators of the Chinese version of "Programming Erlang", and Aimin, who is writing a Delphi module to support Erlang c-node and c-driver in Delphi.
There is a commercial network monitoring product using Erlang from a major telecom company in China. And our mobile-banking platform (in Erlang) is scheduled to launch in the middle of January too.
I talked with Yeka and Diuera from Broadview, a leading IT publisher in China; they are really interested in bringing "Programming in Scala" to mainland China.
And many thanks to Shiwei Xu, who is working hard on the Erlang community in China, and who organized this conference.
I gave some encouragement to younger developers on learning Erlang and reading "Programming Erlang", since I'm the oldest of the attendees :-). Erlang is one of the most pragmatic and clear languages for learning concurrent/parallel and functional programming, and the book is a very thoughtful and philosophical one on these topics.
And I'd like to see "Programming in Scala" appear in China soon too. Scala is another pragmatic language for solving real-world problems, and the book is also a thoughtful and philosophical one on our real world of types, OO and FP.
Of course, choosing Scala or Erlang for your real world project should depend on the requirements.
I may be back to Vancouver next month for a while. Oh, it will be the beginning of new year.
CN Erlounge III photos by krzycube
Thinking in Scala vs Erlang
Keeping Erlang in mind, I've been coding in Scala for two months, and I'm thinking about something I call "Scala vs Erlang". I wrote some benchmark code to check my impressions (the code and results may be available someday), and I'd like to summarize it gradually, from a practical angle. These opinions may or may not be correct at the moment, due to my lack of deep experience and understanding, but anyway, I need to record them now and correct myself as I gain more experience and understanding of both Scala and Erlang.
Part I. Syntax
List comprehension
Erlang:
Lst = [1,2,3,4],
[X + 1 || X <- Lst],
lists:map(fun(X) -> X + 1 end, Lst)
Scala:
val lst = List(1,2,3,4)
for (x <- lst) yield x + 1
lst.map{ x => x + 1 }
lst.map{ _ + 1 } // or placeholder
Pattern match
Erlang:
case X of
    {A, B} when is_integer(A), A > 1 -> ok;
    _ -> error
end,
{ok, [{A, B} = H|T]} = my_function(X)
Scala:
x match {
    case (a:Int, b) if a > 1 => OK // can match on type
    case _ => ERROR
}
val ("ok", (h@(a, b)) :: t) = my_function(x)
List, Tuple, Array, Map, Binary, Bit
Erlang:
Lst = [1, 2, 3]   %% List
[0 | Lst]         %% List cons
{1, 2, 3}         %% Tuple
<<1, 2, "abc">>   %% Binary
%% no Array, Map syntax
Scala:
val lst = List(1, 2, 3)        // List
0 :: lst                       // List cons
(1, 2, 3)                      // Tuple
Array(1, 2, 3)                 // Array
Map("a" -> 1, "b" -> 2)        // Map
// no Binary, Bit syntax
Process, Actor
Erlang:
the_actor(X) ->
    receive
        ok -> io:format("~p~n", [X]);
        I -> the_actor(X + I) %% needs to explicitly continue the loop
    end.

P = spawn(mymodule, the_actor, [0]),
P ! 1,
P ! ok
Scala I:
class TheActor(var x:Int) extends Actor {
    def act = loop {
        react {
            case "ok" => println(x); exit // needs to explicitly exit the loop
            case i:Int => x += i
        }
    }
}
val a = new TheActor(0)
a.start
a ! 1
a ! "ok"
Scala II:
val a = actor {
    def loop(x:Int) : Unit = {
        react {
            case "ok" => println(x)
            case i:Int => loop(x + i)
        }
    }
    loop(0)
}
a ! 1
a ! "ok"
Part II. Processes vs Actors
Something I
Erlang:
- Lightweight processes
- You can (almost) always create a new process for each newcomer
- Scheduler treats all processes fairly
- Share nothing between processes
- Lightweight context switch between processes
- IO has been carefully delegated to independent processes
Scala:
- An active actor is delegated to a JVM thread, so actor /= thread
- You can create a new actor for each newcomer
- But the number of real workers (threads) is dynamically adjusted according to the processing time
- The latecomers may sit in a wait list for further processing until a spare thread is available
- Share nothing or share something, upon your decision
- Heavier context switches between working threads
- IO blocking is still a pain unless a good NIO framework is used (Grizzly?)
Something II
Erlang:
- Try to service everyone simultaneously
- But it may lose service quality when the load is heavy; requests may time out (out of service)
- Ideal when processing cost is comparable to context switching cost
- Ideal for small message processing in soft real-time
- Bad for massive data processing and CPU-heavy work
Scala:
- Try to service a limited number of customers best first
- If it cannot service all, the latecomers will be put in a waiting list and may time out (out of service)
- It's difficult to achieve soft real-time for all concurrent customers
- Ideal when processing cost is far more than context switching cost (context switch time is in μs on modern JVM)
- When will there be perfect NIO + Actor library?
Erlang Plugin for NetBeans - 0.17.0 Released
I'm pleased to announce Erlang plugin for NetBeans (ErlyBird) 0.17.0 is released.
This is a bug-fix release, and from now on, releases will be in the form of NetBeans plugins.
NetBeans 6.5 is a requirement.
To download, please go to: https://sourceforge.net/project/showfiles.php?group_id=192439&package_id=226387&release_id=642911
To install:
- Open NetBeans, go to "Tools" -> "Plugins", click on the "Downloaded" tab, click on the "Add Plugins..." button, choose the directory where the Erlang plugin was unzipped, select all listed *.nbm files, and follow the instructions. Restart the IDE.
- Check/set your OTP path. From [Tools]->[Options], click on 'Miscellaneous', then expand 'Erlang Installation', and fill in the full path to your 'erl.exe' or 'erl' file. For instance: "C:/erl/bin/erl.exe"
When you open/create an Erlang project for the first time, the OTP libs will be indexed. The indexing time varies from 30 to 60 minutes depending on your computer.
Feedback and bug reports are welcome.
RPC Server for Erlang, In Scala
There was Java code in my previous blog post, RPC Server for Erlang, In Java; I'm now trying to rewrite it in Scala. With the pattern matching that I've become familiar with in Erlang, writing the Scala version is really a pleasure. You can compare it with the Java version.
I haven't tried Scala's actor lib yet; maybe later.
And also, I should port Erlang's jinterface to Scala, since OtpErlangTuple and OtpErlangList should be written as Scala's Tuple and List.
The code is auto-formatted by NetBeans' Scala plugin, and the syntax highlighting here is almost, though not exactly, the same as in NetBeans.
/*
 * RpcMsg.scala
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpErlangAtom, OtpErlangList, OtpErlangObject,
                                OtpErlangPid, OtpErlangRef, OtpErlangTuple}

class RpcMsg(val call:OtpErlangAtom,
             val mod :OtpErlangAtom,
             val fun :OtpErlangAtom,
             val args:OtpErlangList,
             val user:OtpErlangPid,
             val to  :OtpErlangPid,
             val tag :OtpErlangRef) {
}

object RpcMsg {
    def apply(msg:OtpErlangObject) : Option[RpcMsg] = msg match {
        case tMsg:OtpErlangTuple => tMsg.elements() match {
                /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
                case Array(head:OtpErlangAtom, from:OtpErlangTuple, request:OtpErlangTuple) =>
                    if (head.atomValue.equals("$gen_call")) {
                        (from.elements, request.elements) match {
                            case (Array(to :OtpErlangPid, tag:OtpErlangRef),
                                  Array(call:OtpErlangAtom,
                                        mod :OtpErlangAtom,
                                        fun :OtpErlangAtom,
                                        args:OtpErlangList,
                                        user:OtpErlangPid)) =>
                                if (call.atomValue.equals("call")) {
                                    Some(new RpcMsg(call, mod, fun, args, user, to, tag))
                                } else None
                            case _ => None
                        }
                    } else None
                case _ => None
            }
        case _ => None
    }
}
/*
 * RpcNode.scala
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpAuthException, OtpConnection, OtpErlangAtom,
                                OtpErlangExit, OtpErlangObject, OtpErlangString,
                                OtpErlangTuple, OtpSelf}
import java.io.IOException
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

trait Cons {
    val OK = new OtpErlangAtom("ok")
    val ERROR = new OtpErlangAtom("error")
    val STOPED = new OtpErlangAtom("stoped")
    val THREAD_POOL_SIZE = 100
}

/**
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, xnode, parse, []).
 *
 * @author Caoyuan Deng
 */
abstract class RpcNode(xnodeName:String, cookie:String, threadPoolSize:Int) extends Cons {
    def this(xnodeName:String, cookie:String) = this(xnodeName, cookie, 100)

    private var xSelf:OtpSelf = _
    private var sConnection:OtpConnection = _
    private var execService:ExecutorService = Executors.newFixedThreadPool(threadPoolSize)
    private val flags = Array(0)

    startServerConnection(xnodeName, cookie)
    loop

    def startServerConnection(xnodeName:String, cookie:String) = {
        try {
            xSelf = new OtpSelf(xnodeName, cookie)
            // The node then publishes its port to the Erlang Port Mapper Daemon.
            // This registers the node name and port, making it available to a
            // remote client process. When the port is published it is important
            // to immediately invoke the accept method. Forgetting to accept a
            // connection after publishing the port would be the programmatic
            // equivalent of false advertising.
            val registered = xSelf.publishPort()
            if (registered) {
                System.out.println(xSelf.node() + " is ready.")
                // Accept an incoming connection from a remote node. A call to
                // this method will block until an incoming connection is at
                // least attempted.
                sConnection = xSelf.accept()
            } else {
                System.out.println("There should be an epmd running, start an epmd by running 'erl'.")
            }
        } catch {
            case ex:IOException =>
            case ex:OtpAuthException =>
        }
    }

    def loop : Unit = {
        try {
            val msg = sConnection.receive
            val task = new Runnable() {
                override def run = RpcMsg(msg) match {
                    case None =>
                        try {
                            sConnection.send(sConnection.peer.node, new OtpErlangString("unknown request"))
                        } catch {
                            case ex:IOException =>
                        }
                    case Some(call) =>
                        val t0 = System.currentTimeMillis
                        flags(0) = processRpcCall(call)
                        System.out.println("Rpc time: " + (System.currentTimeMillis - t0) / 1000.0)
                }
            }
            execService.execute(task)

            if (flags(0) == -1) {
                System.out.println("Exited")
            } else loop
        } catch {
            case ex:IOException => loop
            case ex:OtpErlangExit =>
            case ex:OtpAuthException =>
        }
    }

    /** @throws IOException */
    def sendRpcResult(call:RpcMsg, head:OtpErlangAtom, result:OtpErlangObject) = {
        val tResult = new OtpErlangTuple(Array(head, result))
        // Should specify call.tag here
        val msg = new OtpErlangTuple(Array(call.tag, tResult))
        // Should specify call.to here
        sConnection.send(call.to, msg, 1024 * 1024 * 10)
    }

    /** @abstract */
    def processRpcCall(call:RpcMsg) : Int
}

object RpcCall {
    def getShortLocalHost : String = getLocalHost(false)
    def getLongLocalHost : String = getLocalHost(true)

    def getLocalHost(longName:Boolean) : String = {
        var localHost = "localhost"
        try {
            localHost = InetAddress.getLocalHost.getHostName
            if (!longName) {
                /* Make sure it's a short name, i.e. strip off everything after the first '.' */
                val dot = localHost.indexOf(".")
                if (dot != -1) localHost = localHost.substring(0, dot)
            }
        } catch {
            case ex:UnknownHostException =>
        }
        localHost
    }
}
An Example Syntax in Haskell, Erlang and Scala
>>> Updated Oct 16:
I found some coding-style conventions make Scala code more readable. For example, use { x => dosomething } instead of (x => dosomething) for anonymous functions; use x, y, z as the names of arguments of anonymous functions; put all modifiers on the preceding line, etc. That lets me pick out these functions by eye quickly.
======
It's actually my first time writing true Scala code. Sounds strange? Before I wrote Scala code, I wrote a Scala IDE first, and I'm a bit familiar with Scala syntax now. And I've got about 1.5 years of experience with Erlang, which began after I wrote ErlyBird.
Now it's time to write some real-world Scala code. I chose to port Paul R. Brown's perpubplat blog engine, which is written in Haskell. And I was also curious how the syntax would look in Erlang, so I tried some Erlang code too.
Here are some code pieces of the entry module in Haskell, Erlang and Scala:
Original Haskell code piece
empty :: Model
empty = Model M.empty M.empty M.empty [] 0

build_model :: [Item] -> Model
build_model [] = empty
build_model items = Model (map_by permatitle sorted_items)
                          bid
                          (build_child_map sorted_items)
                          (sorted_items)
                          (n+1)
    where sorted_items = sort_by_created_reverse items
          bid = (map_by internal_id sorted_items)
          n = fst . M.findMax $ bid

build_child_map :: [Item] -> M.Map Int [Int]
build_child_map i = build_child_map_ (M.fromList $ (map (\x -> (internal_id x,[])) i)) i

-- Constructed to take advantage of the input being in sorted order.
build_child_map_ :: M.Map Int [Int] -> [Item] -> M.Map Int [Int]
build_child_map_ m [] = m
build_child_map_ m (i:is) =
    if (parent i == Nothing)
    then build_child_map_ m is
    else build_child_map_ (M.insertWith (++) (unwrap $ parent i) [internal_id i] m) is

sort_by_created_reverse :: [Item] -> [Item]
sort_by_created_reverse = sortBy created_sort_reverse

created_sort_reverse :: Item -> Item -> Ordering
created_sort_reverse a b = compare (created b) (created a)
In Erlang:
% @spec empty :: Model
empty() -> #model{}.

% @spec build_model :: [Item] -> Model
build_model([]) -> empty();
build_model(Is) ->
    SortedIs = sort_by_created_reverse(Is),
    Bid = dict:from_list([{I#item.internal_id, I} || I <- SortedIs]),
    N = lists:max(dict:fetch_keys(Bid)),
    #model{by_permatitle = dict:from_list([{X#item.permatitle, X} || X <- SortedIs]),
           by_int_id = Bid,
           child_map = build_child_map(SortedIs),
           all_items = SortedIs,
           next_id = N + 1}.

% @spec build_child_map :: [Item] -> M.Map Int [Int]
build_child_map(Is) ->
    build_child_map_(dict:from_list(lists:map(fun (X) -> {X#item.internal_id, []} end, Is)), Is).

%% Constructed to take advantage of the input being in sorted order.
% @spec build_child_map_ :: M.Map Int [Int] -> [Item] -> M.Map Int [Int]
build_child_map_(D, []) -> D;
build_child_map_(D, [I|Is]) ->
    case I#item.parent of
        undefined -> build_child_map_(D, Is);
        P_Id -> build_child_map_(dict:append(unwrap(P_Id), I#item.internal_id, D), Is)
    end.

% @spec sort_by_created_reverse :: [Item] -> [Item]
sort_by_created_reverse(Is) ->
    lists:sort(fun created_sort_reverse/2, Is).

% lists:sort/2 expects a boolean comparison function
% @spec created_sort_reverse :: Item -> Item -> bool()
created_sort_reverse(A, B) ->
    B#item.created =< A#item.created.
In Scala
object Entry {
  def empty = new Model()

  def build_model(is: List[Item]) = is match {
    case Nil => empty
    case _ =>
      val sortedIs = sortByCreatedReverse(is)
      val bid = Map() ++ sortedIs.map{x => (x.internalId -> x)}
      val n = bid.keys.toList.sort{(x, y) => x > y}.head // max
      new Model(Map() ++ sortedIs.map{x => (x.permatitle -> x)},
                bid,
                buildChildMap(sortedIs),
                sortedIs,
                n + 1)
  }

  def buildChildMap(is: List[Item]) =
    buildChildMap_(Map() ++ is.map{x => (x.internalId -> Nil)}, is)

  private def buildChildMap_(map: Map[Int, List[Int]], is: List[Item]) = {
    map ++ (for (i <- is if i.parent.isDefined;
                 pid = i.parent.get;
                 cids = map.getOrElse(pid, Nil)) yield (pid -> (cids + i.internalId)))
  }

  def sortByCreatedReverse(is: List[Item]) =
    is.sort{(x, y) => x.created before y.created}
}
>>> Updated Oct 16:
Per Martin's suggestion, the above code can be written in a more Scala-like style (the reasons are in the comments). Thanks, Martin.
object Entry {
  def empty = new Model()

  def build_model(is: List[Item]) = is match {
    case Nil => empty
    case _ =>
      val sortedIs = sortByCreatedReverse(is)
      val bid = Map() ++ sortedIs.map{x => (x.internalId -> x)}
      // use predefined max in Iterable
      val n = Iterable.max(bid.keys.toList)
      new Model(Map() ++ sortedIs.map{x => (x.permatitle -> x)},
                bid,
                buildChildMap(sortedIs),
                sortedIs,
                n + 1)
  }

  // you can use a wildcard anonymous function here
  def buildChildMap(is: List[Item]) =
    buildChildMap_(Map() ++ is.map(_.internalId -> Nil), is)

  private def buildChildMap_(map: Map[Int, List[Int]], is: List[Item]) = map ++ {
    // rewrite for so that definitions go into body -- it's more efficient.
    for (i <- is if i.parent.isDefined) yield {
      val pid = i.parent.get
      val cids = map.getOrElse(pid, Nil)
      pid -> (cids + i.internalId)
    }
  }

  // you can use a wildcard anonymous function here
  def sortByCreatedReverse(is: List[Item]) =
    is.sort{_.created before _.created}
}
======
I use ErlyBird for Erlang coding, and Scala for NetBeans for Scala coding. My experience is that the IDE is much more aware of Scala, so I can type a bit faster than when writing Erlang code.
If you are not familiar with all these 3 languages, which one looks more understandable?
RPC Server for Erlang, In Java
We are using Erlang to do some serious things; one of them is indeed part of a banking system. Erlang is a perfect language for concurrency and for syntax (yes, I like its syntax), but it lacks static typing (I hope the newly added -spec and -type attributes may help a bit), and it is not well suited to processing massive data (performance, memory, etc). I tried parsing a 10M XML file with xmerl, the XML lib in OTP/Erlang, which caused terrible memory swapping, and I could never get the parsed tree out.
So there is a real need to get some massive data processed in other languages, for example C, Java etc. That's why I tried to write an RPC server for Erlang, in Java.
There is a jinterface lib shipped with OTP/Erlang for communication between Erlang and Java, and there are docs on how to get it to work. But for an RPC server that is called from Erlang, there are still some real-world tips:
1. When you send the result back to the caller, you need to wrap it in a tuple with the caller's tag Ref as the first element, and the destination should be the caller's Pid. It's something like:
OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
sConnection.send(call.to, msg);
where call.tag is an OtpErlangRef, tResult can be any OtpErlangObject, and call.to is an OtpErlangPid.
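On the Erlang side this is just the gen_server call/reply protocol: rpc:call sends {'$gen_call', {Pid, Ref}, Request} and then waits for {Ref, Reply}. For comparison, a minimal sketch of a pure-Erlang responder doing the same job as the Java code (the handle_msg name is mine):

```erlang
%% Sketch (illustrative): answer a '$gen_call' request the way rpc:call expects.
%% From = {CallerPid, Tag}; gen_server:reply/2 sends {Tag, Reply} to CallerPid,
%% which is exactly what the Java side builds by hand.
handle_msg({'$gen_call', From, {call, Mod, Fun, Args, _User}}) ->
    Result = apply(Mod, Fun, Args),
    gen_server:reply(From, Result).
```

gen_server:reply({To, Tag}, Reply) is equivalent to To ! {Tag, Reply}, which is why the Java code must put call.tag first in the tuple it sends to call.to.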
2. If you need to send massive data back to the caller, the default buffer size of OtpErlangOutputStream is not good; I set it to 1024 * 1024 * 10.
3. Since there may be a lot of concurrent callers calling your RPC server, you have to consider its concurrent performance; I chose to use a thread pool here.
The RPC server in Java has two classes, RpcNode.java and RpcMsg.java:
package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangTuple;

/**
 * @author Caoyuan Deng
 */
public class RpcMsg {
    public OtpErlangAtom call;
    public OtpErlangAtom mod;
    public OtpErlangAtom fun;
    public OtpErlangList args;
    public OtpErlangPid user;
    public OtpErlangPid to;
    public OtpErlangRef tag;

    public RpcMsg(OtpErlangTuple from, OtpErlangTuple request) throws IllegalArgumentException {
        if (request.arity() != 5) {
            throw new IllegalArgumentException("Not a rpc call");
        }
        /* {call, Mod, Fun, Args, userPid} */
        if (request.elementAt(0) instanceof OtpErlangAtom
                && ((OtpErlangAtom) request.elementAt(0)).atomValue().equals("call")
                && request.elementAt(1) instanceof OtpErlangAtom
                && request.elementAt(2) instanceof OtpErlangAtom
                && request.elementAt(3) instanceof OtpErlangList
                && request.elementAt(4) instanceof OtpErlangPid
                && from.elementAt(0) instanceof OtpErlangPid
                && from.elementAt(1) instanceof OtpErlangRef) {
            call = (OtpErlangAtom) request.elementAt(0);
            mod  = (OtpErlangAtom) request.elementAt(1);
            fun  = (OtpErlangAtom) request.elementAt(2);
            args = (OtpErlangList) request.elementAt(3);
            user = (OtpErlangPid)  request.elementAt(4);
            to   = (OtpErlangPid)  from.elementAt(0);
            tag  = (OtpErlangRef)  from.elementAt(1);
        } else {
            throw new IllegalArgumentException("Not a rpc call.");
        }
    }

    /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
    public static RpcMsg tryToResolveRcpCall(OtpErlangObject msg) {
        if (msg instanceof OtpErlangTuple) {
            OtpErlangTuple tMsg = (OtpErlangTuple) msg;
            if (tMsg.arity() == 3) {
                OtpErlangObject[] o = tMsg.elements();
                if (o[0] instanceof OtpErlangAtom
                        && ((OtpErlangAtom) o[0]).atomValue().equals("$gen_call")
                        && o[1] instanceof OtpErlangTuple && ((OtpErlangTuple) o[1]).arity() == 2
                        && o[2] instanceof OtpErlangTuple && ((OtpErlangTuple) o[2]).arity() == 5) {
                    OtpErlangTuple from = (OtpErlangTuple) o[1];
                    OtpErlangTuple request = (OtpErlangTuple) o[2];
                    try {
                        return new RpcMsg(from, request);
                    } catch (IllegalArgumentException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
        return null;
    }
}
package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpSelf;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Usage:
 * $ erl -sname clientnode -setcookie mycookie
 * (clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
 *
 * @author Caoyuan Deng
 */
public abstract class RpcNode {
    public static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    public static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    public static final OtpErlangAtom STOPED = new OtpErlangAtom("stoped");
    private static final int THREAD_POOL_SIZE = 100;

    private OtpSelf xSelf;
    private OtpConnection sConnection;
    private ExecutorService execService;

    public RpcNode(String xnodeName, String cookie) {
        this(xnodeName, cookie, THREAD_POOL_SIZE);
    }

    public RpcNode(String xnodeName, String cookie, int threadPoolSize) {
        execService = Executors.newFixedThreadPool(threadPoolSize);
        startServerConnection(xnodeName, cookie);
        loop();
    }

    private void startServerConnection(String xnodeName, String cookie) {
        try {
            xSelf = new OtpSelf(xnodeName, cookie);
            boolean registered = xSelf.publishPort();
            if (registered) {
                System.out.println(xSelf.node() + " is ready.");
                /* Accept an incoming connection from a remote node. A call to this
                 * method will block until an incoming connection is at least attempted. */
                sConnection = xSelf.accept();
            } else {
                System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
            }
        } catch (IOException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        } catch (OtpAuthException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void loop() {
        while (true) {
            try {
                final int[] flag = {0};
                final OtpErlangTuple msg = (OtpErlangTuple) sConnection.receive();
                Runnable task = new Runnable() {
                    public void run() {
                        RpcMsg call = RpcMsg.tryToResolveRcpCall(msg);
                        if (call != null) {
                            long t0 = System.currentTimeMillis();
                            flag[0] = processRpcCall(call);
                            System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0);
                        } else {
                            try {
                                sConnection.send(sConnection.peer().node(), new OtpErlangString("unknown request"));
                            } catch (IOException ex) {
                                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
                    }
                };
                execService.execute(task);
                if (flag[0] == -1) {
                    System.out.println("Exited");
                    break;
                }
            } catch (OtpErlangExit ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (OtpAuthException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    protected void sendRpcResult(RpcMsg call, OtpErlangAtom head, OtpErlangObject result) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, result});
        // Should specify call.tag here
        OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
        // Should specify call.to here
        sConnection.send(call.to, msg, 1024 * 1024 * 10);
    }

    public abstract int processRpcCall(RpcMsg call);

    // ------ helper
    public static String getShortLocalHost() {
        return getLocalHost(false);
    }

    public static String getLongLocalHost() {
        return getLocalHost(true);
    }

    private static String getLocalHost(boolean longName) {
        String localHost;
        try {
            localHost = InetAddress.getLocalHost().getHostName();
            if (!longName) {
                /* Make sure it's a short name, i.e. strip off everything after first '.' */
                int dot = localHost.indexOf(".");
                if (dot != -1) {
                    localHost = localHost.substring(0, dot);
                }
            }
        } catch (UnknownHostException e) {
            localHost = "localhost";
        }
        return localHost;
    }
}
As you can see, RpcNode is an abstract class; by implementing int processRpcCall(RpcMsg call), you can get whatever features you want. For example:
package net.lightpole.xmlnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangLong;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import java.io.IOException;
import net.lightpole.rpcnode.RpcMsg;
import net.lightpole.rpcnode.RpcNode;

/**
 * @author dcaoyuan
 */
public class MyNode extends RpcNode {
    public MyNode(String xnodeName, String cookie, int threadPoolSize) {
        super(xnodeName, cookie, threadPoolSize);
    }

    @Override
    public int processRpcCall(RpcMsg call) {
        final String modStr = call.mod.atomValue();
        final String funStr = call.fun.atomValue();
        final OtpErlangList args = call.args;
        try {
            OtpErlangAtom head = ERROR;
            OtpErlangObject result = null;
            if (modStr.equals("xnode") && funStr.equals("stop")) {
                head = OK;
                sendRpcResult(call, head, STOPED);
                return -1;
            }
            if (modStr.equals("System") && funStr.equals("currentTimeMillis")) {
                head = OK;
                long t = System.currentTimeMillis();
                result = new OtpErlangLong(t);
            } else {
                result = new OtpErlangString("{undef,{" + modStr + "," + funStr + "}}");
            }
            if (result == null) {
                result = new OtpErlangAtom("undefined");
            }
            sendRpcResult(call, head, result);
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
        }
        return 0;
    }
}
I tested MyNode by:
$ erl -sname clientnode -setcookie mycookie
...
(clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
And you can try to test its concurrent performance by:
%% $ erl -sname clientnode -setcookie mycookie
%% > xnode_test:test(10000)
-module(xnode_test).
-export([test/1]).

test(ProcN) ->
    Workers = [spawn_worker(self(), fun rpc_parse/1, {}) || _I <- lists:seq(0, ProcN - 1)],
    [wait_result(Worker) || Worker <- Workers].

rpc_parse({}) ->
    rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.
I spawned 10000 calls to it, and it ran smoothly.
I'm also considering writing a more general-purpose RPC server in Java, one which can dynamically call any existing method of a Java class.
Erlang for NetBeans (ErlyBird) Recent Updates
Erlang for NetBeans (ErlyBird) has been put on NetBeans' mercurial trunk. I added a changelog page at http://wiki.netbeans.org/ErlangChangelog, where you can follow the latest progress.
Here is a summary of recent updates:
20080223
- Instant rename, or refactoring for local vars and functions (put caret on var or function name, press CTRL+R)
- Fixed: syntax broken for packaged import attribute
- Fixed: syntax broken for wild attribute
- Completion suggestions will search on project's own paths only
- Track GSF changes: reindex performance was improved a lot; Can live with other GSF based language support now (Ruby, Groovy etc)
ErlyBird 0.16.0 will be available soon.
On Travel
I'm in Vancouver Airport right now, using the new free Wi-Fi service here. After four-day trip to San Francisco, I'll fly to China for the traditional Spring Festival.
I met friends in San Francisco; we are developing something using Erlang, as I mentioned before. What we are building is somewhat a "switch" for content: we've successfully got a lot of different content sources "switched" to standard Atom/RSS etc, with a newly designed template language (based on Erlang). It's actually a pleasure to write this switching code in Erlang (parse, map, mashup) with the pattern match syntax and the xmerl lib.
Regexp Syntax in Pattern Match?
Pattern match in Erlang is very useful, but it has some limits, for example, to match something like:
\d\d\dx/\d\d\d\d/\d\d/\d\d/
I have to write the code as:
<<_:S/binary, C1,C2,C3,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,_/binary>>
    when C1 > 47, C1 < 58, C2 > 47, C2 < 58, C3 > 47, C3 < 58,
         Y1 > 47, Y1 < 58, Y2 > 47, Y2 < 58, Y3 > 47, Y3 < 58, Y4 > 47, Y4 < 58,
         M1 > 47, M1 < 58, M2 > 47, M2 < 58,
         D1 > 47, D1 < 58, D2 > 47, D2 < 58
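Short of a full parse_transform, a guard macro can already cut much of the noise above, since a macro may expand to a comma-separated guard sequence. A sketch (the ?IS_DIGIT and match_date names are mine):

```erlang
%% A macro that expands to a guard sequence; usable directly after 'when'.
-define(IS_DIGIT(C), C >= $0, C =< $9).

match_date(Bin, S) ->
    case Bin of
        <<_:S/binary, C1,C2,C3,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,_/binary>>
          when ?IS_DIGIT(C1), ?IS_DIGIT(C2), ?IS_DIGIT(C3),
               ?IS_DIGIT(Y1), ?IS_DIGIT(Y2), ?IS_DIGIT(Y3), ?IS_DIGIT(Y4),
               ?IS_DIGIT(M1), ?IS_DIGIT(M2), ?IS_DIGIT(D1), ?IS_DIGIT(D2) ->
            true;
        _ ->
            false
    end.
```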
But life can be simpler: by using parse_transform, we could write the above code as:
<<_:S/binary,d,d,d,$x,$/,d,d,d,d,$/,d,d,$/,d,d,$/,_/binary>>

where d stands for a digit. The parse_transform can process the AST to generate the real code.
And more: since the current erl_parse supports any atom in a binary skeleton, we could write the pattern match as:
<<_:S/binary,'[a-z]','[^abc]','var[0-9]',$x,$/,d,d,d,d,$/,d,d,$/,d,d,$/,_/binary>>
Will somebody write such a parse_transform?
Tim Bray's Erlang Exercise "WideFinder" - After Find Wide
>>> Updated Nov 7:
A new version tbray9a.erl which uses ets instead of dict, with 116 LoC, took about 3.8 sec on 5 million lines log file, 0.93 sec on 1 million lines file on my 4-core linux box.
- Results on T5120, an 8-core 1.4 GHz machine with 2 integer instruction threads per core and support for 8 thread contexts per core. Solaris thinks it sees 64 CPUs:
Schedulers# | Elapsed(s) | User(s) | System(s) | (User+System)/Elapsed |
1 | 37.58 | 35.51 | 7.82 | 1.15 |
2 | 20.14 | 35.31 | 8.28 | 2.16 |
4 | 11.81 | 35.37 | 8.25 | 3.69 |
8 | 7.63 | 35.28 | 8.33 | 5.72 |
16 | 5.60 | 36.08 | 8.27 | 7.92 |
32 | 5.29 | 36.64 | 8.11 | 8.46 |
64 | 5.45 | 36.79 | 8.23 | 8.26 |
128 | 5.26 | 36.75 | 8.39 | 8.58 |
When schedulers was 16, (User+System)/Elapsed was 7.92, and the elapsed time reached 5.60 sec (near the best), so, the 8-core computing ability and 2 x 8 =16 integer instruction threads ability almost reached the maxima. The 8 x 8 thread contexts seemed to do less help on gaining more performance improvement.
So the T5120 seems to be a box with 8-core parallel computing ability, best saturated at about 16 scheduler threads?
On my 4-core linux box, the slowest time (1 scheduler) vs the fastest time (128 schedulers) was 6.627/3.763 = 1.76. On this T5120, was 37.58/5.26 = 7.14. So, with the 8-core and 16-integer-instruction-thread combination, the T5120 is pretty good on parallel computing.
An interesting point: as schedulers increased, Elapsed time dropped while User time and System time stayed almost constant. This may be because I separated the parallelized and sequential parts completely in my code.
- Results on 2.80Ghz 4-core Intel xeon linux box (5 million lines log file):
Schedulers# | Elapsed(s) | User(s) | System(s) | (User+System)/Elapsed |
1 | 6.627 | 5.356 | 4.248 | 1.45 |
2 | 4.486 | 6.176 | 3.936 | 2.25 |
4 | 4.299 | 8.989 | 4.156 | 3.06 |
8 | 3.960 | 9.629 | 3.644 | 3.35 |
16 | 3.826 | 9.101 | 3.696 | 3.34 |
32 | 3.858 | 9.029 | 3.840 | 3.34 |
64 | 3.763 | 8.801 | 3.820 | 3.35 |
128 | 3.920 | 9.137 | 3.980 | 3.35 |
========
I'm a widefinder these days, and after weeks of finding wide, I wrote another concise and fast widefinder, tbray9.erl, based on Steve's, Anders' and my previous code, with Boyer-Moore searching (it seems Python's findall uses this algorithm) and parallelized file reading. It's 120 LoC (a bit shorter than Fredrik Lundh's wf-6.py), took about 1 sec for a 1 million lines log file and 5.2 sec for 5 million lines on my 4-core linux box, and got 5.29 sec on the T5120 per Tim's testing.
To evaluate:
erlc -smp tbray9.erl
erl -smp +A 1024 +h 10240 -noshell -run tbray9 start o1000k.ap
BTW since I use parallelized io heavily, by adding flag +A 1024, the code can get 4.3 sec for 5 million lines log file on my 4-core linux box.
Binary efficiency in Erlang is an interesting topic; besides the tips I talked about in a previous blog, it also seems to depend on binary size, memory size, etc. The best buffer size for my code seems to be around 20000k to 80000k, which is the best range on both my 4-core linux box and the T5120, but it may vary for different code.
Note: there is a maximum size limit of 2^27 - 1 bytes (about 131072k) for binary pattern matching in current Erlang; this would be consistent with using a 32-bit word to store the size value, with 4 of those bits used for a type identifier and 1 bit for a sign indicator (for this topic, please see Philip Robinson's blog). So the buffer size cannot be greater than 131072k.
I. Boyer-Moore searching
Thanks to Steve and Anders, who have given out a concise Boyer-Moore searching algorithm in Erlang; I modified it a bit to get a general BM searching module for ASCII encoded binaries:
%% Boyer-Moore searching on ASCII encoded binary
-module(bmsearch).
-export([compile/1, match/3]).

-record(bmCtx, {pat, len, tab}).

compile(Str) ->
    Len = length(Str),
    Default = dict:from_list([{C, Len} || C <- lists:seq(1, 255)]),
    Dict = set_shifts(Str, Len, 1, Default),
    Tab = list_to_tuple([Pos || {_, Pos} <- lists:sort(dict:to_list(Dict))]),
    #bmCtx{pat = lists:reverse(Str), len = Len, tab = Tab}.

set_shifts([], _, _, Dict) -> Dict;
set_shifts([C|T], StrLen, Pos, Dict) ->
    set_shifts(T, StrLen, Pos + 1, dict:store(C, StrLen - Pos, Dict)).

%% @spec match(Bin, Start, #bmCtx) -> {true, Len} | {false, SkipLen}
match(Bin, S, #bmCtx{pat=Pat, len=Len, tab=Tab}) ->
    match_1(Bin, S + Len - 1, Pat, Len, Tab, 0).

match_1(Bin, S, [C|T], Len, Tab, Count) ->
    <<_:S/binary, C1, _/binary>> = Bin,
    case C1 of
        C -> match_1(Bin, S - 1, T, Len, Tab, Count + 1);
        _ ->
            case element(C1, Tab) of
                Len -> {false, Len};
                Shift when Shift =< Count -> {false, 1};
                Shift -> {false, Shift - Count}
            end
    end;
match_1(_, _, [], Len, _, _) -> {true, Len}.
Usage:
> Pattern = bmsearch:compile("is a").
{bmCtx,"a si",4,{4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,...}}
> Bin = <<"this is a test">>.
<<"this is a test">>
> bmsearch:match(Bin, 1, Pattern).
{false,1}
> bmsearch:match(Bin, 5, Pattern).
{true,4}
> bmsearch:match(Bin, 7, Pattern).
{false,4}
II. Reading file in parallel and scan
To read a file in parallel, we should open a new file handle for each process. To resolve line-break boundaries, we just split each chunk into its first line (Head), the line-bounded Data, and the last line (Tail), and mark Head and Tail with serial numbers as {I * 10, Head} and {I * 10 + 1, Tail}, so we can join all pending segments (the Head and Tail of each chunk) later in the proper order.
scan_file({FileName, Size, I, BmCtx}) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    {ok, Bin} = file:pread(File, Size * I, Size),
    file:close(File),
    HeadL = split_on_first_newline(Bin),
    TailS = split_on_last_newline(Bin),
    DataL = TailS - HeadL,
    <<Head:HeadL/binary, Data:DataL/binary, Tail/binary>> = Bin,
    {scan_chunk({Data, BmCtx}), {I * 10, Head}, {I * 10 + 1, Tail}}.
III. Spawn workers
Luke's spawn_worker and wait_result are two small functions; they are very useful, stable, and a good abstraction for processing workers:
spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.
So, I can start a series of workers simply by:
read_file(FileName, Size, ProcN, BmCtx) ->
    [spawn_worker(self(), fun scan_file/1, {FileName, Size, I, BmCtx})
     || I <- lists:seq(0, ProcN - 1)].
And then collect the results by:
Results = [wait_result(Worker) || Worker <- read_file(FileName, ?BUFFER_SIZE, ProcN1, BmCtx)],
In this case, the returned Results is a list of {Dict, {Seq1, Head}, {Seq2, Tail}} tuples.
IV. Concat segments to a binary and scan it
Each chunk is split into Head (first line), line-bounded Data, and Tail (last line). The Head and Tail segments are left pending for further processing. After all workers finish scanning Data (each yielding a Dict), we can finally sort these pending segments by SeqNum, then concat and scan them in the main process.
Unzip the results into Dicts and Segs, and sort Segs by SeqNum:
{Dicts, Segs} = lists:foldl(fun ({Dict, Head, Tail}, {Dicts, Segs}) ->
                                    {[Dict | Dicts], [Head, Tail | Segs]}
                            end, {[], []}, Results),
Segs1 = [Seg || {_, Seg} <- lists:keysort(1, Segs)],
The sorted Segs1 is a list of binaries; list_to_binary/1 can concat them into one binary efficiently. You do not need to care whether it's a deep list; it will be flattened automatically:
Dict = scan_chunk({list_to_binary(Segs1), BmCtx}),
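list_to_binary/1 accepts any iolist, so the collected segments can be nested arbitrarily deep and still be concatenated in one pass. A quick shell check:

```erlang
1> list_to_binary([<<"ab">>, [<<"cd">>, [<<"e">>]], <<"f">>]).
<<"abcdef">>
```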
Conclusion
Thinking in parallel is fairly simple in Erlang, right? Most of the code can run in one process, and if you want to spawn work in parallel, you do not need to modify the code much. In this case, scan_file/1 is sequential and just returns everything you want; you can spawn a lot of workers doing scan_file/1's work, then collect the results later. That's all.
Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
>>> Updated Nov 1:
Tim tested tbray5.erl on T5120, for his 971,538,252 bytes of data in 4,625,236 lines log file, got:
real    0m20.74s
user    3m51.33s
sys     0m8.00s
The result was what I guessed, since the elapsed time of my code was 3 times of Anders' on my machine. I'm glad that Erlang performs linearly on different machines/os.
My code is not the fastest. I did not apply Boyer-Moore searching, so scan_chunk_1/4 has to test/skip the binary byte by byte when there is no exact match. Anyway, this code shows how to handle binaries efficiently, and demos the performance of traversing a binary byte by byte (the performance is not so bad now, right?). And it's what I want: a balance between simplicity, readability and speed.
Another approach, for the lazy man, is some binary pattern match hacking; we can modify scan_chunk_1/4 to:
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    Offset =
        case Bin of
            <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 34;
            <<_:S/binary,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 35;
            <<_:S/binary,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 36;
            <<_:S/binary,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 37;
            <<_:S/binary,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 38;
            <<_:S/binary,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 39;
            <<_:S/binary,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 40;
            <<_:S/binary,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 41;
            <<_:S/binary,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 42;
            <<_:S/binary,_,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 43;
            _ -> undefined
        end,
    case Offset of
        undefined -> scan_chunk_1(Bin, DataL, S + 10, Dict);
        _ ->
            case match_until_space_newline(Bin, S + Offset) of
                {true, E} ->
                    Skip = S + Offset - 12,
                    L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.
The elapsed time dropped to 1.424 sec immediately, vs 2.792 sec before, a speedup of about 100%, on my 4-CPU linux box.
If you are patient, you can copy-paste 100 or more such lines :-) (in that case, I'd rather pick Boyer-Moore), and the elapsed time will drop a little more, but not much after 10 lines or so.
========
>>> Updated Oct 29:
Pihis updated his WideFinder, applied guideline II, and beat Ruby on his one-core box. He also modified Anders's code by removing all unnecessary remaining binary bindings (which cause unnecessary sub-binary splitting); then Steve tested the refined code and got 0.567s on his famous 8-CPU linux box. Now we may have reached the real Disk/IO bound. Should we try parallelized file reading? But I've tired of more widefinders. BTW, may we have regexped pattern match at the syntax level? The End.
========
>>> Updated Oct 26:
Code cleanup
========
Binary is usually more efficient than List in Erlang.
The memory size of a Binary is 3 to 6 words plus the Data itself; the Data can be allocated/deallocated in the global heap, so it can be shared across function calls, and across processes on message passing (on the same node), without copying. That's why heap size affects binaries a lot.
The memory size of a List is 1 word per element plus the size of each element, and a List is always copied between function calls and on message passing.
In my previous blogs about Tim's exercise, I suspected the performance of binary traversal. But with more advice and experience, it seems binary can work very efficiently as an ideal data-set processing structure.
There are some guidelines for efficient binary handling in Erlang; I'll try to give them out here, as learned from the exercise and from the experts.
I. Don't split a binary unless the split binaries are what you exactly want
Splitting/combining binaries is expensive, so when you want to get values from a binary at some offsets:
Do
<<_:Offset/binary, C, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).
Do Not *
<<_:Offset/binary, C/binary, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).
* This may be good in R12B
And when you want to split a binary to get Head or Tail only:
Do
<<Head:Offset/binary,_/binary>> = Bin.
Do Not
{Head, _} = split_binary(Bin, Offset).
II. Calculate the final offsets first, then split when you've got the exact offsets
When you traverse a binary to test bytes/bits, calculate and collect the final offsets first; don't split the binary (bind a named Var to a sub-binary) at that point. When you've got all the exact offsets, split out what you want at the end:
Do
get_nth_word(Bin, N) ->
    Offsets = calc_word_offsets(Bin, 0, [0]),
    S = element(N, Offsets),
    E = element(N + 1, Offsets),
    L = E - S,
    <<_:S/binary,Word:L/binary,_/binary>> = Bin,
    io:format("nth Word: ~p", [Word]).

calc_word_offsets(Bin, Offset, Acc) when Offset < size(Bin) ->
    case Bin of
        <<_:Offset/binary,$ ,_/binary>> ->
            calc_word_offsets(Bin, Offset + 1, [Offset + 1 | Acc]);
        _ ->
            calc_word_offsets(Bin, Offset + 1, Acc)
    end;
calc_word_offsets(_, _, Acc) -> list_to_tuple(lists:reverse(Acc)).

Bin = <<"This is a binary test">>,
get_nth_word(Bin, 4). % <<"binary ">>
Do Not
get_nth_word_bad(Bin, N) ->
    Words = split_words(Bin, 0, []),
    Word = element(N, Words),
    io:format("nth Word: ~p", [Word]).

split_words(Bin, Offset, Acc) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> -> split_words(Rest, 0, [Word | Acc]);
        <<_:Offset/binary,_,_/binary>> -> split_words(Bin, Offset + 1, Acc);
        _ -> list_to_tuple(lists:reverse([Bin | Acc]))
    end.

Bin = <<"This is a binary test">>,
get_nth_word_bad(Bin, 4). % <<"binary">>
III. Use "+h Size" option or [{min_heap_size, Size}] with spawn_opt
This is very important for binary performance; it's somewhat a magic number for binary performance. With this option set properly, binaries perform very well; otherwise, much worse.
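Besides the node-wide "+h" flag, the heap size can also be raised for just the binary-crunching workers via spawn_opt; a sketch in the style of the spawn_worker used in this exercise (the size is illustrative):

```erlang
%% Like spawn_worker/3, but gives the worker a larger initial heap
%% (min_heap_size is in words) so big binaries don't trigger early GCs.
spawn_big_heap_worker(Parent, F, A) ->
    erlang:spawn_opt(fun() -> Parent ! {self(), F(A)} end,
                     [monitor, {min_heap_size, 10240}]).
```

erlang:spawn_opt/2 with the monitor option returns {Pid, Ref}, so the result can be passed to the same wait_result/1 as before.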
IV. Others
- Don't forget to compile to native code by adding "-compile([native])." to your code.
- "+A Size", which sets the number of threads in the async thread pool, may also help a bit when doing IO.
Practice
Steve and Anders have pushed widefinder in Erlang to 1.1 sec on 8-CPU linux box. Their code took about 1.9 sec on my 4-CPU box. Then, how about a concise version?
Following the guidelines above, and based on my previous code, Per's dict code, and Luke's spawn_worker, I rewrote a concise and straightforward tbray5.erl (less than 80 LoC) for Tim's exercise, without any extra C-driven modules, and got about 2.972 sec for a 1 million lines log file and 15.695 sec for 5 million lines, vs non-parallelized Ruby's 4.161 sec and 20.768 sec, on my 2.80Ghz 4-CPU Intel Xeon linux box:
BTW, using ets instead of dict is almost the same.
$ erlc -smp tbray5.erl
$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m2.972s
user    0m9.685s
sys     0m0.748s

$ time erl +h 8192 -smp -noshell -run tbray5 start o5000k.ap -s erlang halt

real    0m15.695s
user    0m53.551s
sys     0m4.268s
On 2.0GHz 2-core MacBook (Ruby code took 2.447 sec):
$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m3.034s
user    0m4.853s
sys     0m0.872s
The Code: tbray5.erl
-module(tbray5). -compile([native]). -export([start/1]). -define(BUFFER_SIZE, (1024 * 10000)). start(FileName) -> Dicts = [wait_result(Worker) || Worker <- read_file(FileName)], print_result(merge_dicts(Dicts)). read_file(FileName) -> {ok, File} = file:open(FileName, [raw, binary]), read_file_1(File, 0, []). read_file_1(File, Offset, Workers) -> case file:pread(File, Offset, ?BUFFER_SIZE) of eof -> file:close(File), Workers; {ok, Bin} -> DataL = split_on_last_newline(Bin), Worker = spawn_worker(self(), fun scan_chunk/1, {Bin, DataL}), read_file_1(File, Offset + DataL + 1, [Worker | Workers]) end. split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)). split_on_last_newline_1(Bin, S) when S > 0 -> case Bin of <<_:S/binary,$\n,_/binary>> -> S; _ -> split_on_last_newline_1(Bin, S - 1) end; split_on_last_newline_1(_, S) -> S. scan_chunk({Bin, DataL}) -> scan_chunk_1(Bin, DataL, 0, dict:new()). scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 -> case Bin of <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> case match_until_space_newline(Bin, S + 34) of {true, E} -> Skip = S + 23, L = E - Skip, <<_:Skip/binary,Key:L/binary,_/binary>> = Bin, scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict)); {false, E} -> scan_chunk_1(Bin, DataL, E + 1, Dict) end; _ -> scan_chunk_1(Bin, DataL, S + 1, Dict) end; scan_chunk_1(_, _, _, Dict) -> Dict. match_until_space_newline(Bin, S) when S < size(Bin) -> case Bin of <<_:S/binary,10,_/binary>> -> {false, S}; <<_:S/binary,$.,_/binary>> -> {false, S}; <<_:S/binary,_,$ ,_/binary>> -> {true, S + 1}; _ -> match_until_space_newline(Bin, S + 1) end; match_until_space_newline(_, S) -> {false, S}. spawn_worker(Parent, F, A) -> erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end). wait_result({Pid, Ref}) -> receive {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end; {'DOWN', Ref, _, _, Reason} -> exit(Reason) end. 
merge_dicts([D1,D2|Rest]) ->
    merge_dicts([dict:merge(fun(_, V1, V2) -> V1 + V2 end, D1, D2) | Rest]);
merge_dicts([D]) -> D.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
Tim's Erlang Exercise - Summary
>>> Updated Nov 1:
Tim tested my last attempt, tbray5.erl, which was described in Learning Coding Binary (Was Tim's Erlang Exercise - Round VI), and got the following for his 971,538,252-byte, 4,625,236-line log file:
real    0m20.74s
user    3m51.33s
sys     0m8.00s
It's not the fastest, since I did not apply Boyer-Moore searching. But it's what I want: a balance between simplicity, readability and speed.
========
>>> Updated Oct 24:
The Erlang code can be faster than un-parallelized Ruby, a new version run 2.97 sec on the 4-CPU box: Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
========
>>> Updated Oct 22:
At Bjorn's suggestion, I added the "+h 4096" option for 'erl', which "sets the default heap size of processes to the size 4096"; the elapsed time dropped from 7.7s to 5.5s immediately:
time erl +h 4096 -smp -noshell -run tbray4 start o1000k.ap 10 -s erlang halt
The +h option seems to affect the binary version a lot, but the list version only a little. This may be because lists are always copied between processes, while a large binary can be kept off the process heap and passed by reference.
The default heap size for each process is 233 words, a number suited to running a great many concurrent processes without exhausting memory. But for parallelization tasks with fewer processes, or with enough memory, the heap size can be adjusted somewhat larger.
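The same effect can also be obtained per process rather than VM-wide. This is a minimal sketch of my own, not code from the post; the module name and the 8192-word figure are just assumptions for illustration. spawn_opt/2 accepts a min_heap_size option, which mirrors for one process what "+h" does globally:

```erlang
-module(heap_hint).
-export([spawn_big_heap/1]).

%% Sketch: spawn a worker with a larger initial heap (measured in words),
%% so a process that builds big terms avoids repeated GC-driven heap
%% resizing. 8192 words is an assumed value, not a recommendation.
spawn_big_heap(Fun) ->
    erlang:spawn_opt(Fun, [{min_heap_size, 8192}]).
```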
Anyway, I think Erlang/OTP is already very good at concurrency, but there may still be room to optimize for parallelization.
BTW, with the +h option and some tips for efficient binaries, the most concise binary version, tbray5.erl, can now run in about 3 sec.
========
This is a performance summary of Tim's Erlang exercise on large dataset processing. I only compare the results on a 4-CPU Intel Xeon 2.80GHz Linux box:
| Log File        | Time | Erlang (1 Proc) | Erlang (Many Proc) | Erlang (Many Proc) +h 4096 | Ruby    |
| 1 million lines | real | 22.088s         | 7.700s             | 5.475s                     | 4.161s  |
|                 | user | 21.161s         | 25.750s            | 18.785s                    | 3.592s  |
|                 | sys  | 0.924s          | 3.552s             | 1.352s                     | 0.568s  |
| 5 million lines | real | 195.570s        | 37.669s            | 27.911s                    | 20.768s |
|                 | user | 192.496s        | 126.296s           | 98.162s                    | 19.009s |
|                 | sys  | 3.480s          | 17.789s            | 7.344s                     | 3.116s  |
Notice:
- The Erlang code is tbray4.erl, which can be found in a previous blog; the Ruby code is from Tim's blog.
- The Erlang code is parallelized; the Ruby code is not.
- The Erlang version is a lot more code, but parallelization is not a free lunch.
- On an 8-CPU box, the Erlang version should exceed or come close to the non-parallelized Ruby version*.
- Although we are talking about the multiple-core era, I'm not sure whether disk/IO is also ready for it.
* Per Steve's testing.
CN Erlounge II
It was last weekend, in Zhuhai, China: the two-day CN Erlounge II, which discussed topics on Erlang and FP:
- Why I choose Erlang? - by xushiwei
- Erlang emulator implementation - by mryufeng
- Port & driver - by codeplayer
- Py 2 Erl - by Zoom.Quiet
- STM: Lock free concurrent Overview - by Albert Lee
- mnesia - by mryufeng
And a new logo of "ERL.CHINA" was born as:
(Picture source - http://www.haokanbu.com/story/1104/)
More information about CN Erlounge II can be found here, along with some pictures.
My schedule did not allow me to attend this meeting; maybe next time.
Learning Coding Parallelization (Was Tim's Erlang Exercise - Round V)
>>> Updated Oct 20:
After cleaning up my 4-CPU Linux box, the result for the 1M-line file is about 7.7 sec, and about 38 sec for the 5M-line file.
========
>>> Updated Oct 16:
After testing my code on different machines, I found that disk/IO performance varies a lot: for some very large files, reading the file in parallel may cause a longer elapsed time (typically on a non-server machine that is not equipped with fast disk/IO). So I added another version, tbray4b.erl; in this version only the file reading is not parallelized, and all other code is the same. If you'd like to test it on your machine, please try both.
========
Well, I think I've learned a lot from doing Tim's exercise, not only about List vs Binary in Erlang, but also about computing in parallel. Coding concurrency is fairly easy in Erlang, but coding parallelization is not only about the language; it's also a real, hard question.
I wrote tbray3.erl in The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV) and got a fairly good result so far on my 2-core MacBook. But things are always a bit more complex. As Steve pointed out in a comment, when he tried tbray3.erl on his 8-core Linux box:
"I ran it in a loop 10 times, and the best time I saw was 13.872 sec, and user/CPU time was only 16.150 sec, so it’s apparently not using the multiple cores very well."
I also encountered this issue on my 4-CPU Intel Xeon 2.80GHz Debian box; it ran even worse (8.420s) than my 2-core MacBook (4.483s).
I thought about my code for a while and found that it seems to spawn too many processes for scan_chunk. As scan_chunk's performance has improved a lot, each process now finishes its task very quickly, too quickly relative to the file reading, so the additional CPUs get little chance to join the game. The 'read'-'spawn scan process' cycle is now almost sequential, with very few scanning processes alive simultaneously. I think I have finally hit the file-reading bound.
But wait, as I claimed before, reading a file into memory is very fast in Erlang: for a 200M log file it takes less than 800ms. The elapsed time of tbray3.erl is about 4900ms, far away from 800ms, so why do I say file reading is the bound now?
The problem is this: since I suspected the performance of traversing a binary byte by byte, I chose to convert the binary to a list to scan for the words. Per my testing, a list is better than a binary when it is not too long, in many cases no longer than several KBytes. And, to keep the code clear and readable, I also chose to split the big binary while reading the file, so I have to read the file in pieces of no more than a few KBytes. For a very big file, the reading procedure is thus broken into tens of thousands of steps, which makes the total file-reading time rather long. That's bad.
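The list-vs-binary trade-off above can be checked with a tiny micro-benchmark. This is my own sketch, not code from the exercise, and it uses timer:tc/1 from modern OTP; it just times walking the same 100 KB of data byte by byte both ways:

```erlang
-module(traverse_bench).
-export([run/0]).

%% Walk a list byte by byte.
count_list([]) -> 0;
count_list([_|T]) -> 1 + count_list(T).

%% Walk a binary byte by byte, matching at an increasing offset.
count_bin(Bin, Off) when Off < size(Bin) ->
    <<_:Off/binary, _C, _/binary>> = Bin,
    1 + count_bin(Bin, Off + 1);
count_bin(_, _) -> 0.

run() ->
    Bin  = list_to_binary(lists:duplicate(100000, $a)),
    List = binary_to_list(Bin),
    {TList, 100000} = timer:tc(fun () -> count_list(List) end),
    {TBin,  100000} = timer:tc(fun () -> count_bin(Bin, 0) end),
    io:format("list: ~b us, binary: ~b us~n", [TList, TBin]).
```

The absolute numbers will vary by machine and OTP release; the point is only to measure the two traversal styles on identical data.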
So I decided to write another version, which reads the file in parallel (Round III), splits each chunk on the last newline (Round II), scans the words using pattern matching (Round IV), and, yes, uses binaries instead of lists this time, trying to offset the worse binary-traversal performance with parallelism on multiple cores.
The result is interesting: it's the first time I achieved around 10 sec on my 2-core MacBook using binary matching only, and it's also the first time my humble 4-CPU Intel Xeon 2.80GHz Debian box got a better result (7.700 sec) than my MacBook.
>>> Updated Oct 15:
Steve ran the code on his 8-core 2.33 GHz Intel Xeon Linux box; the best time was 4.920 sec:
"the best time I saw for your newest version was 4.920 sec on my 8-core Linux box. Fast! However, user time was only 14.751 sec, so I’m not sure it’s using all the cores that well. Perhaps you’re getting down to where I/O is becoming a more significant factor."
Please see Steve's One More Erlang Wide Finder and his widefinder attempts.
========
Result on 2.0GHz 2-core MacBook:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt
8900	: 2006/09/29/Dynamic-IDE
2000	: 2006/07/28/Open-Data
1300	: 2003/07/25/NotGaming
800	: 2003/10/16/Debbie
800	: 2003/09/18/NXML
800	: 2006/01/31/Data-Protection
700	: 2003/06/23/SamsPie
600	: 2006/09/11/Making-Markup
600	: 2003/02/04/Construction
600	: 2005/11/03/Cars-and-Office-Suites
Time: 10527.50 ms

real    0m10.910s
user    0m13.927s
sys     0m6.413s
Result on the 4-CPU Intel Xeon 2.80GHz Debian box:
# When the process number is set to 20:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt

real    0m7.700s
user    0m25.750s
sys     0m3.552s

# When the process number is set to 1:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 1 -s erlang halt

real    0m22.035s
user    0m21.525s
sys     0m0.512s

# On a 940M, 5-million-line log file:
$ time erl -smp -noshell -run tbray4_bin start o5000k.ap 100 -s erlang halt
44500	: 2006/09/29/Dynamic-IDE
10000	: 2006/07/28/Open-Data
6500	: 2003/07/25/NotGaming
4000	: 2003/10/16/Debbie
4000	: 2003/09/18/NXML
4000	: 2006/01/31/Data-Protection
3500	: 2003/06/23/SamsPie
3000	: 2006/09/11/Making-Markup
3000	: 2003/02/04/Construction
3000	: 2005/11/03/Cars-and-Office-Suites
Time: 37512.76 ms

real    0m37.669s
user    2m6.296s
sys     0m17.789s
On the 4-CPU Linux box, comparing the elapsed times for ProcNum = 20 and ProcNum = 1, the parallelized run took only 35% of the time of the un-parallelized one, a speedup of about 185%. The ratio is almost the same as in my file_pread.erl test on the same machine.
It's actually a combination of the code from my four previous blogs. Although the performance is not as good as tbray3.erl's on my MacBook, I'm happy that this version is fully parallelized, from reading the file to scanning the words; it should scale better than all my previous versions.
-module(tbray4).

-compile([native]).

-export([start/1, start/2]).

-include_lib("kernel/include/file.hrl").

start([FileName, ProcNum]) when is_list(ProcNum) ->
    start(FileName, list_to_integer(ProcNum)).
start(FileName, ProcNum) ->
    Start = now(),
    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),
    pread_file(FileName, ProcNum, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
               Length = if I == ProcNum - 1 -> ChunkSize * 2; %% the last chunk
                           true -> ChunkSize
                        end,
               {ok, File} = file:open(FileName, [raw, binary]),
               {ok, Bin} = file:pread(File, ChunkSize * I, Length),
               file:close(File),
               {Data, Tail} = split_on_last_newline(Bin),
               Collector ! {seq, I, Data, Tail}
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} =
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.

count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> -> {Data, Tail};
        _ -> split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.

scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).
scan_chunk_1(Bin, Offset, Dict) when Offset =< size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} ->
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> -> {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> -> {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> -> {Rest, <<>>};
        _ -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.
>>> Updated Oct 16:
After testing my code on different machines, I found that disk/IO performance varies a lot: for some very large files, reading the file in parallel may cause a longer elapsed time (typically on a non-server machine that is not equipped with fast disk/IO). So I wrote another version, tbray4b.erl; in this version only the file reading is not parallelized, and all other code is the same. Here's a result for this version on a 940M file with 5 million lines, with ProcNum set to 200 and 400:
# On the 2-core MacBook:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 200 -s erlang halt

real    0m50.498s
user    0m49.746s
sys     0m11.979s

# On the 4-CPU Linux box:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 400 -s erlang halt

real    1m2.136s
user    1m59.907s
sys     0m7.960s
The code: tbray4b.erl
-module(tbray4b).

-compile([native]).

-export([start/1, start/2]).

-include_lib("kernel/include/file.hrl").

start([FileName, ProcNum]) when is_list(ProcNum) ->
    start(FileName, list_to_integer(ProcNum)).
start(FileName, ProcNum) ->
    Start = now(),
    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),
    read_file(FileName, ProcNum, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} ->
            spawn(fun () ->
                      {Data, Tail} = split_on_last_newline(Bin),
                      Collector ! {seq, I, Data, Tail}
                  end),
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} =
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.

count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> -> {Data, Tail};
        _ -> split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.

scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).
scan_chunk_1(Bin, Offset, Dict) when Offset =< size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} ->
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> -> {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> -> {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> -> {Rest, <<>>};
        _ -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.
=======
The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)
Playing with Tim's Erlang Exercise is so much fun.
I've been coding in Erlang for about 6 months as a newbie. In most cases I do parsing on strings (or lists, whatever) with no need for regular expressions, since Erlang's pattern matching can usually solve most problems straightforwardly.
Tim's log file is also a good example for applying pattern matching the Erlang way. It's a continuous stream of data; after splitting it into line-bounded chunks for parallelization purposes, we can truly match the whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on a chunk, with no need to split it into lines any more.
This led to my third solution, which matches the whole
{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) }
directly, using the Erlang pattern:
"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]
and then fetches
[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])
as the matched key, with no need to split the chunk into lines.
But yes, we still need to split each chunk on the last newline to keep the parallelized result exactly accurate.
On my 2-core 2 GHz MacBook, the best time I’ve got is 4.483 sec
# smp enabled:
$ erlc -smp tbray3.erl
$ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt
8900	: <<"2006/09/29/Dynamic-IDE">>
2000	: <<"2006/07/28/Open-Data">>
1300	: <<"2003/07/25/NotGaming">>
800	: <<"2003/10/16/Debbie">>
800	: <<"2003/09/18/NXML">>
800	: <<"2006/01/31/Data-Protection">>
700	: <<"2003/06/23/SamsPie">>
600	: <<"2006/09/11/Making-Markup">>
600	: <<"2003/02/04/Construction">>
600	: <<"2005/11/03/Cars-and-Office-Suites">>
Time: 4142.83 ms

real    0m4.483s
user    0m5.804s
sys     0m0.615s

# no-smp:
$ erlc tbray3.erl
$ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt

real    0m7.050s
user    0m6.183s
sys     0m0.644s
The smp-enabled result is a speedup of about 57%.
On the 2.80GHz 4-CPU Xeon Debian box that I mentioned in a previous blog, the best result is:
real 0m8.420s user 0m11.637s sys 0m0.452s
And I've noticed that adjusting BUFFER_SIZE balances the time consumed by the parallelized and un-parallelized parts. That is, if the number of cores increases, we can also increase BUFFER_SIZE a bit, so that the number of chunks decreases (less un-parallelized split_on_last_newline/1 and file:pread/3), at the cost of heavier work for the parallelized binary_to_list/1 and scan_chunk/1 on longer lists.
The best BUFFER_SIZE on my computer is 4096 * 5 bytes, with which the un-parallelized split_on_last_newline/1 took only about 0.226s.
The code:
-module(tbray3).

-compile([native]).

-export([start/1]).

%% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
-define(BUFFER_SIZE, (4096 * 5)).

start(FileName) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} ->
            {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        [] -> {lists:reverse(List), []};
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest] -> split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_chunk(List) -> scan_chunk_1(List, dict:new()).
scan_chunk_1(List, Dict) ->
    case List of
        [] -> Dict;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
            case match_until_space_newline(Rest, []) of
                {Rest1, []} -> scan_chunk_1(Rest1, Dict);
                {Rest1, Word} ->
                    Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]),
                    scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
            end;
        [_|Rest] -> scan_chunk_1(Rest, Dict)
    end.

match_until_space_newline(List, Word) ->
    case List of
        [] -> {[], []};
        [10|_] -> {List, []};
        [$.|_] -> {List, []};
        [$ |_] -> {List, lists:reverse(Word)};
        [C|Rest] -> match_until_space_newline(Rest, [C | Word])
    end.
I also wrote a corresponding binary version, which is 2-3 times slower than the list version above on my machine, though the result may vary depending on how Erlang/OTP was compiled for your operating system. I will test it again when Erlang/OTP R12B is released, which is claimed to be optimized for binary match performance.
-module(tbray3_bin).

-compile([native]).

-export([start/1]).

-define(BUFFER_SIZE, (4096 * 10000)).

start(FileName) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

read_file(File, Collector) -> read_file_1(File, <<>>, 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} ->
            {Data, NextTail} = split_on_last_newline(Bin),
            spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> -> {Data, Tail};
        _ -> split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.

scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).
scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} ->
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> -> {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> -> {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> -> {Rest, <<>>};
        _ -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.
Reading File in Parallel in Erlang (Was Tim Bray's Erlang Exercise - Round III)
My first solution for Tim's exercise tried to read the file in parallel, but I just realized, by reading the file module's source code, that file:open(FileName, Options) returns a process instead of an IO device. Well, this means a lot:
- It's a process, so when you request more data from it, you actually send it a message. Since you only send two integers, the offset and the length, sending the message should be very fast. But then this process (File) will wait to receive data from disk/IO. For one process, the receiving is sequential rather than parallelized.
- If we look at processes in Erlang as ActiveObjects, which send/receive messages/data asynchronously, then since the receiving is sequential within one process, requesting/waiting around one process (or object) is almost safe for parallel programming; you usually do not need to worry about lock/unlock etc. (except for the outside world).
- We can open many File processes to read data in parallel; the bound is the disk/IO and the OS's resource limits.
I wrote some code to test file reading in parallel. Discarding the disk cache, on my 2-core MacBook, reading a file with two processes can be nearly 200% faster than with one process.
The code:
-module(file_pread).

-compile([native]).

-export([start/2]).

-include_lib("kernel/include/file.hrl").

start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].
start(FileName, ProcNum, Fun) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    Fun(FileName, ProcNum, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

collect_loop(Main) -> collect_loop_1(Main, undefined, 0).
collect_loop_1(Main, ChunkNum, ChunkNum) -> Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} ->
            collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} ->
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
               %% if it's the last chunk, read all bytes left,
               %% which will not exceed ChunkSize * 2
               Length = if I == ProcNum - 1 -> ChunkSize * 2;
                           true -> ChunkSize
                        end,
               {ok, File} = file:open(FileName, [raw, binary]),
               {ok, _Bin} = file:pread(File, ChunkSize * I, Length),
               Collector ! {seq, I},
               file:close(File)
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.
The pread_file/3 function is parallelized; it opens a new File process for each reading process instead of sharing one opened File process among all of them. read_file/3 is not parallelized.
To evaluate (run each test at least twice, so disk/IO caching evens out):
$ erlc -smp file_pread.erl
$ erl -smp
1> file_pread:start("o100k.ap", 2).
time:     691.72 ms
time:      44.37 ms
[ok,ok]
2> file_pread:start("o100k.ap", 2).
time:      74.50 ms
time:      43.59 ms
[ok,ok]
3> file_pread:start("o1000k.ap", 2).
time:    1717.68 ms
time:     408.48 ms
[ok,ok]
4> file_pread:start("o1000k.ap", 2).
time:     766.00 ms
time:     393.71 ms
[ok,ok]
5>
Let's compare the results for each file (picking the second run of each). The speedups:
- o100k.ap, 20M: 74.50 / 43.59 - 1 = 70%
- o1000k.ap, 200M: 766.00 / 393.71 - 1 = 95%
On another 4-CPU debian machine, with 4 processes, the best result I got:
4> file_pread:start("o1000k.ap", 4).
time:     768.59 ms
time:     258.57 ms
[ok,ok]
5>
The parallelized reading speedup is 768.59 / 258.57 - 1 = 197%.
I've updated my first solution according to this test, opening a new File process for each reading process instead of sharing the same File process. Of course, there are still the issues that I pointed out in Tim Bray's Erlang Exercise on Large Dataset Processing - Round II.
Although the above result can also be achieved in other languages, I find that coding parallelization in Erlang is a pleasure.
Tim Bray's Erlang Exercise on Large Dataset Processing - Round II
Updated Oct 09: Added more benchmark results under linux on other machines.
Updated Oct 07: More concise code.
Updated Oct 06: Fixed bugs: 1. Match "GET /ongoing/When/" instead of "/ongoing/When/"; 2. split_on_last_newline should not reverse Tail.
Backed from a short vacation, and sit down in front of my computer, I'm thinking about Tim Bray's exercise again.
As I realized, the most expensive procedure is splitting the dataset into lines. To get the multiple-core benefit, we should parallelize this procedure, instead of only the file reading or the matching process.
In my previous solution, there are at least two issues:
- Since file reading is fast in Erlang, parallelizing the file reading is not much help.
- The buffered_read can actually be merged with the buffered file reading.
And Per's solution parallelizes the process_match procedure only, based on a really fast divide_to_lines, but with a hacked binary matching syntax.
After a couple of hours of work, I finally got the second version of tbray.erl (with some code from Per's solution).
- Read the file into small pieces of binary (about 4096 bytes per chunk), then convert each to a list.
- Merge the previous tail into each chunk, search the chunk from its tail for the last newline mark, and split the chunk into a line-bounded data part and a tail part for the next chunk.
- The above steps are difficult to parallelize. If we tried, there would be about 30 more LOC, and the code would be less readable.
- Spawn a new process at once to split each line-bounded chunk into lines, match, and update a dict.
- Thus we can go on reading the file non-stop.
- A collect_loop receives the dicts from each process and merges them.
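To make the dict-merging step concrete, here is a small self-contained sketch (mine, not code from the post; the module name is made up) of how dict:merge/3 combines two per-chunk count dicts, summing the values of keys that appear in both:

```erlang
-module(merge_demo).
-export([run/0]).

%% Two per-chunk dicts; the key "2006/07/28/Open-Data" appears in both.
run() ->
    D1 = dict:from_list([{<<"2006/07/28/Open-Data">>, 3}]),
    D2 = dict:from_list([{<<"2006/07/28/Open-Data">>, 2},
                         {<<"2003/09/18/NXML">>, 1}]),
    %% Same merge fun as in collect_loop: sum the counts on key clashes.
    Merged = dict:merge(fun (_Key, V1, V2) -> V1 + V2 end, D1, D2),
    lists:keysort(1, dict:to_list(Merged)).
    %% -> [{<<"2003/09/18/NXML">>,1},{<<"2006/07/28/Open-Data">>,5}]
```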
What I like about this version is that it scales almost linearly on multiple cores! On my 2.0G 2-core MacBook, it took about 13.522 seconds without smp and 7.624 seconds with smp enabled (for a 200M data file, with about 50,000 processes spawned). The 2-core smp result is about 77% faster than the non-smp result. I'm not sure how it would do on an 8-core computer, but we will finally reach a limit due to the un-parallelized procedures.
The Erlang time results:
$ erlc tbray.erl
$ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null
real    0m13.522s
user    0m12.265s
sys     0m1.199s

$ erlc -smp tbray.erl
$ time erl -smp +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null
real    0m7.624s
user    0m13.302s
sys     0m1.602s

# For 5 million lines, 958.4M size:
$ time erl -smp +P 300000 -noshell -run tbray start o5000k.ap -s erlang halt > /dev/null
real    0m37.085s
user    1m5.605s
sys     0m7.554s
And Tim's original Ruby version:
$ time ruby tbray.rb o1000k.ap > /dev/null
real    0m2.447s
user    0m2.123s
sys     0m0.306s

# For 5 million lines, 958.4M size:
$ time ruby tbray.rb o5000k.ap > /dev/null
real    0m12.115s
user    0m10.494s
sys     0m1.473s
The Erlang time result on a 2-core 1.86GHz RedHat Linux box, with kernel:
Linux version 2.6.18-1.2798.fc6 (brewbuilder@hs20-bc2-4.build.redhat.com) (gcc version 4.1.1 20061011 (Red Hat 4.1.1-30)) #1 SMP Mon Oct 16 14:37:32 EDT 2006
is 7.7 seconds.
The Erlang time result on a 2.80GHz 4-cpu Xeon Debian box, with kernel:
Linux version 2.6.15.4-big-smp-tidy (root@test) (gcc version 4.0.3 20060128 (prerelease) (Debian 4.0.2-8)) #1 SMP Sat Feb 25 21:24:23 CST 2006
The smp result on this 4-cpu computer is questionable: it sped up only 50% over non-smp, worse even than my 2.0GHz 2-core MacBook. I also tested the Big Bang benchmark on this machine; it sped up by less than 50% too.
$ erlc tbray.erl
$ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null
real    0m22.279s
user    0m21.597s
sys     0m0.676s

$ erlc -smp tbray.erl
$ time erl -smp +S 4 +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null
real    0m14.765s
user    0m28.722s
sys     0m0.840s
Notice:
- All tests were run several times and the best result is reported, so the state of the disk/io cache should be similar across runs.
- You may need to compile tbray.erl into two different BEAMs, one for the smp version and one for the non-smp version.
- If you'd like to process a bigger file, you can use +P ProcessNum to allow more simultaneously alive Erlang processes. For BUFFER_SIZE=4096, you can set the +P arg to FileSize / 4096 or above. From Erlang's Efficiency Guide:
Processes
The maximum number of simultaneously alive Erlang processes is by default 32768. This limit can be raised up to at most 268435456 processes at startup (see documentation of the system flag +P in the erl(1) documentation). The maximum limit of 268435456 processes will at least on a 32-bit architecture be impossible to reach due to memory
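The rule of thumb above can be written as a tiny helper (the function name is mine, not part of tbray.erl):

```erlang
%% Hypothetical helper: a lower bound for the +P argument, given the file
%% size in bytes and the 4096-byte BUFFER_SIZE used by tbray.erl, since
%% one process may be spawned per chunk.
min_processes(FileSize) ->
    FileSize div 4096 + 1.

%% For the ~200M o1000k.ap file this gives a bit over 51000 processes,
%% so the +P 60000 used in the transcripts above leaves some headroom.
```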
To evaluate with smp enabled (Erlang/OTP R11B-5 for Windows may not support smp yet):
erl -smp +P 60000
> tbray:start("o1000k.ap").
The code: (pretty formatted by ErlyBird 0.15.1)
-module(tbray_blog).
-compile([native]).
-export([start/1]).

%% The best Bin Buffer Size is 4096
-define(BUFFER_SIZE, 4096).

start(FileName) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    %% don't terminate, wait here, until all tasks done
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} ->
            {Data, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_lines(Data)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        [] ->
            %% no newline found in this chunk: keep all of it as the tail
            {[], Tail};
        [$\n|Rest] ->
            {lists:reverse(Rest), Tail};
        [C|Rest] ->
            split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~p\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_lines(List) -> scan_lines_1(List, [], dict:new()).
scan_lines_1(List, Line, Dict) ->
    case List of
        [] ->
            match_and_update_dict(lists:reverse(Line), Dict);
        [$\n|Rest] ->
            scan_lines_1(Rest, [], match_and_update_dict(lists:reverse(Line), Dict));
        [C|Rest] ->
            scan_lines_1(Rest, [C | Line], Dict)
    end.

match_and_update_dict(Line, Dict) ->
    case process_match(Line) of
        false -> Dict;
        {true, Word} -> dict:update_counter(Word, 1, Dict)
    end.

process_match(Line) ->
    case Line of
        [] -> false;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
            case match_until_space(Rest, []) of
                [] -> false;
                Word -> {true, [Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ Word}
            end;
        [_|Rest] ->
            process_match(Rest)
    end.

match_until_space(List, Word) ->
    case List of
        [] -> [];
        [$.|_] -> [];
        [$ |_] -> lists:reverse(Word);
        [C|Rest] -> match_until_space(Rest, [C | Word])
    end.
Lessons learned:
- Split a large binary into properly sized chunks, then convert them to lists for further processing
- Parallelize the most expensive part (of course)
- We need a new or more complete Efficient Erlang guide
Tim Bray's Erlang Exercise on Large Dataset Processing
Updated Oct 10: pread_file/5 should open a new File process each cycle.
Updated Oct 05: Wrote a new version, which is more parallelized-likeness.
Updated Sep 27: Per Gustafsson gave a better solution; it took 5.155 seconds on my computer, with all record matching tasks done. Getting all lines ready without record matching took only 1.58 seconds.
Updated Sep 27: The best result on my computer is 5.188 seconds, after added -compile([native])., with: tbray:start(20, "o1000k.ap").
Tim's Wide Finder Project tried Erlang on a large log file (around 200M):
There, the Erlang code took more than 30 seconds to fetch the matched records.
I'm very interested in Erlang's efficiency at processing large datasets, so I ran several tests and drew some conclusions.
First, file io itself in Erlang is reasonably fast: reading a 200M file into memory using file:read_file/1 took less than 800ms on my computer.
But to process the dataset, you cannot avoid traversing the whole thing: finding newline marks, matching the expression, etc.
I wrote a piece of code that traverses the whole dataset in binary form, as simply as counting it byte by byte. It took about 35s for a 200M file, so traversing a binary byte by byte seems to be the major cost.
Thomas gave some hints on how to traverse a binary efficiently, here.
And Bjorn from the OTP team gave some hints too.
Yes, the most efficient data structure in Erlang for traversing a dataset byte by byte is the List.
Let's do some testing:
travel_bin(Bin) -> travel_bin(Bin, 0).
travel_bin(<<>>, ByteCount) -> ByteCount;
travel_bin(<<$\n, Rest/binary>>, ByteCount) -> travel_bin(Rest, ByteCount + 1);
travel_bin(<<_C, Rest/binary>>, ByteCount) -> travel_bin(Rest, ByteCount + 1).

travel_list(List) -> travel_list(List, 0).
travel_list([], CharCount) -> CharCount;
travel_list([$\n|Rest], CharCount) -> travel_list(Rest, CharCount + 1);
travel_list([_C|Rest], CharCount) -> travel_list(Rest, CharCount + 1).
When applied to a 20M file, we got:
> {ok, Bin} = file:read_file("o100k.ap").
{ok,<<"host-24-225-218-245.patmedia.net - - [01/Oct/2006:06:33:45 -0700] \"GET /ongoing/ongoing.atom HTTP/1.1\" 304 -"...>>}
> timer:tc(tbray, travel_bin, [Bin]).
{2787402,20099550}
> timer:tc(tbray, travel_list, [binary_to_list(Bin)]).
{370906,20099550}
(Updated Oct 7: The statements about travel_list below are not quite accurate; the test of travel_list actually did not include the time taken by binary_to_list. The story of "Binary vs List in Erlang" is a bit complex, and the results vary a lot with data size. I'll post another article about it.)
Here, travel_bin took about 2787.402ms, and travel_list took about 370.906ms (including the time to apply binary_to_list).
A pretty good result for travel_list, which cost only about 13% of the time of travel_bin.
But a List is a much bigger memory eater than a Binary. When you try to apply the above code to a 200M file, the scene changes a lot:
> f(Bin).
ok
> {ok, Bin} = file:read_file("o1000k.ap").
{ok,<<"host-24-225-218-245.patmedia.net - - [01/Oct/2006:06:33:45 -0700] \"GET /ongoing/ongoing.atom HTTP/1.1\" 304 -"...>>}
> timer:tc(tbray, travel_bin, [Bin]).
{35414374,200995500}
> timer:tc(tbray, travel_list, [binary_to_list(Bin)]).
beam.smp(25965,0x1806400) malloc: *** vm_allocate(size=1782579200) failed (error code=3)
...
Here, the size of o1000k.ap is about 200M: travel_bin took 35s, and travel_list crashed.
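A rough back-of-envelope check (my own estimate, not from the post): a list stores each character in a cons cell of 2 words, so on a 32-bit VM a 200M-character list needs on the order of 1.6G of heap, which lines up with the failed vm_allocate of ~1.78G shown above:

```erlang
%% Sketch of the memory estimate (my own helper name): heap bytes needed to
%% hold N characters as a list on a VM with WordSize-byte words, assuming
%% 2 words per cons cell (head value + tail pointer).
list_heap_bytes(N, WordSize) ->
    N * 2 * WordSize.

%% list_heap_bytes(200 * 1024 * 1024, 4) is 1677721600, about 1.6G,
%% in the same ballpark as vm_allocate(size=1782579200) above.
```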
How about splitting the large binary into pieces, converting them to lists, and traversing those?
I tried, and it's a bit tricky. The trick is the buffer size. At first I split the binary into pieces of 1024 * 1024 bytes, and the performance was even worse; I almost gave up. But I tried more, and when I adjusted the buffer size to 4096, this solution shone.
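Finding that sweet spot is just a matter of timing the same traversal at several candidate sizes. A minimal tuning harness (my own sketch; it assumes you pass in a fun of your chunked traversal taking the binary and a buffer size) could look like:

```erlang
%% Hypothetical tuning harness: time ScanFun (a fun(Bin, BufSize) doing the
%% split-to-chunks-and-traverse work) over several buffer sizes and print
%% the wall time of each, so the sweet spot stands out.
try_buffer_sizes(Bin, ScanFun) ->
    lists:foreach(
      fun (BufSize) ->
              {MicroSecs, _Result} = timer:tc(erlang, apply, [ScanFun, [Bin, BufSize]]),
              io:format("buffer ~8B bytes: ~10B us~n", [BufSize, MicroSecs])
      end,
      [1024, 4096, 16384, 65536, 1024 * 1024]).
```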
And finally, with a parallel file reader, I got an answer to Tim's exercise, plus a simple expression match: for a 1-million-line file (200M size), 8.311 seconds with smp enabled and 10.206 seconds with smp disabled.
My computer is a 2.0G 2-core MacBook; I'd like to see a result on a machine with more cores :-)
The code:
-module(tbray).
-compile([native]).
-export([start/2, collect_loop/2, buffered_read/3]).
-include_lib("kernel/include/file.hrl").

%% The best BUFFER_SIZE is 4096
-define(BUFFER_SIZE, 4096).

-record(context, {lineBuf = [],
                  matched = 0,
                  total = 0,
                  lastProcessedSeq = 0,
                  dataBuf = [],
                  processNum}).

%% erl -smp enable +P 60000
%% timer:tc(wide, start, [1000, "o1000k.ap"]).
start(ProcessNum, FileName) ->
    statistics(wall_clock),
    {ok, FileInfo} = file:read_file_info(FileName),
    Size = FileInfo#file_info.size,
    Collect = spawn(?MODULE, collect_loop, [self(), #context{processNum = ProcessNum}]),
    psplit_read_file(Collect, FileName, Size div ProcessNum, ProcessNum, 1),
    {Matched, Total} =
        receive
            #context{matched=MatchedX, total=TotalX} -> {MatchedX, TotalX}
        end,
    {_, Duration2} = statistics(wall_clock),
    io:format("scan lines:\t ~pms~nMatched: ~B, Total: ~B~n", [Duration2, Matched, Total]).

psplit_read_file(_Collector, _FileName, _ChunkSize, ProcessNum, I) when I > ProcessNum -> done;
psplit_read_file(Collector, FileName, ChunkSize, ProcessNum, I) ->
    spawn(fun () ->
                  Offset = ChunkSize * (I - 1),
                  %% if it's the last chunk, read all bytes left, which will not exceed ChunkSize * 2
                  Length = if I == ProcessNum -> ChunkSize * 2;
                              true -> ChunkSize
                           end,
                  {ok, File} = file:open(FileName, [read, binary]),
                  {ok, Data} = file:pread(File, Offset, Length),
                  Collector ! {I, Data}
          end),
    psplit_read_file(Collector, FileName, ChunkSize, ProcessNum, I + 1).

collect_loop(Pid, #context{lastProcessedSeq=ProcessNum, processNum=ProcessNum}=Context) ->
    Pid ! Context;
collect_loop(Pid, #context{dataBuf=DataBuf}=Context) ->
    receive
        {Seq, Data} ->
            SortedDatas = lists:keysort(1, [{Seq, Data} | DataBuf]),
            Context1 = process_arrived_datas(SortedDatas, Context#context{dataBuf = []}),
            %io:format("Last processed Seq: ~B~n", [Context1#context.lastProcessedSeq]),
            collect_loop(Pid, Context1)
    end.

process_arrived_datas([], Context) -> Context;
process_arrived_datas([{Seq, Data}|T], #context{lineBuf=LineBuf,
                                                matched=Matched,
                                                total=Total,
                                                lastProcessedSeq=LastProcessedSeq,
                                                dataBuf=DataBuf}=Context) ->
    if Seq == LastProcessedSeq + 1 ->
            {LineBuf1, Matched1, Total1} =
                buffered_read(
                  fun (Buffer, {LineBufX, MatchedX, TotalX}) ->
                          scan_line(binary_to_list(Buffer), LineBufX, MatchedX, TotalX)
                  end, {LineBuf, Matched, Total}, Data),
            process_arrived_datas(T, Context#context{lineBuf = LineBuf1,
                                                     matched = Matched1,
                                                     total = Total1,
                                                     lastProcessedSeq = Seq});
       true ->
            process_arrived_datas(T, Context#context{dataBuf = [{Seq, Data} | DataBuf]})
    end.

buffered_read(Fun, Acc, Bin) ->
    case Bin of
        <<Buf:?BUFFER_SIZE/binary, Rest/binary>> ->
            Acc1 = Fun(Buf, Acc),
            buffered_read(Fun, Acc1, Rest);
        _ ->
            Fun(Bin, Acc)
    end.

scan_line([], LineBuf, Matched, Total) -> {LineBuf, Matched, Total};
scan_line([$\n|Rest], LineBuf, Matched, Total) ->
    Line1 = lists:reverse(LineBuf),
    %io:format("~n~s~n", [Line1]),
    Matched1 = Matched + process_match(Line1),
    scan_line(Rest, [], Matched1, Total + 1);
scan_line([C|Rest], LineBuf, Matched, Total) ->
    scan_line(Rest, [C | LineBuf], Matched, Total).

process_match([]) -> 0;
process_match("GET /ongoing/When/"++Rest) ->
    case match_until_space(Rest, false) of
        true -> 0;
        false -> 1
    end;
process_match([_H|Rest]) ->
    process_match(Rest).

match_until_space([$\040|_Rest], Bool) -> Bool;
match_until_space([$.|_Rest], _Bool) -> true;
match_until_space([_H|Rest], Bool) -> match_until_space(Rest, Bool).
Some hints:
The solution spawns a lot of processes to read the file into binaries in parallel, then sends them to a collect_loop. collect_loop will buffered_read each chunk (once the chunks' order is correct); buffered_read converts each binary to small lists (4096 bytes here); scan_line merges them into lines; and process_match runs on each line.
As I mentioned before, handling a short string line in Erlang is fast, so I do not fork process_match into separate processes.
The code can handle very large files.
The matching code may not be correct, and it does not finish all the tasks that Tim wants.
From Rails to Erlyweb - Part II Manage Project - Reloaded
The migration of our project from Rails to Erlyweb is nearly finished, and I've gained more experience with Erlyweb along the way. First, project management can be more straightforward. Here it is:
2. Manage project - Reloaded
Erlyweb provides erlyweb:compile(App, ..) to compile the source files under the app directory. To start an app, you usually should erlydb:start(mysql, ....) and compile the app files first. To make life easier, you can put some script-like code under the myproject/script directory. Here's my project source tree:
myproject
+ apps
| + myapp
|   + ebin
|   + include
|   + nbproject
|   + src
|   | + components
|   | + lib
|   | + services
|   | + test
|   + www
+ config
| * yaws.conf
| * erlyweb.conf
+ script
  + ebin
  + src
    * erlyweb_app.erl
Where, config/yaws.conf contains the yaws' configuration. Here's mine:
ebin_dir = D:/myapp/trunk/script/ebin

<server localhost>
    port = 8000
    listen = 0.0.0.0
    docroot = D:/myapp/trunk/apps/myapp/www
    appmods = </myapp, erlyweb>
    start_mod = erlyweb_app
    <opaque>
        appname = myapp
        environment = development
    </opaque>
</server>
You may have noticed that all beams under D:/myapp/trunk/script/ebin will be auto-loaded when yaws starts up. And you can prepare another yaws.conf for the test or production environment by changing the environment var in <opaque>.
Now the config/erlyweb.conf:
{pa, ["script/ebin",
      "apps/myapp/ebin",
      "vendor/erlyweb/ebin",
      "vendor/eunit/ebin"]}.
{i, ["vendor",
     "apps/myapp/include",
     "/usr/local/lib/yaws"]}.
{production,  [{dbdriver, mysql},
               {database, "mydb_production"},
               {hostname, "localhost"},
               {username, "mememe"},
               {password, "pwpwpw"}]}.
{development, [{dbdriver, mysql},
               {database, "mydb_development"},
               {hostname, "localhost"},
               {username, "mememe"},
               {password, "pwpwpw"}]}.
{test,        [{dbdriver, mysql},
               {database, "mydb_test"},
               {hostname, "localhost"},
               {username, "mememe"},
               {password, "pwpwpw"}]}.
erlyweb_app.erl is the boot scripting code, which is used to start the db connection and compile the code. Currently I run these scripts manually; I'll talk about that later.
Notice: erlyweb 0.6.2 is needed, which contains Haobo's logfun patch.
%% @doc Main entrance to the entire erlyweb application.
-module(erlyweb_app).
-export([start/1]).
-export([get_conf/1,
         build/1,
         build_test/1,
         build_product/1,
         environment/1,
         decompile/2,
         db_log/4,
         db_dummy_log/4
        ]).

-include("yaws/include/yaws.hrl").
-include("yaws/include/yaws_api.hrl").

db_log(Module, Line, Level, FormatFun) ->
    mysql:log(Module, Line, Level, FormatFun).

db_dummy_log(_Mod, _Line, _Level, _FormatFun) -> empty.

%% @doc call back function when yaws starts an app
%% @see man yaws.conf
%%   start_mod = Module
%%     Defines a user provided callback module. At startup of the
%%     server, Module:start/1 will be called. The #sconf{} record
%%     (defined in yaws.hrl) will be used as the input argument. This
%%     makes it possible for a user application to syncronize the
%%     startup with the yaws server as well as getting hold of user
%%     specific configuration data, see the explanation for the
%%     context.
start(SConf) ->
    Opaque = SConf#sconf.opaque,
    AppName = proplists:get_value("appname", Opaque),
    Environment = list_to_atom(proplists:get_value("environment", Opaque)),
    {_I, Pa, Pz, Dbdriver, Database, Hostname, Username, Password} = get_conf(Environment),
    {ok, Cwd} = file:get_cwd(),
    error_logger:info_msg("CWD: ~s~n", [Cwd]),
    add_code_path(Pa, Pz),
    LogFun = case Environment of
                 undefined -> fun erlyweb_app:db_log/4;
                 production -> fun erlyweb_app:db_dummy_log/4;
                 development ->
                     %code:add_pathz("../apps/ewp/src/test"),
                     fun erlyweb_app:db_log/4;
                 test -> fun erlyweb_app:db_log/4
             end,
    error_logger:info_msg("Starting app <~s> as <~s> using database <~s>~n",
                          [AppName, Environment, Database]),
    start_db(Dbdriver, Database, Hostname, Username, Password, LogFun).

add_code_path(Pa, Pz) ->
    AddedPa = [{Dir, code:add_patha(Dir)} || Dir <- Pa],
    AddedPz = [{Dir, code:add_pathz(Dir)} || Dir <- Pz],
    error_logger:info_msg("Add code patha: ~p~n", [AddedPa]),
    error_logger:info_msg("Add code pathz: ~p~n", [AddedPz]).

get_conf(Environment) when is_list(Environment) ->
    get_conf(list_to_atom(Environment));
get_conf(Environment) when is_atom(Environment) ->
    {ok, Confs} = file:consult("config/erlyweb.conf"),
    I = case proplists:get_value(i, Confs) of
            undefined -> [];
            IX -> IX
        end,
    Pa = case proplists:get_value(pa, Confs) of
             undefined -> [];
             PaX -> PaX
         end,
    Pz = case proplists:get_value(pz, Confs) of
             undefined -> [];
             PzX -> PzX
         end,
    EnvConfs = proplists:get_value(Environment, Confs),
    Dbdriver = proplists:get_value(dbdriver, EnvConfs),
    Database = proplists:get_value(database, EnvConfs),
    Hostname = proplists:get_value(hostname, EnvConfs),
    Username = proplists:get_value(username, EnvConfs),
    Password = proplists:get_value(password, EnvConfs),
    {I, Pa, Pz, Dbdriver, Database, Hostname, Username, Password}.

start_db(Dbdriver, Database, Hostname, Username, Password, LogFun) ->
    erlydb:start(Dbdriver, [{database, Database},
                            {hostname, Hostname},
                            {username, Username},
                            {password, Password},
                            {logfun, LogFun}]).

%% This is the developer's entrance to the module.
build(AppName) ->
    io:format("Building development version of ~s.~n", [AppName]),
    build(AppName, [debug_info], development).

build_test(AppName) ->
    io:format("Building test version of ~s.~n", [AppName]),
    build(AppName, [debug_info], test).

build_product(AppName) ->
    io:format("Building product version of ~s.~n", [AppName]),
    build(AppName, [no_debug_info], production).

build(AppName, Options, Environment) when is_atom(AppName) ->
    build(atom_to_list(AppName), Options, Environment);
build(AppName, Options, Environment) when is_list(AppName) ->
    {I, Pa, Pz, Dbdriver, Database, Hostname, Username, Password} = get_conf(Environment),
    add_code_path(Pa, Pz),
    start_db(Dbdriver, Database, Hostname, Username, Password, fun erlyweb_app:db_log/4),
    compile(AppName, Options ++ [{auto_compile, false}], I, Dbdriver).

compile(AppName, Options, I, Dbdriver) ->
    erlyweb:compile("./apps/" ++ AppName,
                    lists:foldl(
                      fun(Dir, Acc) ->
                              [{i, filename:absname(Dir)} | Acc]
                      end, [], I) ++ [{erlydb_driver, Dbdriver}] ++ Options).

decompile(AppName, Beam) when is_list(AppName) ->
    decompile(list_to_atom(AppName), Beam);
decompile(AppName, Beam) when is_atom(AppName) ->
    {BinFilename, SrcFilename} =
        case AppName of
            erlyweb ->
                {"./vendor/erlyweb/ebin/" ++ atom_to_list(Beam),
                 "./erlyweb_" ++ atom_to_list(Beam)};
            _ ->
                {"./apps/" ++ atom_to_list(AppName) ++ "/ebin/" ++ atom_to_list(Beam),
                 "./apps/" ++ atom_to_list(AppName) ++ "_" ++ atom_to_list(Beam)}
        end,
    decompile_beam(BinFilename, SrcFilename).

decompile_beam(BinFilename, SrcFilename) ->
    io:format("Beam file: ~s~n", [BinFilename]),
    io:format("Source file: ~s~n", [SrcFilename ++ ".erl"]),
    {ok, {_, [{abstract_code, {_, AC}}]}} = beam_lib:chunks(BinFilename, [abstract_code]),
    %% do not use ".erl" ext?, otherwise will be compiled by erlyweb
    {ok, S} = file:open(SrcFilename ++ ".erl", write),
    io:fwrite(S, "~s~n", [erl_prettypr:format(erl_syntax:form_list(AC))]).
To build it,
> erlc -I /opt/local/lib/yaws/include erlyweb_app.erl -o ebin
erlyweb_app.erl is almost escript-ready, but currently I use it as plain module functions. It's pre-compiled, and erlyweb_app.beam is placed under script/ebin.
So, I start myapp by steps:
cd \myproject
yaws -sname myapp -i --conf config/yaws.conf --erlang "-smp auto"
1> erlyweb_app:build(myapp).
After I make changes to myapp, I run the above erlyweb_app:build(myapp). again, and everything is up to date.
And if you'd like to build it from another erl shell, try this:
erl -sname erlybird
(erlybird@myhost)1> rpc:call(myapp@myhost, erlyweb_app, build, [myapp]).
Yes, next version of ErlyBird will support building erlyweb apps remotely in ErlyBird's Erlang shell.
recbird - An Erlang Dynamic Record Inferring Parse Transform
You should have read Yariv's recless blog, a nice post about how to make record accessing simple.
Recless is a static type-inferring record parse transform. That means, as described in Yariv's blog, there is
one main restriction in Recless's type inference algorithm: function parameters must indicate their record types for type inference to work on them. For instance, this won't work:
get_name(Person) -> Person.name.
Instead, you must write this:
get_name(Person = #person{}) -> Person.name.
How about a dynamic record-inferring solution? I had an idea, also inspired by my friend Haobo. As I was already familiar with the Erlang AST from developing ErlyBird, I gave it a try and got some good results. I named it recbird.
The magic behind recbird is that it parses the Erlang AST and adds setter/getter functions for each record's fields. Then, at runtime, it detects the first element of the record var, thus knows which setter/getter function the call should be redirected to, and calls it.
It just works now, with no known limits; you can write R.a.b.c and R.a.b.c = Sth almost everywhere.
Notice: There may be some bugs.
The performance is also reasonable. For example, running R.a.b = 'yes' 1,000,000 times takes about 300ms on my machine with the original Erlang record syntax, and about 310ms with recbird. Running it 10,000,000 times, recbird costs about 150% more time than the original Erlang record accessing.
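Conceptually, the runtime dispatch described above looks like this (my own illustration of the idea, not recbird's actual generated code; record and function names are made up):

```erlang
%% My own sketch of the idea behind recbird's generated accessors: a getter
%% that inspects the record tag (the tuple's first element) at runtime and
%% picks the right field index for that record type.
-module(recbird_idea).
-export([get_field/2]).

-record(address, {city, street}).
-record(person,  {name, address}).

get_field(Field, Rec) when element(1, Rec) =:= address ->
    case Field of
        city   -> element(#address.city, Rec);
        street -> element(#address.street, Rec)
    end;
get_field(Field, Rec) when element(1, Rec) =:= person ->
    case Field of
        name    -> element(#person.name, Rec);
        address -> element(#person.address, Rec)
    end.

%% So an expression like P.address.city would be rewritten by the parse
%% transform into nested calls: get_field(city, get_field(address, P)).
```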
The recbird's source code can be got at: recbird.erl
To use it, compile it, put the compiled recbird.beam on your code path, and add
-compile({parse_transform, recbird}).
in your source file.
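For example (my own minimal module, using the R.field dot syntax the post describes and assuming recbird.beam is on the code path):

```erlang
%% Hypothetical usage sketch: with the recbird parse transform enabled,
%% record fields can be read and set with dot syntax.
-module(recbird_demo).
-compile({parse_transform, recbird}).
-export([rename/2]).

-record(person, {name, age}).

%% Reads P.name, then returns a copy of P with the name replaced.
rename(P, NewName) ->
    io:format("old name: ~p~n", [P.name]),
    P.name = NewName.
```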
A Simple POET State Machine Accepting SAX Events to Build Plain Old Erlang Term
Per previous blogs:
- A Simple XML State Machine Accepting SAX Events to Build xmerl Compatible XML Tree: icalendar demo
- Parse JSON to xmerl Compatible XML Tree via A Simple XML State Machine
I wrote a simple xml state machine that receives SAX events to build an xmerl compatible XML tree.
This time, it's a simple POET (Plain Old Erlang Term) state machine, which receives SAX events to build the data in the form of Lists and Tuples.
%%% A state machine which receives sax events and builds a Plain Old Erlang Term
-module(poet_sm).

-export([state/2]).
-export([test/0]).

-record(poetsmState, {qname = undefined,
                      attributes = [],
                      content = [],
                      parents = []}).

receive_events(Events) -> receive_events(Events, undefined).

receive_events([], _States) -> {ok, [], []};
receive_events([Event|T], States) ->
    case state(Event, States) of
        {ok, TopObject} -> {ok, TopObject, T};
        {error, Reason} -> {error, Reason};
        States1 -> receive_events(T, States1)
    end.

state({startDocument}, _StateStack) ->
    State = #poetsmState{},
    [State];
state({endDocument}, StateStack) ->
    %io:fwrite(user, "endDocument, states: ~p~n", [StateStack]),
    case StateStack of
        {ok, TopObject} -> {ok, TopObject};
        _ -> {error, io:fwrite(user, "Bad object match, StateStack is: ~n~p~n", [StateStack])}
    end;
state({startElement, _Uri, _LocalName, QName, Attrs}, StateStack) ->
    %io:fwrite(user, "startElement~n", []),
    %% pop current State
    [State|_StatesPrev] = StateStack,
    #poetsmState{parents=Parents} = State,
    {_Pos, Attributes1} =
        lists:foldl(
          fun ({Key, Value}, {Pos, AccAttrs}) ->
                  Pos1 = Pos + 1,
                  Attr = {atom_to_list(Key), to_poet_value(Value)},
                  %parents = [{LocalName, Pos1}|Parents]},
                  {Pos1, [Attr|AccAttrs]}
          end, {0, []}, Attrs),
    Parents1 = [{QName, 0}|Parents],
    %% push new state of Attributes, Content and Parents to StateStack
    NewState = #poetsmState{qname = QName,
                            attributes = Attributes1,
                            content = [],
                            parents = Parents1},
    [NewState|StateStack];
state({endElement, _Uri, _LocalName, QName}, StateStack) ->
    %% pop current State
    [State|StatesPrev] = StateStack,
    #poetsmState{qname=ElemName, attributes=Attributes,
                 content=Content, parents=Parents} = State,
    %io:fwrite(user, "Element end with Name: ~p~n", [Name]),
    if QName == undefined ->
            %% don't care
            undefined;
       QName /= ElemName ->
            throw(lists:flatten(io_lib:format(
                "Element name match error: ~p should be ~p~n", [QName, ElemName])));
       true -> undefined
    end,
    %% composite a new object
    [_|_ParentsPrev] = Parents,
    Object =
        if Attributes == [] ->
                {QName, lists:reverse(Content)};
           true ->
                {QName, lists:reverse(Attributes), lists:reverse(Content)}
                %parents = ParentsPrev
        end,
    %io:fwrite(user, "object: ~p~n", [Object]),
    %% put Object to parent's content and return new state stack
    case StatesPrev of
        [_ParentState|[]] ->
            %% reached the top now, return final result
            {ok, Object};
        [ParentState|Other] ->
            #poetsmState{content=ParentContent} = ParentState,
            ParentContent1 = [Object|ParentContent],
            %% update parent state and backward to it:
            ParentState1 = ParentState#poetsmState{content = ParentContent1},
            %io:fwrite(user, "endElement, state: ~p~n", [State1]),
            [ParentState1|Other]
    end;
state({characters, Characters}, StateStack) ->
    %% pop current State
    [State|StatesPrev] = StateStack,
    #poetsmState{qname=_, content=Content, parents=Parents} = State,
    [{Parent, Pos}|ParentsPrev] = Parents,
    Pos1 = Pos + 1,
    Value = to_poet_value(Characters),
    %parents = [{Parent, Pos1}|ParentsPrev]},
    Content1 = [Value|Content],
    Parents1 = [{Parent, Pos1}|ParentsPrev],
    UpdatedState = State#poetsmState{content = Content1, parents = Parents1},
    [UpdatedState|StatesPrev].

to_poet_value(Name) when is_atom(Name) ->
    to_poet_value(atom_to_list(Name));
to_poet_value(Chars) when is_list(Chars) ->
    %% it's a string, convert to binary, since a list in poet means an array
    list_to_binary(Chars);
to_poet_value(Value) ->
    Value.

test() ->
    Events = [{startDocument},
              {startElement, "", feed, feed, [{link, "http://lightpole.net"},
                                              {author, "Caoyuan"}]},
              {characters, "feed text"},
              {startElement, "", entry, entry, [{tag, "Erlang, Function"}]},
              {characters, "Entry1's text"},
              {endElement, "", entry, entry},
              {startElement, "", entry, entry, []},
              {characters, "Entry2's text"},
              {endElement, "", entry, entry},
              {endElement, "", feed, feed},
              {endDocument}],
    %% Streaming:
    {ok, Poet1, _Rest} = receive_events(Events),
    io:fwrite(user, "Streaming Result: ~n~p~n", [Poet1]),
    {feed,[{"link",<<"http://lightpole.net">>},{"author",<<"Caoyuan">>}],
     [<<"feed text">>,
      {entry,[{"tag",<<"Erlang, Function">>}],[<<"Entry1's text">>]},
      {entry,[<<"Entry2's text">>]}]} = Poet1.
The result will be something like:
{feed,[{"link",<<"http://lightpole.net">>},{"author",<<"Caoyuan">>}],
 [<<"feed text">>,
  {entry,[{"tag",<<"Erlang, Function">>}],[<<"Entry1's text">>]},
  {entry,[<<"Entry2's text">>]}]}
The previous iCal and JSON examples can be parsed to POET by modifying the front-end parser a bit.
ErlyBird 0.12.0 released - Erlang IDE based on NetBeans
I'm pleased to announce ErlyBird 0.12.0, an Erlang IDE based on NetBeans.
This is a bug-fix and performance-improvement release. This release only provides the all-in-one IDE package, which is 15.9M in size.
Java JRE 5.0+ is required.
To download, please go to: http://sourceforge.net/project/showfiles.php?group_id=192439
To install:
- Unzip erlybird-bin-0.12.0-ide.zip to somewhere. For Windows users, execute 'bin/erlybird.exe'. For *nix users, 'bin/erlybird'.
- Check/set your OTP path. From [Tools]->[Options], click 'Miscellaneous', then expand 'Erlang Installation' and fill in the full path of your 'erl.exe' or 'erl' file. For instance: "C:/erl/bin/erl.exe"
- The default -Xmx option for the JVM is set to 256M. If you want to increase it, open the config file located at etc/erlybird.conf and set -J-Xmx in 'default_options'.
When you run ErlyBird for the first time, the OTP libs will be indexed. The indexing time varies from 30 to 60 minutes depending on your computer.
Notice:
If you have a previous version of ErlyBird installed, please delete the old cache files, which are located at:
- *nix: "${HOME}/.erlybird/dev"
- mac os x: "${HOME}/Library/Application Support/erlybird/dev"
- windows: "C:\Documents and Settings\yourusername\.erlybird\dev" or similar
The status of ErlyBird is still alpha; feedback and bug reports are welcome.
CHANGELOG:
- Performance improvement, especially source code rendering performance.
- Highlighting for unbound/unused variables.
- Completion for macros and records.
- Go to source files of -include and -include_lib.
- Erlang shell window in Mac OS X should work now.
- Various bug fixes.
Parse JSON to xmerl Compatible XML Tree via A Simple XML State Machine
Updated Aug 16: Fixed bugs when the json is an array. Always add a 'json:root' element, since valid xml should have a root. Removed the 'obj' tag, which is not necessary.
Updated Aug 15: A more complete json_parser.erl. Thanks to tonyg's beautiful work; fixed some bugs.
Updated Aug 5: Rewrote json_parser.erl based on tonyg's RFC4627 implementation, fixed some bugs.
In my previous blog, A Simple XML State Machine Accepting SAX Events to Build xmerl Compatible XML Tree: icalendar demo, I wrote a simple state machine to parse icalendar into an xmerl compatible XML tree. This time, I'll use that state machine to parse a JSON expression into an xmerl compatible XML tree. The work is fairly simple:
%%--------------------------------------------------------------------------- %% Copyright (c) 2007 Tony Garnock-Jones%% Copyright (c) 2007 LShift Ltd. %% Copyright (c) 2007 LightPole, Inc. %% %% Permission is hereby granted, free of charge, to any person %% obtaining a copy of this software and associated documentation %% files (the "Software"), to deal in the Software without %% restriction, including without limitation the rights to use, copy, %% modify, merge, publish, distribute, sublicense, and/or sell copies %% of the Software, and to permit persons to whom the Software is %% furnished to do so, subject to the following conditions: %% %% The above copyright notice and this permission notice shall be %% included in all copies or substantial portions of the Software. %% %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, %% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF %% MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND %% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS %% BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN %% ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN %% CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %% SOFTWARE. %%--------------------------------------------------------------------------- %% -module(json_parser). -define(stateMachine, fun xml_sm:state/2). -define(JsonNSUri, "http://www.lightpole.net/xmlns/1.0"). -define(JsonNSAtrr, {'xmlns:json', ?JsonNSUri}). -define(JsonNSRoot, 'json:root'). -define(JsonNSArray, 'json:array'). -record(context, {machine, qname}). -export([parse_to_xml/1, parse_to_poet/1]). -export([test/0]). parse_to_xml(Data) -> parse(Data, #context{machine = fun xml_sm:state/2}). parse_to_poet(Data) -> parse(Data, #context{machine = fun poet_sm:state/2}). 
parse(Bin, Context) when is_binary(Bin) ->
    parse(binary_to_list(Bin), Context);
parse(Str, #context{machine=MachineFun}=Context) ->
    State1 = MachineFun({startDocument}, undefined),
    State2 = parse_root(skip_ws(Str), State1, Context),
    _State = MachineFun({endDocument}, State2).

%% Since a valid XML document should have a root element, we add one here.
parse_root([${|T], State, #context{machine=MachineFun}=Context) ->
    State1 = MachineFun({startElement, ?JsonNSUri, root, ?JsonNSRoot, [?JsonNSAtrr]}, State),
    Context1 = Context#context{qname = undefined},
    {_Rest, State2} = parse_object(skip_ws(T), State1, Context1),
    _State = MachineFun({endElement, ?JsonNSUri, root, ?JsonNSRoot}, State2);
parse_root([$[|T], State, #context{machine=MachineFun}=Context) ->
    State1 = MachineFun({startElement, ?JsonNSUri, root, ?JsonNSRoot, [?JsonNSAtrr]}, State),
    Context1 = Context#context{qname = ?JsonNSArray},
    {_Rest, State2} = parse_array(skip_ws(T), State1, Context1),
    _State = MachineFun({endElement, ?JsonNSUri, root, ?JsonNSRoot}, State2).

parse_object([$}|T], State, _Context) ->
    {T, State};
parse_object([$,|T], State, Context) ->
    parse_object(skip_ws(T), State, Context);
parse_object([$"|T], State, #context{machine=MachineFun}=Context) ->
    {Rest, ObjNameStr} = parse_string(skip_ws(T), []),
    ObjName = list_to_atom(ObjNameStr),
    Context1 = Context#context{qname = ObjName},
    [$:|T1] = skip_ws(Rest),
    {Rest1, State1} =
        case skip_ws(T1) of
            [$[|T2] ->
                %% the value is an array, we'll create a list of elements named as this 'ObjName'
                parse_array(skip_ws(T2), State, Context1);
            _ ->
                StateX1 = MachineFun({startElement, "", ObjName, ObjName, []}, State),
                {RestX, StateX2} = parse_value(skip_ws(T1), StateX1, Context1),
                StateX3 = MachineFun({endElement, "", ObjName, ObjName}, StateX2),
                {RestX, StateX3}
        end,
    parse_object(skip_ws(Rest1), State1, Context1).
parse_array([$]|T], State, _Context) ->
    {T, State};
parse_array([$,|T], State, Context) ->
    parse_array(skip_ws(T), State, Context);
parse_array(Chars, State, #context{machine=MachineFun, qname=QName}=Context) ->
    State1 = MachineFun({startElement, "", QName, QName, []}, State),
    {Rest, State2} = parse_value(Chars, State1, Context),
    State3 = MachineFun({endElement, "", QName, QName}, State2),
    parse_array(skip_ws(Rest), State3, Context).

parse_value([], State, _Context) ->
    {[], State};
parse_value("true"++T, State, #context{machine=MachineFun}) ->
    State1 = MachineFun({characters, "true"}, State),
    {T, State1};
parse_value("false"++T, State, #context{machine=MachineFun}) ->
    State1 = MachineFun({characters, "false"}, State),
    {T, State1};
parse_value("null"++T, State, #context{machine=MachineFun}) ->
    State1 = MachineFun({characters, "null"}, State),
    {T, State1};
parse_value([$"|T], State, #context{machine=MachineFun}) ->
    {Rest, Value} = parse_string(T, []),
    State1 = MachineFun({characters, Value}, State),
    {Rest, State1};
parse_value([${|T], State, Context) ->
    parse_object(skip_ws(T), State, Context);
parse_value([$[|T], State, Context) ->
    parse_array(skip_ws(T), State, Context);
parse_value(Chars, State, #context{machine=MachineFun}) ->
    {Rest, Value} = parse_number(skip_ws(Chars), []),
    State1 = MachineFun({characters, Value}, State),
    {Rest, State1}.

parse_string([$"|T], Acc) ->
    {T, lists:reverse(Acc)};
parse_string([$\\, Key|T], Acc) ->
    parse_escaped_char(Key, T, Acc);
parse_string([H|T], Acc) ->
    parse_string(T, [H|Acc]).
parse_escaped_char($b, Rest, Acc) -> parse_string(Rest, [8|Acc]);
parse_escaped_char($t, Rest, Acc) -> parse_string(Rest, [9|Acc]);
parse_escaped_char($n, Rest, Acc) -> parse_string(Rest, [10|Acc]);
parse_escaped_char($f, Rest, Acc) -> parse_string(Rest, [12|Acc]);
parse_escaped_char($r, Rest, Acc) -> parse_string(Rest, [13|Acc]);
parse_escaped_char($/, Rest, Acc) -> parse_string(Rest, [$/|Acc]);
parse_escaped_char($\\, Rest, Acc) -> parse_string(Rest, [$\\|Acc]);
parse_escaped_char($", Rest, Acc) -> parse_string(Rest, [$"|Acc]);
parse_escaped_char($u, [D0, D1, D2, D3|Rest], Acc) ->
    parse_string(Rest, [(digit_hex(D0) bsl 12) + (digit_hex(D1) bsl 8) +
                        (digit_hex(D2) bsl 4) + digit_hex(D3)|Acc]).

digit_hex($0) -> 0;
digit_hex($1) -> 1;
digit_hex($2) -> 2;
digit_hex($3) -> 3;
digit_hex($4) -> 4;
digit_hex($5) -> 5;
digit_hex($6) -> 6;
digit_hex($7) -> 7;
digit_hex($8) -> 8;
digit_hex($9) -> 9;
digit_hex($A) -> 10;
digit_hex($B) -> 11;
digit_hex($C) -> 12;
digit_hex($D) -> 13;
digit_hex($E) -> 14;
digit_hex($F) -> 15;
digit_hex($a) -> 10;
digit_hex($b) -> 11;
digit_hex($c) -> 12;
digit_hex($d) -> 13;
digit_hex($e) -> 14;
digit_hex($f) -> 15.

finish_number(Rest, Acc) ->
    Value = lists:reverse(Acc),
    %% Value =
    %%     case catch list_to_integer(Str) of
    %%         {'EXIT', _} -> list_to_float(Str);
    %%         Number -> Number
    %%     end,
    {Rest, Value}.

parse_number([], _Acc) -> exit(syntax_error);
parse_number([$-|T], Acc) -> parse_number1(T, [$-|Acc]);
parse_number(Rest, Acc) -> parse_number1(Rest, Acc).

parse_number1(Rest, Acc) ->
    {Acc1, Rest1} = parse_int_part(Rest, Acc),
    case Rest1 of
        [] ->
            finish_number([], Acc1);
        [$.|More] ->
            {Acc2, Rest2} = parse_int_part(More, [$.|Acc1]),
            parse_exp(Rest2, Acc2, false);
        _ ->
            parse_exp(Rest1, Acc1, true)
    end.

parse_int_part([], Acc) -> {Acc, []};
parse_int_part([Ch|Rest], Acc) ->
    case is_digit(Ch) of
        true  -> parse_int_part(Rest, [Ch|Acc]);
        false -> {Acc, [Ch|Rest]}
    end.
parse_exp([$e|T], Acc, NeedFrac) -> parse_exp1(T, Acc, NeedFrac);
parse_exp([$E|T], Acc, NeedFrac) -> parse_exp1(T, Acc, NeedFrac);
parse_exp(Rest, Acc, _NeedFrac) -> finish_number(Rest, Acc).

parse_exp1(Rest, Acc, NeedFrac) ->
    {Acc1, Rest1} = parse_signed_int_part(Rest,
                                          if NeedFrac -> [$e, $0, $.|Acc];
                                             true -> [$e|Acc]
                                          end),
    finish_number(Rest1, Acc1).

parse_signed_int_part([$+|T], Acc) -> parse_int_part(T, [$+|Acc]);
parse_signed_int_part([$-|T], Acc) -> parse_int_part(T, [$-|Acc]);
parse_signed_int_part(Rest, Acc) -> parse_int_part(Rest, Acc).

is_digit(C) when is_integer(C) andalso C >= $0 andalso C =< $9 -> true;
is_digit(_) -> false.

skip_ws([H|T]) when H =< 32 -> skip_ws(T);
skip_ws(Chars) -> Chars.

test() ->
    Text1 = "{\"firstname\":\"Caoyuan\", \"iq\":\"150\"}",
    {ok, Xml1} = parse_to_xml(Text1),
    XmlText1 = lists:flatten(xmerl:export_simple([Xml1], xmerl_xml)),
    io:fwrite(user, "Parsed XML: ~n~p~n", [XmlText1]),
    {ok, Poet1} = parse_to_poet(Text1),
    io:fwrite(user, "Parsed POET: ~n~p~n", [Poet1]),
    Text2 = "[{\"firstname\":\"Caoyuan\", \"iq\":\"150\"}, {\"firstname\":\"Haobo\", \"iq\":150}]",
    {ok, Xml2} = parse_to_xml(Text2),
    XmlText2 = lists:flatten(xmerl:export_simple([Xml2], xmerl_xml)),
    io:fwrite(user, "Parsed: ~n~p~n", [XmlText2]),
    Text = "
      {\"businesses\": [
        {\"address1\": \"650 Mission Street\",
         \"address2\": \"\",
         \"avg_rating\": 4.5,
         \"categories\": [{\"category_filter\": \"localflavor\",
                           \"name\": \"Local Flavor\",
                           \"search_url\": \"http://lightpole.net/search\"}],
         \"city\": \"San Francisco\",
         \"distance\": 0.085253790020942688,
         \"id\": \"4kMBvIEWPxWkWKFN__8SxQ\",
         \"latitude\": 37.787185668945298,
         \"longitude\": -122.40093994140599},
        {\"address1\": \"25 Maiden Lane\",
         \"address2\": \"\",
         \"avg_rating\": 5.0,
         \"categories\": [{\"category_filter\": \"localflavor\",
                           \"name\": \"Local Flavor\",
                           \"search_url\": \"http://lightpole.net/search\"}],
         \"city\": \"San Francisco\",
         \"distance\": 0.23186808824539185,
         \"id\": \"O1zPF_b7RyEY_NNsizX7Yw\",
         \"latitude\": 37.788387,
         \"longitude\": -122.40401}]} ",
    {ok, Xml} = parse_to_xml(Text),
    %io:fwrite(user, "Xml Tree: ~p~n", [Xml]),
    XmlText = lists:flatten(xmerl:export_simple([Xml], xmerl_xml)),
    io:fwrite(user, "Parsed: ~n~p~n", [XmlText]),
    Latitude1 = xmerl_xpath:string("/json:root/businesses[1]/latitude/text()", Xml),
    io:format(user, "Latitude1: ~p~n", [Latitude1]).
The result will be something like:
<?xml version="1.0"?>
<json:root xmlns:json="http://www.lightpole.net/xmlns/1.0">
  <businesses>
    <address1>650 Mission Street</address1>
    <address2></address2>
    <avg_rating>4.5</avg_rating>
    <categories>
      <category_filter>localflavor</category_filter>
      <name>Local Flavor</name>
      <search_url>http://lightpole.net/search</search_url>
    </categories>
    <city>San Francisco</city>
    <distance>0.085253790020942688</distance>
    <id>4kMBvIEWPxWkWKFN__8SxQ</id>
    <latitude>37.787185668945298</latitude>
    <longitude>-122.40093994140599</longitude>
  </businesses>
  <businesses>
    <address1>25 Maiden Lane</address1>
    <address2></address2>
    <avg_rating>5.0</avg_rating>
    <categories>
      <category_filter>localflavor</category_filter>
      <name>Local Flavor</name>
      <search_url>http://lightpole.net/search</search_url>
    </categories>
    <city>San Francisco</city>
    <distance>0.23186808824539185</distance>
    <id>O1zPF_b7RyEY_NNsizX7Yw</id>
    <latitude>37.788387</latitude>
    <longitude>-122.40401</longitude>
  </businesses>
</json:root>
Now you can fetch an element by:
> [Latitude1] = xmerl_xpath:string("/json:root/businesses[1]/latitude/text()", Xml),
> Latitude1#xmlText.value.
"37.787185668945298"
Next time, I'll write a simple Erlang data state machine, which will parse icalendar and JSON into plain Erlang lists and tuples.
The code of xml_sm.erl can be found in my previous blog.
A Simple XML State Machine Accepting SAX Events to Build xmerl Compatible XML Tree: icalendar demo
xmerl provides full XML functionality in Erlang, with a lot of features such as XPath, XSLT, event_function, acc_function, etc. For now, I just want icalendar to be parsed into the form of an xmerl tree, which will contain #xmlElement, #xmlAttribute, #xmlText, etc., and be easy to apply XPath to.
How about an approach where the parser just generates SAX events, which are then fed to an attached callback state machine that builds a JSON or XML tree, or anything else?
I hoped xmerl worked like this, i.e. a parser that generates SAX events, and a state machine that accepts the events and builds the XML tree. I dug into xmerl's code, but, unfortunately, the parser and state machine are coupled together.
So I wrote a simple state machine that just receives SAX events to build an xmerl compatible XML tree, and I applied it to icalendar.
I like this idea: by using SAX events as the common interface, I only need to write another JSON state machine later, and then the result will be JSON for icalendar. I can share the same parser, which just generates SAX events.
Here's the code. It's not complete yet, but it shows how much a SAX interface can do.
%%% A state machine which receives SAX events and builds an xmerl compatible tree
-module(xml_sm).

-include_lib("xmerl/include/xmerl.hrl").

-export([state/2]).
-export([test/0]).

-record(xmlsmState, {
          qname = undefined,
          attributes = [],
          content = [],
          parents = []
         }).

receive_events(Events) -> receive_events(Events, undefined).

receive_events([], _States) -> {ok, [], []};
receive_events([Event|T], States) ->
    case state(Event, States) of
        {ok, TopElement} -> {ok, TopElement, T};
        {error, Reason}  -> {error, Reason};
        States1          -> receive_events(T, States1)
    end.

state({startDocument}, _StateStack) ->
    State = #xmlsmState{},
    [State];
state({endDocument}, StateStack) ->
    %io:fwrite(user, "endDocument, states: ~p~n", [StateStack]),
    case StateStack of
        {ok, TopElement} ->
            {ok, TopElement};
        _ ->
            {error, io:fwrite(user, "Bad element match, StateStack is: ~n~p~n", [StateStack])}
    end;
state({startElement, _Uri, _LocalName, QName, Attrs}, StateStack) ->
    %% pop current State
    [State|_StatesPrev] = StateStack,
    #xmlsmState{parents=Parents} = State,
    {_Pos, Attributes1} =
        lists:foldl(
          fun ({Key, Value}, {Pos, AccAttrs}) ->
                  Pos1 = Pos + 1,
                  Attr = #xmlAttribute{name = Key,
                                       value = Value,
                                       parents = [{QName, Pos1}|Parents]},
                  {Pos1, [Attr|AccAttrs]}
          end, {0, []}, Attrs),
    Parents1 = [{QName, 0}|Parents],
    %% push new state of Attributes, Content and Parents to StateStack
    NewState = #xmlsmState{qname = QName,
                           attributes = Attributes1,
                           content = [],
                           parents = Parents1},
    [NewState|StateStack];
state({endElement, _Uri, _LocalName, QName}, StateStack) ->
    %% pop current State
    [State|StatesPrev] = StateStack,
    #xmlsmState{qname=ElemName, attributes=Attributes,
                content=Content, parents=Parents} = State,
    if QName == undefined -> %% don't care
            undefined;
       QName /= ElemName ->
            throw(lists:flatten(io_lib:format(
                "Element name match error: ~p should be ~p~n", [QName, ElemName])));
       true -> undefined
    end,
    %% composite a new element
    [_|ParentsPrev] = Parents,
    Element = #xmlElement{name = QName,
                          attributes = lists:reverse(Attributes),
                          content = lists:reverse(Content),
                          parents = ParentsPrev},
    %io:fwrite(user, "Element: ~p~n", [Element]),
    %% put Element to parent's content and return new state stack
    case StatesPrev of
        [_ParentState|[]] ->
            %% reached the top now, return final result
            {ok, Element};
        [ParentState|Other] ->
            #xmlsmState{content=ParentContent} = ParentState,
            ParentContent1 = [Element|ParentContent],
            %% update parent state and backward to it:
            ParentState1 = ParentState#xmlsmState{content = ParentContent1},
            [ParentState1|Other]
    end;
state({characters, Characters}, StateStack) ->
    %% pop current State
    [State|StatesPrev] = StateStack,
    #xmlsmState{content=Content, parents=Parents} = State,
    [{Parent, Pos}|ParentsPrev] = Parents,
    Pos1 = Pos + 1,
    Text = #xmlText{value = Characters, parents = [{Parent, Pos1}|ParentsPrev]},
    Content1 = [Text|Content],
    Parents1 = [{Parent, Pos1}|ParentsPrev],
    UpdatedState = State#xmlsmState{content = Content1, parents = Parents1},
    [UpdatedState|StatesPrev].

test() ->
    Events = [{startDocument},
              {startElement, "", feed, feed, [{link, "http://lightpole.net"}, {author, "Caoyuan"}]},
              {characters, "feed text"},
              {startElement, "", entry, entry, [{tag, "Erlang, Function"}]},
              {characters, "Entry1's text"},
              {endElement, "", entry, entry},
              {startElement, "", entry, entry, []},
              {characters, "Entry2's text"},
              {endElement, "", entry, entry},
              {endElement, "", feed, feed},
              {endDocument}],
    %% Streaming:
    {ok, Xml1, _Rest} = receive_events(Events),
    io:fwrite(user, "Streaming Result: ~n~p~n", [Xml1]),
    %% Stepped:
    FunCallback = fun xml_sm:state/2,
    FinalStates = lists:foldl(
                    fun (Event, States) -> FunCallback(Event, States) end,
                    undefined, Events),
    {ok, Xml2} = FinalStates,
    XmlText = lists:flatten(xmerl:export_simple([Xml2], xmerl_xml)),
    io:fwrite(user, "Stepped Result: ~n~p~n", [XmlText]).
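Any fun/2 that accepts the same event tuples can stand in for xml_sm:state/2. As a trivial sketch (my own illustration, not from the original code), here is a consumer that counts elements instead of building a tree:

```erlang
%% A trivial SAX consumer: counts startElement events.
%% Same event protocol as xml_sm:state/2, different behavior.
-module(count_sm).
-export([state/2]).

state({startDocument}, _State) -> 0;
state({startElement, _Uri, _Local, _QName, _Attrs}, Count) -> Count + 1;
state({endElement, _Uri, _Local, _QName}, Count) -> Count;
state({characters, _Chars}, Count) -> Count;
state({endDocument}, Count) -> {ok, Count}.
```

Folding the event list from xml_sm:test/0 through count_sm:state/2 with lists:foldl/3 would yield {ok, 3} — the feed element plus two entry elements — without touching the parser at all.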
And the primary icalendar front end:
-module(ical_parser).

-include_lib("xmerl/include/xmerl.hrl").

-export([parse/1]).
-export([test/0]).

-define(stateMachine, fun xml_sm:state/2).

parse(Text) ->
    States1 = ?stateMachine({startDocument}, undefined),
    States2 = parse_line(skip_ws(Text), 0, States1),
    ?stateMachine({endDocument}, States2).

parse_line([], _Line, States) -> States;
parse_line([$\s|T], Line, States) -> parse_line(T, Line, States);
parse_line([$\t|T], Line, States) -> parse_line(T, Line, States);
parse_line([$\r|T], Line, States) -> parse_line(T, Line, States);
parse_line([$\n|T], Line, States) -> parse_line(T, Line + 1, States);
parse_line("BEGIN"++T, Line, States) ->
    case skip_ws(T) of
        [$:|T1] ->
            {Rest, Line1, Name} = parse_component_name(skip_ws(T1), Line, States, []),
            %io:fwrite(user, "Component started: ~p~n", [Name]),
            States1 = ?stateMachine({startElement, "", Name, Name, []}, States),
            parse_line(skip_ws(Rest), Line1, States1);
        _ -> error
    end;
parse_line("END"++T, Line, States) ->
    case skip_ws(T) of
        [$:|T1] ->
            {Rest, Line1, Name} = parse_component_name(skip_ws(T1), Line, States, []),
            States1 = ?stateMachine({endElement, "", Name, Name}, States),
            parse_line(skip_ws(Rest), Line1, States1);
        _ -> error
    end;
parse_line(Text, Line, States) ->
    {Rest, Line1, {Name, Params}, Value} = parse_prop(skip_ws(Text), Line, States, {[], []}),
    States1 = ?stateMachine({startElement, "", Name, Name, Params}, States),
    States2 = ?stateMachine({characters, Value}, States1),
    States3 = ?stateMachine({endElement, "", Name, Name}, States2),
    parse_line(skip_ws(Rest), Line1, States3).

parse_component_name([$\r|T], Line, States, Name) ->
    parse_component_name(T, Line, States, Name);
parse_component_name([$\n|T], Line, States, Name) ->
    case unfolding_line(T) of
        {true, Rest} ->
            parse_component_name(Rest, Line, States, Name);
        {false, Rest} ->
            {Rest, Line + 1, list_to_atom(string:to_lower(lists:reverse(Name)))}
    end;
parse_component_name([H|T], Line, States, Name) ->
    parse_component_name(skip_ws(T), Line, States, [H|Name]).

parse_prop([$:|T], Line, States, {Name, NameParams}) ->
    PropName = list_to_atom(string:to_lower(lists:reverse(Name))),
    PropNameParams = lists:reverse(NameParams),
    {Rest, Line1, Value} = parse_prop_value(T, Line, States, []),
    {Rest, Line1, {PropName, PropNameParams}, Value};
parse_prop([$;|T], Line, States, {Name, NameParams}) ->
    {Rest, Line1, ParamName, ParamValue} = parse_param(T, Line, States, []),
    parse_prop(Rest, Line1, States, {Name, [{ParamName, ParamValue}|NameParams]});
parse_prop([H|T], Line, States, {Name, NameParams}) ->
    parse_prop(skip_ws(T), Line, States, {[H|Name], NameParams}).

parse_prop_value([$\r|T], Line, States, Value) ->
    parse_prop_value(T, Line, States, Value);
parse_prop_value([$\n|T], Line, States, Value) ->
    case unfolding_line(T) of
        {true, Rest} -> parse_prop_value(Rest, Line, States, Value);
        {false, Rest} -> {Rest, Line + 1, lists:reverse(Value)}
    end;
parse_prop_value([H|T], Line, States, Value) ->
    parse_prop_value(T, Line, States, [H|Value]).

parse_param([$=|T], Line, States, Name) ->
    ParamName = list_to_atom(string:to_lower(lists:reverse(Name))),
    {Rest, Line1, Value} = parse_param_value(T, Line, States, []),
    {Rest, Line1, ParamName, Value};
parse_param([H|T], Line, States, Name) ->
    parse_param(skip_ws(T), Line, States, [H|Name]).

parse_param_value([$;|T], Line, _States, Value) ->
    {T, Line, lists:reverse(Value)};
parse_param_value([$:|T], Line, _States, Value) ->
    %% keep $: for end of prop name
    {[$:|T], Line, lists:reverse(Value)};
parse_param_value([H|T], Line, States, Value) ->
    parse_param_value(T, Line, States, [H|Value]).

unfolding_line([$\s|T]) -> {true, T};  %% space
unfolding_line([$\t|T]) -> {true, T};  %% htab
unfolding_line(Chars) -> {false, Chars}.

skip_ws([$\s|T]) -> skip_ws(T);
skip_ws([$\t|T]) -> skip_ws(T);
skip_ws(Text) -> Text.

test() ->
    Text = "
BEGIN:VCALENDAR
METHOD:PUBLISH
X-WR-CALNAME:Mi's Calendar
VERSION:2.0
PRODID:Spongecell
CALSCALE:GREGORIAN
BEGIN:VEVENT
DTSTART;TZID=America/Los_Angeles:20061206T120000
DTSTAMP:20070728T004842
LOCATION:Gordon Biersch, 640 Emerson St, Palo Alto, CA
URL:
UID:295803:spongecell.com
SUMMARY:All hands meeting
RRULE:FREQ=WEEKLY;INTERVAL=1
DTEND;TZID=America/Los_Angeles:20061206T130000
DESCRIPTION:
END:VEVENT
BEGIN:VEVENT
DTSTART;TZID=America/Los_Angeles:20061207T120000
DTSTAMP:20070728T004842
LOCATION:395 ano nuevo ave\, sunnyvale\, ca
URL:
UID:295802:spongecell.com
SUMMARY:Company lunch
RRULE:FREQ=WEEKLY;INTERVAL=1
DTEND;TZID=America/Los_Angeles:20061207T130000
DESCRIPTION:Let's have lots of beer!! (well\, and some code review :)
END:VEVENT
BEGIN:VEVENT
DTSTART;TZID=America/Los_Angeles:20061213T123000
DTSTAMP:20070728T004842
LOCATION:369 S California Ave\, Palo Alto\, CA
URL:
UID:295714:spongecell.com
SUMMARY:Ben is back.. want to meet again
DTEND;TZID=America/Los_Angeles:20061213T133000
DESCRIPTION:Re: Ben is back.. want to meet again\n Marc
END:VEVENT
BEGIN:VEVENT
DTSTART;TZID=America/Los_Angeles:20070110T200000
DTSTAMP:20070728T004842
LOCATION:
URL:
UID:304529:spongecell.com
SUMMARY:flight back home
DTEND;TZID=America/Los_Angeles:20070110T210000
DESCRIPTION:
END:VEVENT
BEGIN:VTIMEZONE
TZID:America/Los_Angeles
BEGIN:STANDARD
DTSTART:20071104T000000
TZNAME:PST
RRULE:FREQ=YEARLY;BYMONTH=11;BYDAY=1SU
TZOFFSETFROM:-0700
TZOFFSETTO:-0800
END:STANDARD
BEGIN:DAYLIGHT
DTSTART:20070311T000000
TZNAME:PDT
RRULE:FREQ=YEARLY;BYMONTH=3;BYDAY=1SU
TZOFFSETFROM:-0800
TZOFFSETTO:-0700
END:DAYLIGHT
END:VTIMEZONE
END:VCALENDAR
",
    io:fwrite(user, "Text: ~s~n", [Text]),
    {ok, Xml} = parse(Text),
    XmlText = lists:flatten(xmerl:export_simple([Xml], xmerl_xml)),
    io:fwrite(user, "Parsed: ~n~p~n", [XmlText]).
You may have noticed that the ?stateMachine macro could point to a json_machine:state/2 some day, and then we could get a JSON result without modifying the icalendar parser.
The same approach can also be applied to JSON<->XML transformation. Actually, I think SAX events are a good interface for transforming data objects between various formats. It's also a bit Erlang style (event passing): the parser and state machine can communicate via SAX events as two separate processes, living with send/receive.
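To make the two-process idea concrete, here is a rough sketch of my own (an assumption, not code from the post): the parser process sends each SAX event as a message, and a builder process folds them with xml_sm:state/2 and replies with the final tree.

```erlang
%% Hypothetical sketch: run the state machine in its own process.
%% The builder folds incoming SAX events with xml_sm:state/2 and
%% replies with the final result when the document ends.
builder_loop(States) ->
    receive
        {event, {endDocument} = Event, From} ->
            From ! {result, xml_sm:state(Event, States)};
        {event, Event, _From} ->
            builder_loop(xml_sm:state(Event, States))
    end.

parse_in_two_processes(Events) ->
    Self = self(),
    Builder = spawn(fun () -> builder_loop(undefined) end),
    [Builder ! {event, E, Self} || E <- Events],
    receive {result, Result} -> Result end.
```

Feeding the event list from xml_sm:test/0 through parse_in_two_processes/1 should produce the same {ok, Element} as the foldl version, only with the parser and builder decoupled as processes.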
From Rails to Erlyweb - Part IV
IV. Support MySQL Spatial Extensions via erlydb_mysql
ErlyDB supports most database operations, and it can also be extended to support more complex operations. Here is an example, where I needed to support the MySQL spatial extensions.
Assume we have a table, places, which has a MySQL POINT field, location. Here is the code:
-module(places).

-export([get_location_as_tuple/1,
         set_location_by_tuple/2]).

%% @spec get_location_as_tuple(Id::integer()) -> {float(), float()}
get_location_as_tuple(Id) ->
    ESQL = {esql, {select, 'AsText(location)', {from, ?MODULE}, {where, {'id', '=', Id}}}},
    case erlydb_mysql:q(ESQL) of
        %% Example:
        %% {data, {mysql_result, [{<<>>, <<"AsText(location)">>, 8192, 'VAR_STRING'}],
        %%                       [[<<"POINT(-122.292 37.8341)">>]],
        %%                       0,
        %%                       []}}
        {data, {mysql_result, [{_, <<"AsText(location)">>, _, _}], [[FirstResult]|_Rest], _, _}} ->
            case io_lib:fread("POINT(~f ~f)", binary_to_list(FirstResult)) of
                {ok, [X, Y], _} -> {X, Y};
                _Else -> undefined
            end;
        _Error -> undefined
    end.

%% @spec set_location_by_tuple(Id::integer(), {X::float(), Y::float()}) -> ok | error
set_location_by_tuple(Id, {X, Y}) ->
    %% "UPDATE places SET location = PointFromText(\'POINT(1 1)\') WHERE (id = 1)"
    PointFromText = point_tuple_to_point_from_text({X, Y}),
    ESQL = {esql, {update, ?MODULE, [{location, PointFromText}], {'id', '=', Id}}},
    Options = [{allow_unsafe_statements, true}],
    case erlydb_mysql:q(ESQL, Options) of
        {updated, {mysql_result, [], [], _UpdatedNum, []}} -> ok;
        _Error -> error
    end.

point_tuple_to_point_from_text({X, Y}) ->
    %% As MySQL supports floats in a format like 1.20002300000000005298e+02,
    %% we can just apply float_to_list/1 to X and Y
    PointFromText = lists:flatten(io_lib:fwrite("PointFromText('POINT(~f ~f)')", [X, Y])),
    list_to_atom(PointFromText).  %% 'PointFromText(\'POINT(X Y)\')'
Now we can:
> places:set_location_by_tuple(6, {-11.11, 88.88}).
> places:get_location_as_tuple(6).
{-11.11, 88.88}
HTML Entity Refs and xmerl
According to [erlang-bugs] xmerl and standard HTML entity refs, xmerl_scan currently recognizes only a very limited set of entity references. In brief, if you try to xmerl_scan:string/1 XML text that includes standard HTML entity refs, such as nbsp, iexcl, pound, frac14, etc., you'll encounter something like:
16> edoc:file("exprecs.erl").
2670- fatal: {unknown_entity_ref,nbsp}
2580- fatal: error_scanning_entity_ref
exprecs.erl, in module header: at line 28: error in XML parser:
{fatal, {error_scanning_entity_ref,
         {file,file_name_unknown},
         {line,86},
         {col,18}}}.
** exited: error **
Ulf Wiger said:
... I realize that xmerl can be customized with a rules function which, for example, can handle entity references...
So I gave it a try by writing a piece of code (html_entity_refs.erl) that parses an HTML entity-ref DTD file into ets rules, then:
xmerl_scan:string(XmlText, [{rules, html_entity_refs:get_xmerl_rules()}]).
Yes, this works. But for a 3MB test file, the parsing took about 30 seconds.
How about converting these entity refs to UTF-8 characters first, and then applying xmerl_scan to the result?
I wrote another piece of code, and now:
xmerl_scan:string(html_entity_refs:decode_for_xml(XmlText)).
This time, the decoding plus parsing takes about 5 seconds; that's 6 times faster than the ets rules solution.
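The real html_entity_refs module handles the full HTML entity set parsed from a DTD; as a rough illustration of the pre-decoding idea (my own sketch, covering only a few entities, not the actual implementation), the decoding amounts to a single pass over the character list:

```erlang
%% Sketch: replace a handful of named entity refs with their
%% character codes before handing the text to xmerl_scan.
%% nbsp = 160, iexcl = 161, pound = 163 in Unicode/Latin-1.
decode_for_xml([]) -> [];
decode_for_xml("&nbsp;" ++ T)  -> [160 | decode_for_xml(T)];
decode_for_xml("&iexcl;" ++ T) -> [161 | decode_for_xml(T)];
decode_for_xml("&pound;" ++ T) -> [163 | decode_for_xml(T)];
decode_for_xml([H|T]) -> [H | decode_for_xml(T)].
```

A single list traversal like this is why pre-decoding beats resolving each reference through ets lookups inside the parser.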
The html_entity_refs.erl can be found at:
http://caoyuan.googlecode.com/svn/trunk/erlang/html_entity_refs.erl
From Rails to Erlyweb - Part III
3. The Magic Behind Erlyweb
With dynamic typing, hot code swapping, and built-in parsing and compilation tools, Erlang is also well suited to dynamic meta-programming. Erlyweb uses a small, convenient tool, smerl, to generate db record CRUD code automatically.
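For the curious, smerl's style of runtime code generation looks roughly like this (adapted from smerl's own documented example; treat the exact API as an assumption on my part):

```erlang
%% Create a module 'foo' at runtime, add a function to it,
%% compile it into the running VM, then call it.
test_smerl() ->
    M1 = smerl:new(foo),
    {ok, M2} = smerl:add_func(M1, "bar() -> 1 + 1."),
    smerl:compile(M2),  %% 'foo' is now loaded
    foo:bar().          %% returns 2
```

This is essentially what erlyweb:compile/1 does on a larger scale: for each one-line model module it synthesizes the full set of erlydb_base delegating functions and compiles them in.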
The music example on erlyweb.org shows the magic:
Define a simple musician.erl module (just one line here):
-module(musician).
Then, you will get a long list of functions in the musician module after erlyweb:compile("apps/music"). Sounds similar to Rails.
I like watching magic shows, but in programming I can't stand not knowing what's behind the magic. With Rails, the magic behind the scenes always gives me a headache; it's too difficult to follow where and how code is injected into a class or module. But in Erlang, it's super easy.
I added a simple function to erlyweb_app.erl (please see my previous post):
-export([decompile/2]).

decompile(AppName, Beam) when is_atom(AppName) ->
    decompile(atom_to_list(AppName), Beam);
decompile(AppName, Beam) when is_list(AppName) ->
    BinFilename = "./apps/" ++ AppName ++ "/ebin/" ++ atom_to_list(Beam),
    io:format("Beam file: ~s~n", [BinFilename]),
    {ok, {_, [{abstract_code, {_, AC}}]}} = beam_lib:chunks(BinFilename, [abstract_code]),
    SrcFilename = "./apps/" ++ AppName ++ "_" ++ atom_to_list(Beam),
    {ok, S} = file:open(SrcFilename ++ ".erl", write),
    io:fwrite(S, "~s~n", [erl_prettypr:format(erl_syntax:form_list(AC))]).
Now, typing erlyweb_app:decompile(music, musician) in the Erlang shell gives me a file, music_musician.erl, under the folder myproject/apps/ (I put these decompiled source files under myproject/apps/ so that they are not auto-compiled to beams by erlyweb again):
-file("musician", 1). -module(musician). -export([relations/0, fields/0, table/0, type_field/0, db_table/0, db_field/1, is_new/1, is_new/2, get_module/1, to_iolist/1, to_iolist/2, field_to_iolist/2, new/0, new_with/1, new_with/2, new_from_strings/1, set_fields/2, set_fields/3, set_fields_from_strs/2, field_from_string/2, save/1, insert/1, update/1, update/2, delete/1, delete_id/1, delete_where/1, delete_all/0, transaction/1, before_save/1, after_save/1, before_delete/1, after_delete/1, after_fetch/1, find/2, find_first/2, find_max/3, find_range/4, find_id/1, aggregate/4, count/0, get/2, set_related_one_to_many/2, find_related_one_to_many/2, find_related_many_to_one/4, aggregate_related_many_to_one/6, add_related_many_to_many/3, remove_related_many_to_many/3, remove_related_many_to_many_all/5, find_related_many_to_many/5, aggregate_related_many_to_many/7, find_related_many_first/4, find_related_many_max/5, find_related_many_range/6, aggregate_related_many/6, do_save/1, do_delete/1, field_names_for_query/0, field_names_for_query/1, field_to_iolist/1, set/3, db_pk_fields/0, get_pk_fk_fields/0, get_pk_fk_fields2/0, db_fields/0, db_field_names/0, db_field_names_str/0, db_field_names_bin/0, db_num_fields/0, id/1, id/2, name/1, name/2, birth_date/1, birth_date/2, instrument/1, instrument/2, bio/1, bio/2, new/4, driver/0, count/3, count/1, count/2, count_with/2, avg/3, avg/1, avg/2, avg_with/2, min/3, min/1, min/2, min_with/2, max/3, max/1, max/2, max_with/2, sum/3, sum/1, sum/2, sum_with/2, stddev/3, stddev/1, stddev/2, stddev_with/2, find/0, find/1, find_with/1, find_first/0, find_first/1, find_first_with/1, find_max/1, find_max/2, find_max_with/2, find_range/2, find_range/3, find_range_with/3]). relations() -> erlydb_base:relations(). fields() -> erlydb_base:fields(). table() -> erlydb_base:table(). type_field() -> erlydb_base:type_field(). db_table() -> erlydb_base:db_table(musician). db_field(FieldName) -> erlydb_base:db_field(musician, FieldName). 
is_new(Rec) -> erlydb_base:is_new(Rec). is_new(Rec, Val) -> erlydb_base:is_new(Rec, Val). get_module(Rec) -> erlydb_base:get_module(Rec). to_iolist(Recs) -> erlydb_base:to_iolist(musician, Recs). to_iolist(Recs, ToIolistFun) -> erlydb_base:to_iolist(musician, Recs, ToIolistFun). field_to_iolist(Val, _Field) -> erlydb_base:field_to_iolist(Val, _Field). new() -> erlydb_base:new(musician). new_with(Fields) -> erlydb_base:new_with(musician, Fields). new_with(Fields, ToFieldFun) -> erlydb_base:new_with(musician, Fields, ToFieldFun). new_from_strings(Fields) -> erlydb_base:new_from_strings(musician, Fields). set_fields(Record, Fields) -> erlydb_base:set_fields(musician, Record, Fields). set_fields(Record, Fields, ToFieldFun) -> erlydb_base:set_fields(musician, Record, Fields, ToFieldFun). set_fields_from_strs(Record, Fields) -> erlydb_base:set_fields_from_strs(musician, Record, Fields). field_from_string(ErlyDbField, undefined) -> erlydb_base:field_from_string(ErlyDbField, undefined). save(Rec) -> erlydb_base:save(Rec). insert(Recs) -> erlydb_base:insert(Recs). update(Props) -> erlydb_base:update(musician, Props). update(Props, Where) -> erlydb_base:update(musician, Props, Where). delete(Rec) -> erlydb_base:delete(Rec). delete_id(Id) -> erlydb_base:delete_id(musician, Id). delete_where(Where) -> erlydb_base:delete_where(musician, Where). delete_all() -> erlydb_base:delete_all(musician). transaction(Fun) -> erlydb_base:transaction(musician, Fun). before_save(Rec) -> erlydb_base:before_save(Rec). after_save(Rec) -> erlydb_base:after_save(Rec). before_delete(Rec) -> erlydb_base:before_delete(Rec). after_delete({_Rec, Num}) -> erlydb_base:after_delete({_Rec, Num}). after_fetch(Rec) -> erlydb_base:after_fetch(Rec). find(Where, Extras) -> erlydb_base:find(musician, Where, Extras). find_first(Where, Extras) -> erlydb_base:find_first(musician, Where, Extras). find_max(Max, Where, Extras) -> erlydb_base:find_max(musician, Max, Where, Extras). 
find_range(First, Max, Where, Extras) ->
    erlydb_base:find_range(musician, First, Max, Where, Extras).

find_id(Id) ->
    erlydb_base:find_id(musician, Id).

aggregate(AggFunc, Field, Where, Extras) ->
    erlydb_base:aggregate(musician, AggFunc, Field, Where, Extras).

count() ->
    erlydb_base:count(musician).

get(Idx, Rec) ->
    erlydb_base:get(Idx, Rec).

set_related_one_to_many(Rec, Other) ->
    erlydb_base:set_related_one_to_many(Rec, Other).

find_related_one_to_many(OtherModule, Rec) ->
    erlydb_base:find_related_one_to_many(OtherModule, Rec).

find_related_many_to_one(OtherModule, Rec, Where, Extras) ->
    erlydb_base:find_related_many_to_one(OtherModule, Rec, Where, Extras).

aggregate_related_many_to_one(OtherModule, AggFunc, Rec, Field, Where, Extras) ->
    erlydb_base:aggregate_related_many_to_one(OtherModule, AggFunc, Rec, Field, Where, Extras).

add_related_many_to_many(JoinTable, Rec, OtherRec) ->
    erlydb_base:add_related_many_to_many(JoinTable, Rec, OtherRec).

remove_related_many_to_many(JoinTable, Rec, OtherRec) ->
    erlydb_base:remove_related_many_to_many(JoinTable, Rec, OtherRec).

remove_related_many_to_many_all(JoinTable, OtherTable, Rec, Where, Extras) ->
    erlydb_base:remove_related_many_to_many_all(JoinTable, OtherTable, Rec, Where, Extras).

find_related_many_to_many(OtherModule, JoinTable, Rec, Where, Extras) ->
    erlydb_base:find_related_many_to_many(OtherModule, JoinTable, Rec, Where, Extras).

aggregate_related_many_to_many(OtherModule, JoinTable, AggFunc, Rec, Field, Where, Extras) ->
    erlydb_base:aggregate_related_many_to_many(OtherModule, JoinTable, AggFunc, Rec, Field, Where, Extras).

find_related_many_first(Func, Rec, Where, Extras) ->
    erlydb_base:find_related_many_first(Func, Rec, Where, Extras).

find_related_many_max(Func, Rec, Num, Where, Extras) ->
    erlydb_base:find_related_many_max(Func, Rec, Num, Where, Extras).

find_related_many_range(Func, Rec, First, Last, Where, Extras) ->
    erlydb_base:find_related_many_range(Func, Rec, First, Last, Where, Extras).

aggregate_related_many(Func, AggFunc, Rec, Field, Where, Extras) ->
    erlydb_base:aggregate_related_many(Func, AggFunc, Rec, Field, Where, Extras).

do_save(Rec) ->
    erlydb_base:do_save(Rec).

do_delete(Rec) ->
    erlydb_base:do_delete(Rec).

field_names_for_query() ->
    erlydb_base:field_names_for_query(musician).

field_names_for_query(UseStar) ->
    erlydb_base:field_names_for_query(musician, UseStar).

field_to_iolist(Val) ->
    erlydb_base:field_to_iolist(Val).

set(Idx, Rec, Val) ->
    setelement(Idx, Rec, Val).

db_pk_fields() ->
    erlydb_base:db_pk_fields(
      [{erlydb_field, id, "id", <<105, 100>>, int, 11, integer, text_field,
        false, primary, undefined, identity}]).

get_pk_fk_fields() ->
    erlydb_base:get_pk_fk_fields([{id, musician_id}]).

get_pk_fk_fields2() ->
    erlydb_base:get_pk_fk_fields2([{id, musician_id1, musician_id2}]).

db_fields() ->
    erlydb_base:db_fields(
      [{erlydb_field, id, "id", <<105, 100>>, int, 11, integer, text_field,
        false, primary, undefined, identity},
       {erlydb_field, name, "name", <<110, 97, 109, 101>>, varchar, 20, binary,
        text_field, true, undefined, undefined, undefined},
       {erlydb_field, birth_date, "birth_date",
        <<98, 105, 114, 116, 104, 95, 100, 97, 116, 101>>, date, undefined,
        date, text_field, true, undefined, undefined, undefined},
       {erlydb_field, instrument, "instrument",
        <<105, 110, 115, 116, 114, 117, 109, 101, 110, 116>>, enum,
        [<<103, 117, 105, 116, 97, 114>>, <<112, 105, 97, 110, 111>>,
         <<100, 114, 117, 109, 115>>, <<118, 111, 99, 97, 108, 115>>],
        binary, select, true, undefined, undefined, undefined},
       {erlydb_field, bio, "bio", <<98, 105, 111>>, text, undefined, binary,
        text_area, true, undefined, undefined, undefined}]).

db_field_names() ->
    erlydb_base:db_field_names([id, name, birth_date, instrument, bio]).

db_field_names_str() ->
    erlydb_base:db_field_names_str(["id", "name", "birth_date", "instrument", "bio"]).

db_field_names_bin() ->
    erlydb_base:db_field_names_bin(
      [<<105, 100>>, <<110, 97, 109, 101>>,
       <<98, 105, 114, 116, 104, 95, 100, 97, 116, 101>>,
       <<105, 110, 115, 116, 114, 117, 109, 101, 110, 116>>, <<98, 105, 111>>]).

db_num_fields() ->
    erlydb_base:db_num_fields(5).

id(Rec) -> erlydb_base:get(3, Rec).
id(Rec, Val) -> setelement(3, Rec, Val).
name(Rec) -> erlydb_base:get(4, Rec).
name(Rec, Val) -> setelement(4, Rec, Val).
birth_date(Rec) -> erlydb_base:get(5, Rec).
birth_date(Rec, Val) -> setelement(5, Rec, Val).
instrument(Rec) -> erlydb_base:get(6, Rec).
instrument(Rec, Val) -> setelement(6, Rec, Val).
bio(Rec) -> erlydb_base:get(7, Rec).
bio(Rec, Val) -> setelement(7, Rec, Val).

new(name, birth_date, instrument, bio) ->
    {musician, true, undefined, name, birth_date, instrument, bio}.

driver() ->
    erlydb_base:driver(
      {erlydb_mysql,
       [{last_compile_time, {{1980, 1, 1}, {0, 0, 0}}},
        {outdir, "apps/music/ebin"}, debug_info, report_errors,
        report_warnings, {erlydb_driver, mysql}]}).

count(Field, Where, Extras) -> erlydb_base:aggregate(musician, count, Field, Where, Extras).
count(Field) -> erlydb_base:aggregate(musician, count, Field, undefined, undefined).
count(Field, Where) -> erlydb_base:aggregate(musician, count, Field, Where, undefined).
count_with(Field, Extras) -> erlydb_base:aggregate(musician, count, Field, undefined, Extras).

avg(Field, Where, Extras) -> erlydb_base:aggregate(musician, avg, Field, Where, Extras).
avg(Field) -> erlydb_base:aggregate(musician, avg, Field, undefined, undefined).
avg(Field, Where) -> erlydb_base:aggregate(musician, avg, Field, Where, undefined).
avg_with(Field, Extras) -> erlydb_base:aggregate(musician, avg, Field, undefined, Extras).

min(Field, Where, Extras) -> erlydb_base:aggregate(musician, min, Field, Where, Extras).
min(Field) -> erlydb_base:aggregate(musician, min, Field, undefined, undefined).
min(Field, Where) -> erlydb_base:aggregate(musician, min, Field, Where, undefined).
min_with(Field, Extras) -> erlydb_base:aggregate(musician, min, Field, undefined, Extras).

max(Field, Where, Extras) -> erlydb_base:aggregate(musician, max, Field, Where, Extras).
max(Field) -> erlydb_base:aggregate(musician, max, Field, undefined, undefined).
max(Field, Where) -> erlydb_base:aggregate(musician, max, Field, Where, undefined).
max_with(Field, Extras) -> erlydb_base:aggregate(musician, max, Field, undefined, Extras).

sum(Field, Where, Extras) -> erlydb_base:aggregate(musician, sum, Field, Where, Extras).
sum(Field) -> erlydb_base:aggregate(musician, sum, Field, undefined, undefined).
sum(Field, Where) -> erlydb_base:aggregate(musician, sum, Field, Where, undefined).
sum_with(Field, Extras) -> erlydb_base:aggregate(musician, sum, Field, undefined, Extras).

stddev(Field, Where, Extras) -> erlydb_base:aggregate(musician, stddev, Field, Where, Extras).
stddev(Field) -> erlydb_base:aggregate(musician, stddev, Field, undefined, undefined).
stddev(Field, Where) -> erlydb_base:aggregate(musician, stddev, Field, Where, undefined).
stddev_with(Field, Extras) -> erlydb_base:aggregate(musician, stddev, Field, undefined, Extras).

find() -> erlydb_base:find(musician, undefined, undefined).
find(Where) -> erlydb_base:find(musician, Where, undefined).
find_with(Extras) -> erlydb_base:find(musician, undefined, Extras).

find_first() -> erlydb_base:find_first(musician, undefined, undefined).
find_first(Where) -> erlydb_base:find_first(musician, Where, undefined).
find_first_with(Extras) -> erlydb_base:find_first(musician, undefined, Extras).

find_max(Max) -> erlydb_base:find_max(musician, Max, undefined, undefined).
find_max(Max, Where) -> erlydb_base:find_max(musician, Max, Where, undefined).
find_max_with(Max, Extras) -> erlydb_base:find_max(musician, Max, undefined, Extras).

find_range(First, Max) -> erlydb_base:find_range(musician, First, Max, undefined, undefined).
find_range(First, Max, Where) -> erlydb_base:find_range(musician, First, Max, Where, undefined).
find_range_with(First, Max, Extras) -> erlydb_base:find_range(musician, First, Max, undefined, Extras).
With this decompiled file, everything behind the magic becomes clear. For example, you get a pair of getter/setter functions for each field:
Musician = musician:find({name, 'like', "Caoyuan Mus"}),
%% get the 'name' field of record Musician
Name = musician:name(Musician),
%% set the 'name' field to "new name" and bind to a new record Musician1
Musician1 = musician:name(Musician, "new name"),
%% Or,
Musician2 = musician:set_fields(Musician, {name, "new name"}, {birth_day, "1940/10/9"}),
%% then save one of them
musician:save(Musician2).
Finally, some notes for newcomers to Erlang and Erlyweb:
- In Erlang, a variable can only be bound (assigned a value) once, so only Musician1 and Musician2 carry the "new name"; Musician keeps the original value.
- For efficiency reasons, if the field is of varchar/text type, the getter returns a binary rather than a string. A binary can be printed to the browser directly in Erlyweb, but if you want to use it as a string, apply binary_to_list(Name) to it.
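Both points can be seen in an Erlang shell session. This is a sketch; the literal values here are made up for illustration:

```
%% Single assignment: rebinding an already-bound variable fails.
1> M = "old".
"old"
2> M = "new".
** exception error: no match of right hand side value "new"

%% A varchar/text getter returns a binary; convert it when you need a string.
3> Name = <<"Caoyuan">>.   %% the shape of what musician:name(Musician) returns
<<"Caoyuan">>
4> binary_to_list(Name).
"Caoyuan"
```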
From Rails to Erlyweb - Part II
Updated Aug 23: Please see From Rails to Erlyweb - Part II Manage Project - Reloaded
Updated July 15: store the database configuration in the <opaque> section of yaws.conf
Updated May 2: erlyweb:compile(AppDir::string(), Options::[option()]) has the option {auto_compile, Val}, where Val is 'true' or 'false'. During development you can turn on {auto_compile, true}, so you only need to run erlyweb_app:boot(myapp) once.
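A sketch of the two modes (the app directory here follows this post's project layout; the exact path is an assumption):

```
%% Development: compile once with auto_compile on; erlyweb then checks
%% for changed source files and recompiles them on each request.
erlyweb:compile("./apps/myapp", [{erlydb_driver, mysql}, {auto_compile, true}]).

%% Production: compile everything ahead of time, with no per-request checks.
erlyweb:compile("./apps/myapp", [{erlydb_driver, mysql}]).
```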
2. Manage project
Erlyweb provides erlyweb:compile(App, ...) to compile the source files under the app directory. To start an app, you usually call erlydb:start(mysql, ...) and compile the app files first. To make life easier, you can put some script-like code under the myproject/script directory. Here's my project source tree:
myproject
 + apps
 |  + myapp
 |     + ebin
 |     + include
 |     + nbproject
 |     + src
 |        + components
 |        + lib
 |        + services
 |     + test
 |     + www
 + config
 |  * yaws.conf
 + script
    + ebin
    + src
       * erlyweb_app.erl
Here, config/yaws.conf contains the configuration that you copy/paste into your real yaws.conf file. Here's mine:
## This is the configuration of apps that will copy/paste to your yaws.conf.

ebin_dir = D:/myapp/trunk/script/ebin
ebin_dir = D:/myapp/trunk/apps/myapp/ebin

<server localhost>
    port = 8000
    listen = 0.0.0.0
    docroot = D:/myapp/trunk/apps/myapp/www
    appmods = </myapp, erlyweb>
    <opaque>
        appname = myapp
        hostname = "localhost"
        username = "mememe"
        password = "pwpwpw"
        database = "myapp_development"
    </opaque>
</server>
You may have noticed that all beams under D:/myapp/trunk/script/ebin and D:/myapp/trunk/apps/myapp/ebin will be auto-loaded by yaws.
erlyweb_app.erl is the boot script, which is used to start the db connection and compile the code. Currently I run these scripts manually; I'll say more about that later.
-module(erlyweb_app).
-export([start/1]).
-export([main/1,
         boot/1,
         build/1,
         decompile/2
        ]).
-include("yaws.hrl").

%% @doc Callback function when yaws starts an app.
%% @see man yaws.conf
%%      start_mod = Module
%%        Defines a user provided callback module. At startup of the
%%        server, Module:start/1 will be called. The #sconf{} record
%%        (defined in yaws.hrl) will be used as the input argument. This
%%        makes it possible for a user application to synchronize the
%%        startup with the yaws server as well as getting hold of user
%%        specific configuration data, see the explanation for the
%%        context.
start(ServerConf) ->
    Opaque = ServerConf#sconf.opaque,
    AppName = proplists:get_value("appname", Opaque),
    Database = proplists:get_value("database", Opaque),
    DBConf = [{database, Database},
              {hostname, proplists:get_value("hostname", Opaque)},
              {username, proplists:get_value("username", Opaque)},
              {password, proplists:get_value("password", Opaque)}],
    io:fwrite(user, "Starting app ~s using database:~n~s~n", [AppName, Database]),
    start_db(DBConf).

start_db(DBConf) ->
    erlydb:start(mysql, DBConf).

main([AppName]) ->
    boot(AppName);
main(_) ->
    usage().

boot(AppName) ->
    build(AppName, true).

build(AppName) ->
    build(AppName, false).

build(AppName, AutoCompile) when is_atom(AppName) ->
    build(atom_to_list(AppName), AutoCompile);
build(AppName, AutoCompile) when is_list(AppName) ->
    compile(AppName, [debug_info, {auto_compile, AutoCompile}]).

compile(AppName, Options) ->
    %% Retrieve source header paths from yaws server configuration. We don't
    %% know how to get yaws.hrl here yet, so we manually write a matching
    %% rule here.
    {ok, GC, _} = yaws_server:getconf(),
    {gconf, _, _, _, _, _, _, _, _, _, _, _, _, _, Incl, _, _, _, _, _} = GC,
    %?Debug("paths: ~p", [Incl]),
    erlyweb:compile("./apps/" ++ AppName,
                    [{erlydb_driver, mysql}, {i, Incl}] ++ Options).
decompile(AppName, Beam) when is_atom(AppName) ->
    decompile(atom_to_list(AppName), Beam);
decompile(AppName, Beam) when is_list(AppName) ->
    BinFilename = "./apps/" ++ AppName ++ "/ebin/" ++ atom_to_list(Beam),
    io:format("Beam file: ~s~n", [BinFilename]),
    {ok, {_, [{abstract_code, {_, AC}}]}} =
        beam_lib:chunks(BinFilename, [abstract_code]),
    SrcFilename = "./apps/" ++ AppName ++ "_" ++ atom_to_list(Beam),
    %% do not use the ".erl" ext? otherwise it will be compiled by erlyweb
    {ok, S} = file:open(SrcFilename ++ ".erl", write),
    io:fwrite(S, "~s~n", [erl_prettypr:format(erl_syntax:form_list(AC))]).

usage() ->
    io:format("usage: erlyweb_app AppName\n"),
    halt(1).
To build it,
> erlc -I /opt/local/lib/yaws/include erlyweb_app.erl
The erlyweb_app.erl is almost escript-ready, but I currently use it as module functions. It's pre-compiled, and erlyweb_app.beam is placed under script/ebin.
So, I start myapp with these steps:
> cd \myproject
> yaws -i -sname yaws
1> erlyweb_app:build(myapp).
After I make changes to myapp, I run the above erlyweb_app:build(myapp). again, and everything is up to date.
This is surely not the best way to handle the write-compile-run-test cycle; I'll improve the scripts to start yaws as a node and then hot-swap the compiled code into it.
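One way to do that hot-swap, sketched under the assumption that the yaws node started above with -sname yaws is reachable as yaws@localhost (the actual name depends on your short hostname), is to call the build function remotely from a second node:

```
%% From another node started with, e.g., `erl -sname ctl`:
%% run the compile inside the running yaws node, so the new beams are
%% loaded there without restarting the server.
rpc:call(yaws@localhost, erlyweb_app, build, [myapp]).
```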
Playing with Rails was a good experience: I like rake db:migrate and the script and config folders of Rails. Grails also brings some good ideas for managing a web app project tree. I'll try to bring them into the project manager of ErlyBird.
Next part, I'll talk about the magic behind Erlyweb, and why I prefer the magic of Erlyweb to Rails.
From Rails to Erlyweb - Part I
Updated July 20 2007: new params.erl, which uses dict to store params and converts them to the proper type (integer/float/string)
It's time to migrate our project from Rails to Erlyweb (it cost me one month to write an Erlang IDE before this could happen :-)). I'll blog some tips from the procedure, focusing on the differences between Rails and Erlyweb.
1. How to handle params
Rails:

page = params.fetch(:p, 1).to_i
size = params.fetch(:s, @@RECORDS_PER_PAGE).to_i

Erlyweb:
-module(mymodel_controller).

-define(RECORDS_PER_PAGE, 9).

list(A) ->
    P = params:from_yaws_arg(A),
    Page = params:get("p", P, 1),  %% returns integer
    Size = params:get("s", P, ?RECORDS_PER_PAGE),
    ....
You can also
Id = params:get("id", P, integer)  %% return integer()
Id = params:get("id", P)           %% return string()
%% Or, pass a fun to return the converted result
Id = params:get("id", P, fun string:to_integer/1)

To get the above code working, I wrote a params.erl and placed it under the folder apps/myapp/lib.
-module(params).
-export([from_yaws_arg/1,
         from_list/1,
         get_page_opts/1,
         get_page_extras/1,
         get/2, get/3, get/4,
         convert/2,
         put/3,
         join/1
        ]).

%% The original post does not show this define; 10 is an assumed value.
-define(DEFAULT_PAGESIZE, 10).

%% @doc Convert yaws Arg#arg.querydata into a {key,value} stored in a dynamic hash.
from_yaws_arg(Arg) ->
    Params = yaws_api:parse_query(Arg),
    dict:from_list(Params).

from_list(PropsList) ->
    dict:from_list(PropsList).

get_page_opts(Dict) ->
    [{page, get("p", Dict, 1)},
     {page_size, get("s", Dict, ?DEFAULT_PAGESIZE)}].

get_page_extras(Dict) ->
    Page = get("p", Dict, 1),
    Size = get("s", Dict, ?DEFAULT_PAGESIZE),
    Offset = (Page - 1) * Size,
    [{limit, Offset, Size}].

get(Key, Dict) ->
    get(Key, Dict, undefined).

%% @spec get(Key::term(), Dict::dict(), Default::term()) -> string()
get(Key, Dict, Fun) when is_function(Fun) ->
    case get(Key, Dict, undefined) of
        undefined -> undefined;
        Value -> Fun(Value)
    end;
get(Key, Dict, Type) when is_atom(Type) andalso Type /= undefined ->
    get(Key, Dict, undefined, Type);
get(Key, Dict, Default) ->
    case dict:find(Key, Dict) of
        {ok, Value} when is_list(Default) -> convert(Value, string);
        {ok, Value} when is_integer(Default) -> convert(Value, integer);
        {ok, Value} when is_float(Default) -> convert(Value, float);
        {ok, Value} -> Value;
        error -> Default
    end.

get(Key, Dict, Default, Type) ->
    Value = get(Key, Dict, Default),
    convert(Value, Type).

convert(Value, _Type) when Value == undefined -> undefined;
convert(Value, Type) ->
    try
        case Type of
            integer when is_list(Value) -> list_to_integer(Value);
            integer when is_integer(Value) -> Value;
            integer when is_float(Value) -> erlang:round(Value);
            float when is_list(Value) -> list_to_float(Value);
            float when is_integer(Value) -> Value * 1.0;
            float when is_float(Value) -> Value;
            string when is_list(Value) -> Value;
            string when is_integer(Value) -> integer_to_list(Value);
            string when is_float(Value) -> float_to_list(Value);
            number when is_list(Value) andalso Value /= "" andalso Value /= "NaN" ->
                case string:chr(Value, $.) > 0 of
                    true -> list_to_float(Value);
                    false -> list_to_integer(Value)
                end;
            number when is_integer(Value) -> Value;
            number when is_float(Value) -> Value
        end
    catch error:_ ->
        undefined
    end.

put(Key, Value, Dict) ->
    dict:store(Key, Value, Dict).

join(Params) ->
    ParamPairedStrs =
        dict:fold(fun (Key, Value, Acc) ->
                      [Key ++ "=" ++ yaws_api:url_encode(Value) | Acc]
                  end, [], Params),
    %% Join params string with "&": ["a=a1", "b=b1"] => "a=a1&b=b1"
    join("&", ParamPairedStrs).

%% @spec join(string(), list(string())) -> string()
join(String, List) ->
    join(String, List, []).

join(_String, [], Joined) ->
    Joined;
join(String, [H|[]], Joined) ->
    join(String, [], Joined ++ H);
join(String, [H|T], Joined) ->
    join(String, T, Joined ++ H ++ String).