Changeset 260:4a9f47a6afbb

Show
Ignore:
Timestamp:
04/30/2010 09:25:46 AM (4 months ago)
Author:
dcaoyuan
Branch:
default
Message:

Simplied the logic of SelectActor?

Location:
blogbird/src/main/scala/org/aiotrade/httpd
Files:
4 modified

Legend:

Unmodified
Added
Removed
  • blogbird/src/main/scala/org/aiotrade/httpd/ClientConnection.scala

    r259 r260  
    77import scala.actors.Actor 
    88import Debugger._ 
    9  
    109 
    1110/** 
     
    3534  def act = loop { 
    3635    react { 
    37       case Read(callback) => 
     36      case Read(selector) => 
    3837        try { 
    3938          val hasRequest = readMoreInput 
    40           callback(hasRequest) 
     39 
     40          // add back this connection to the read selector. 
     41          selector.addListener(this) 
     42 
     43          // Was enough input read to complete a request? 
     44          if (hasRequest) { 
     45            selector.asInstanceOf[{def produce(o: Request)}].produce(takeRequest) 
     46          } 
    4147        } catch {case ex: IOException => debugln(ex.getMessage)} 
    4248         
    43       case Write(callback) => 
     49      case Write(selector) => 
     50        debugln(" ** Sending reply!") 
    4451        try { 
    4552          val finished = flushWriteBuffers 
    46           callback(finished) 
     53          if (finished) { 
     54            close 
     55          } else { 
     56          // add back this connection to the write selector. 
     57            selector.addListener(this) 
     58          } 
    4759        } catch {case ex: IOException => debugln(ex.getMessage)} 
    4860    } 
  • blogbird/src/main/scala/org/aiotrade/httpd/HttpdActors.scala

    r259 r260  
    6666 
    6767  private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_READ) { 
    68  
    69     def fireEvent(key: SelectionKey, conn: ClientConnection) { 
    70       // Read whatever is available. 
    71       conn ! Read((hasRequest) => { 
    72           // Return this connection to the read selector. 
    73           addListener(conn) 
    74  
    75           // Was enough input read to complete a request? 
    76           if (hasRequest) { 
    77             produce(conn.takeRequest) 
    78           } 
    79         }) 
    80     } 
     68    def produce(o: Request) = ConnectionManager.produce(o) 
    8169  } 
    8270 
     
    127115  } 
    128116 
    129   private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) { 
    130  
    131     def fireEvent(key: SelectionKey, conn: ClientConnection) { 
    132       if (!conn.isOpen) { 
    133         conn.close 
    134       } 
    135  
    136       debugln(" ** Sending reply!") 
    137       conn ! Write(finished => { 
    138           if (!finished) { 
    139             addListener(conn) 
    140           } else { 
    141             conn.close 
    142           } 
    143         }) 
    144     } 
    145   } 
     117  private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) 
    146118 
    147119  protected def consume(reply: Reply) { 
  • blogbird/src/main/scala/org/aiotrade/httpd/SelectActor.scala

    r259 r260  
    33import java.nio.channels.CancelledKeyException 
    44import java.nio.channels.SelectableChannel 
    5 import java.nio.channels.SelectionKey 
    65import java.nio.channels.Selector 
    76import java.nio.channels.spi.SelectorProvider 
     
    109import scala.collection.JavaConversions._ 
    1110 
    12  
    13 case class Read (callback: Boolean => Unit) 
    14 case class Write(callback: Boolean => Unit) 
     11abstract class SelectionEvent[T <: WithSelectableChannel](sender: SelectActor[T]) 
     12case class Read   [T <: WithSelectableChannel](sender: SelectActor[T]) extends SelectionEvent(sender) 
     13case class Write  [T <: WithSelectableChannel](sender: SelectActor[T]) extends SelectionEvent(sender) 
     14case class Unknown[T <: WithSelectableChannel](sender: SelectActor[T]) extends SelectionEvent(sender) 
    1515 
    1616abstract class SelectActor[T <: WithSelectableChannel](ops: Int) extends Actor { 
    17   def fireEvent(key: SelectionKey, listerner: T) 
    1817 
    1918  private var done = false 
     19  // @Note tried using ConcurrentMap here, but no success 
    2020  private val newListeners = new LinkedBlockingQueue[T] 
    2121 
     
    3333 
    3434  def addListener(listener: T) { 
    35     newListeners.put(listener) 
     35    newListeners put listener 
    3636    selector.wakeup 
    3737  } 
     
    4343  def act { 
    4444    while (!done) { 
    45       selector.select // blocking call 
     45      selector.select // blocking call, you can think it's something like actor's receive 
    4646 
    4747      val keys = selector.selectedKeys.iterator 
     
    5252        if (key.isValid) { 
    5353          val listener = key.attachment.asInstanceOf[T] 
     54          val event = 
     55            if (key.isReadable) { 
     56              Read(this) 
     57            } else if (key.isWritable) { 
     58              Write(this) 
     59            } else Unknown(this) 
    5460 
    55           // Remove the key so we don't immediately loop around to 
    56           // race on the same connection. 
     61          // Remove the key so we don't immediately loop around to race on the same connection. 
    5762          key.cancel 
    5863 
    59           fireEvent(key, listener) 
     64          // fire event 
     65          listener ! event 
    6066        } 
    6167      } 
     
    7379  } 
    7480 
     81  /** 
     82   * override these methods to let me can do something according to 
     83   */ 
     84  protected def postRead (condition: Boolean) {} 
     85  protected def postWrite(condition: Boolean) {} 
     86 
    7587} 
    7688 
  • blogbird/src/main/scala/org/aiotrade/httpd/coroutine/HttpdRoutines.scala

    r259 r260  
    6363import java.nio.channels.SelectionKey 
    6464import org.aiotrade.httpd.EmptyHostRouter 
    65 import org.aiotrade.httpd.Write 
    6665import org.aiotrade.httpd.HostRouter 
    6766import org.aiotrade.httpd.Process 
    68 import org.aiotrade.httpd.Read 
    6967import org.aiotrade.httpd.Reply 
    7068import org.aiotrade.httpd.Request 
     
    128126 
    129127  private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_READ) { 
    130  
    131     def fireEvent(key: SelectionKey, conn: ClientConnection) { 
    132       // Read whatever is available. 
    133       conn ! Read((hasRequest) => { 
    134           // Return this connection to the read selector. 
    135           addListener(conn) 
    136  
    137           // Was enough input read to complete a request? 
    138           if (hasRequest) { 
    139             produce(conn.takeRequest) 
    140           } 
    141         }) 
    142     } 
     128    def produce(o: Request) = ConnectionManager.produce(o) 
    143129  } 
    144130 
     
    203189  } 
    204190 
    205   private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) { 
    206     val intersOpts = SelectionKey.OP_WRITE 
    207  
    208     def fireEvent(key: SelectionKey, conn: ClientConnection) { 
    209       if (!conn.isOpen) { 
    210         conn.close 
    211       } 
    212  
    213       debugln(" ** Sending reply!") 
    214       conn ! Write(finished => { 
    215           if (!finished) { 
    216             addListener(conn) 
    217           } else { 
    218             conn.close 
    219           } 
    220         }) 
    221     } 
    222   } 
     191  private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) 
    223192 
    224193  def run {