Posts for the month of April 2009

AIOTrade: Progress of Migrating to Scala - DSL and Parallel Computing

Well, I've migrated the core math and indicator parts of AIOTrade to Scala. There are two noticeable advances in the new AIOTrade.

The first is the look and feel of indicator writing. Since Scala is well suited to DSLs, an indicator now looks like this:

class ARBRIndicator extends ContIndicator {
    _sname = "AR/BR"
    _grids = Array(50f, 200f)
    
    val period = Factor("Period", 10)
    
    val up = Var[Float]("up")
    val dn = Var[Float]("dn")
    val bs = Var[Float]("bs")
    val ss = Var[Float]("ss")
    
    val ar = Var[Float]("AR", Plot.Line)
    val br = Var[Float]("BR", Plot.Line)
    
    def computeCont(begIdx:Int, size:Int) {
        for (i <- begIdx until size) {
            up(i) = H(i) - O(i)
            val up_sum_i = sum(i, up, period)
            
            dn(i) = O(i) - L(i)
            val dn_sum_i = sum(i, dn, period)
            
            ar(i) = up_sum_i / dn_sum_i * 100
            
            val bs_tmp = H(i) - C(i)
            bs(i) = Math.max(0, bs_tmp)
            val bs_sum_i = sum(i, bs, period)
            
            val ss_tmp = C(i) - L(i)
            ss(i) = Math.max(0, ss_tmp)
            val ss_sum_i = sum(i, ss, period)
            
            br(i) = bs_sum_i / ss_sum_i * 100
        }
    }   
}  

versus the Java one:

public class ARBRIndicator extends ContIndicator {
    _sname = "AR/BR";
    _grids = new Float[] {50f, 200f};
    
    Opt period = new DefaultOpt("Period", 10);
    
    Var<Float> up = new DefaultVar("up");
    Var<Float> dn = new DefaultVar("dn");
    Var<Float> bs = new DefaultVar("bs");
    Var<Float> ss = new DefaultVar("ss");
    
    Var<Float> ar = new DefaultVar("AR", Plot.Line);
    Var<Float> br = new DefaultVar("BR", Plot.Line);
    
    
    void computeCont(int begIdx) {
        for (int i = begIdx; i < _itemSize; i++) {
            
            up.set(i, H.get(i) - O.get(i));
            float up_sum_i = sum(i, up, period);
            
            dn.set(i, O.get(i) - L.get(i));
            float dn_sum_i = sum(i, dn, period);
            
            ar.set(i, up_sum_i / dn_sum_i * 100);
            
            float bs_tmp = H.get(i) - C.get(i);
            bs.set(i, Math.max(0, bs_tmp));
            float bs_sum_i = sum(i, bs, period);
            
            float ss_tmp = C.get(i) - L.get(i);
            ss.set(i, Math.max(0, ss_tmp));
            float ss_sum_i = sum(i, ss, period);
            
            br.set(i, bs_sum_i / ss_sum_i * 100);
        }
    }
}

Scala's apply method (together with update) replaces explicit setter/getter calls, which makes the formulas read more naturally.
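To illustrate the desugaring, here is a minimal sketch (SimpleVar is a hypothetical stand-in, not AIOTrade's actual Var implementation): defining apply and update on a class lets callers write v(i) and v(i) = x, which the compiler rewrites to v.apply(i) and v.update(i, x).

```scala
// Minimal sketch, not AIOTrade's actual Var class: apply/update give
// array-like syntax, so up(i) = x desugars to up.update(i, x) and
// up(i) desugars to up.apply(i).
class SimpleVar(val name: String, size: Int) {
  private val values = new Array[Float](size)

  def apply(i: Int): Float = values(i)                       // read:  v(i)
  def update(i: Int, value: Float): Unit = values(i) = value // write: v(i) = x
}

val up = new SimpleVar("up", 10)
up(3) = 1.5f   // compiles to up.update(3, 1.5f)
println(up(3)) // compiles to up.apply(3); prints 1.5
```

This is what lets the indicator body read like the formula itself: up(i) = H(i) - O(i) instead of up.set(i, H.get(i) - O.get(i)).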

The second is that by implementing each indicator as an Actor, the computation of indicators can easily be distributed across multiple CPU cores, with very little code modification:

case object Compute
trait Computable extends Actor {

    // ----- actor's implementation
    def act = loop {
        react {
            case (Compute, fromTime:Long) => computeFrom(fromTime)
            case _ =>
        }
    }
    // ----- end of actor's implementation

    def computeFrom(time:Long) :Unit
    def computedTime :Long
    
    def factors :ArrayBuffer[Factor]
    def factors_=(factors:ArrayBuffer[Factor]) :Unit
    def factors_=(values:Array[Number]) :Unit
    
    def dispose :Unit
}

Computable is an interface/trait with a synchronous method, computeFrom(Long). By extending Computable with Actor and implementing a simple act function with a react message-processing block, all indicators (which extend Computable) can now benefit from parallel computing by calling:

indicator ! (Compute, fromTime)

instead of

indicator.computeFrom(time)

I've done some testing on my 4-core machine, which showed about 380% CPU usage while running. This is, of course, one of the easiest ways to implement parallel computing on the JVM so far.

Another bonus is that I no longer need to worry about concurrent calls to computeFrom(Long): all calls are triggered by Compute messages sent to the actor's message queue and then processed sequentially, so no locks are needed any more. The key:

Parallel computing actors + Sequential message driven computing per actor
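This principle can be sketched with plain JVM threads and blocking queues (an assumption for illustration only; the post itself uses scala.actors, which was later dropped from the standard library). Each worker drains its own private mailbox sequentially, so the compute step needs no lock within a worker, while the workers as a group still run in parallel across cores.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Sketch of "parallel actors + sequential messages per actor" using plain
// JVM threads instead of scala.actors. Worker is a hypothetical stand-in
// for an indicator: its mailbox is processed one message at a time, so
// the compute step never runs concurrently within one worker.
sealed trait Msg
final case class Compute(fromTime: Long) extends Msg
case object Stop extends Msg

class Worker {
  private val mailbox = new LinkedBlockingQueue[Msg]()
  @volatile var computedTime = 0L

  private val thread = new Thread(() => {
    var running = true
    while (running) mailbox.take() match {
      case Compute(t) => computedTime = t // stands in for computeFrom(t)
      case Stop       => running = false
    }
  })
  thread.start()

  def !(msg: Msg): Unit = mailbox.put(msg) // asynchronous send, like the actor's !
  def shutdown(): Unit = { mailbox.put(Stop); thread.join() }
}

val workers = Vector.fill(4)(new Worker)
workers.foreach(_ ! Compute(42L)) // fan out; each mailbox drains on its own thread
workers.foreach(_.shutdown())
```

The send returns immediately, just like indicator ! (Compute, fromTime) above, and sequential mailbox processing is what makes the per-worker state safe without locking.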

AIOTrade Is Migrating to Scala

Finally, after evaluating Erlang, Scala, etc., and writing IDE tools for these languages, I have begun migrating AIOTrade from Java to Scala, with the help of Scala for NetBeans of course.

The first step is to smoothly rewrite the basic modules in Scala, with little functional-style code; then I'll redesign the APIs around Scala's advanced features, including functions, traits, actors, etc.

Since AIOTrade is a NetBeans suite project with several NetBeans-style modules integrated, I need a general-purpose build.xml to get the Scala-based modules working. Below are build.xml and scala-build.xml, which can be used to write Scala-based NetBeans platform modules.

First, create a regular NetBeans module project, then put/replace these Ant files under your project's base directory. You also need to create two NetBeans library wrapper modules as parts of your NetBeans suite project, one for scala-library.jar and another for scala-compiler.jar (or check them out from AIOTrade's source repository).

build.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project name="lib.math" default="netbeans" basedir=".">
    <import file="scala-build.xml"/>
</project>

scala-build.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project name="scala-module" default="netbeans" basedir=".">
    <import file="nbproject/build-impl.xml"/>

    <target name="scala-taskdef" depends="init">
        <property name="scala.library" value="${cluster}/modules/ext/scala-library.jar"/>
        <property name="scala.compiler" value="${cluster}/modules/ext/scala-compiler.jar"/>
        <property name="scala.libs" value="${scala.library}:${scala.compiler}"/>

        <echo message="cluster: ${cluster}"/>
        <echo message="Compiling scala sources via ${scala.library}, ${scala.compiler}"/>

        <taskdef resource="scala/tools/ant/antlib.xml">
            <classpath>
                <pathelement location="${cluster}/modules/ext/scala-library.jar"/>
                <pathelement location="${cluster}/modules/ext/scala-compiler.jar"/>
            </classpath>
        </taskdef>
    </target>

    <property name="jar-excludes" value="**/*.java,**/*.form,**/package.html,**/doc-files/,**/*.scala"/>

    <target name="compile" depends="init,up-to-date,scala-taskdef" unless="is.jar.uptodate">
        <!-- javac's classpath should include scala.library and all these paths of "cp" -->
        <path id="javac.cp">
            <pathelement path="${scala.libs}"/>
            <pathelement path="${module.classpath}"/>
            <pathelement path="${cp.extra}"/>
        </path>
        <!-- scalac will check class dependencies deeply, so we can not rely on public package only which is refed by ${module.classpath} -->
        <path id="scalac.cp">
            <pathelement path="${scala.libs}"/>
            <pathelement path="${module.run.classpath}"/>
            <pathelement path="${cp.extra}"/>
        </path>
        <mkdir dir="${build.classes.dir}"/>
        <depend srcdir="${src.dir}" destdir="${build.classes.dir}" cache="build/depcache">
            <classpath refid="scalac.cp"/>
        </depend>
        <!-- scalac -->
        <scalac srcdir="${src.dir}" destdir="${build.classes.dir}" encoding="UTF-8" target="jvm-${javac.target}">
            <classpath refid="scalac.cp"/>
        </scalac>
        <!-- javac -->
        <nb-javac srcdir="${src.dir}" destdir="${build.classes.dir}" debug="${build.compiler.debug}" debuglevel="${build.compiler.debuglevel}" encoding="UTF-8"
                deprecation="${build.compiler.deprecation}" optimize="${build.compiler.optimize}" source="${javac.source}" target="${javac.target}" includeantruntime="false">
            <classpath refid="javac.cp"/>
            <compilerarg line="${javac.compilerargs}"/>
            <processorpath refid="processor.cp"/>
        </nb-javac>
        <!-- Sanity check: -->
        <pathconvert pathsep=":" property="class.files.in.src">
            <path>
                <fileset dir="${src.dir}">
                    <include name="**/*.class"/>
                </fileset>
            </path>
        </pathconvert>
        <fail>
            <condition>
                <not>
                    <equals arg1="${class.files.in.src}" arg2=""/>
                </not>
            </condition>
            You have stray *.class files in ${src.dir} which you must remove.
            Probably you failed to clean your sources before updating them.
        </fail>
        <!-- OK, continue: -->
        <copy todir="${build.classes.dir}">
            <fileset dir="${src.dir}" excludes="${jar-excludes}"/>
        </copy>
    </target>

    <target name="do-test-build" depends="projectized-common.do-test-build">
        <scalac srcdir="${test.unit.src.dir}" destdir="${build.test.unit.classes.dir}" excludes="${test.excludes}"
               encoding="UTF-8">
            <classpath refid="test.unit.cp"/>
        </scalac>
    </target>
</project>

BTW, the new source code of AIOTrade is under Mercurial version control on sourceforge.net; you can clone or browse the code at: AIOTrade source repository

Note: the whole project cannot be successfully built yet.