Changeset 260:4a9f47a6afbb
- Timestamp:
- 04/30/2010 09:25:46 AM (4 months ago)
- Author:
- dcaoyuan
- Branch:
- default
- Message:
-
Simplied the logic of SelectActor?
- Location:
- blogbird/src/main/scala/org/aiotrade/httpd
- Files:
-
Legend:
- Unmodified
- Added
- Removed
-
|
r259
|
r260
|
|
| 7 | 7 | import scala.actors.Actor |
| 8 | 8 | import Debugger._ |
| 9 | | |
| 10 | 9 | |
| 11 | 10 | /** |
| … |
… |
|
| 35 | 34 | def act = loop { |
| 36 | 35 | react { |
| 37 | | case Read(callback) => |
| | 36 | case Read(selector) => |
| 38 | 37 | try { |
| 39 | 38 | val hasRequest = readMoreInput |
| 40 | | callback(hasRequest) |
| | 39 | |
| | 40 | // add back this connection to the read selector. |
| | 41 | selector.addListener(this) |
| | 42 | |
| | 43 | // Was enough input read to complete a request? |
| | 44 | if (hasRequest) { |
| | 45 | selector.asInstanceOf[{def produce(o: Request)}].produce(takeRequest) |
| | 46 | } |
| 41 | 47 | } catch {case ex: IOException => debugln(ex.getMessage)} |
| 42 | 48 | |
| 43 | | case Write(callback) => |
| | 49 | case Write(selector) => |
| | 50 | debugln(" ** Sending reply!") |
| 44 | 51 | try { |
| 45 | 52 | val finished = flushWriteBuffers |
| 46 | | callback(finished) |
| | 53 | if (finished) { |
| | 54 | close |
| | 55 | } else { |
| | 56 | // add back this connection to the write selector. |
| | 57 | selector.addListener(this) |
| | 58 | } |
| 47 | 59 | } catch {case ex: IOException => debugln(ex.getMessage)} |
| 48 | 60 | } |
-
|
r259
|
r260
|
|
| 66 | 66 | |
| 67 | 67 | private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_READ) { |
| 68 | | |
| 69 | | def fireEvent(key: SelectionKey, conn: ClientConnection) { |
| 70 | | // Read whatever is available. |
| 71 | | conn ! Read((hasRequest) => { |
| 72 | | // Return this connection to the read selector. |
| 73 | | addListener(conn) |
| 74 | | |
| 75 | | // Was enough input read to complete a request? |
| 76 | | if (hasRequest) { |
| 77 | | produce(conn.takeRequest) |
| 78 | | } |
| 79 | | }) |
| 80 | | } |
| | 68 | def produce(o: Request) = ConnectionManager.produce(o) |
| 81 | 69 | } |
| 82 | 70 | |
| … |
… |
|
| 127 | 115 | } |
| 128 | 116 | |
| 129 | | private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) { |
| 130 | | |
| 131 | | def fireEvent(key: SelectionKey, conn: ClientConnection) { |
| 132 | | if (!conn.isOpen) { |
| 133 | | conn.close |
| 134 | | } |
| 135 | | |
| 136 | | debugln(" ** Sending reply!") |
| 137 | | conn ! Write(finished => { |
| 138 | | if (!finished) { |
| 139 | | addListener(conn) |
| 140 | | } else { |
| 141 | | conn.close |
| 142 | | } |
| 143 | | }) |
| 144 | | } |
| 145 | | } |
| | 117 | private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) |
| 146 | 118 | |
| 147 | 119 | protected def consume(reply: Reply) { |
-
|
r259
|
r260
|
|
| 3 | 3 | import java.nio.channels.CancelledKeyException |
| 4 | 4 | import java.nio.channels.SelectableChannel |
| 5 | | import java.nio.channels.SelectionKey |
| 6 | 5 | import java.nio.channels.Selector |
| 7 | 6 | import java.nio.channels.spi.SelectorProvider |
| … |
… |
|
| 10 | 9 | import scala.collection.JavaConversions._ |
| 11 | 10 | |
| 12 | | |
| 13 | | case class Read (callback: Boolean => Unit) |
| 14 | | case class Write(callback: Boolean => Unit) |
| | 11 | abstract class SelectionEvent[T <: WithSelectableChannel](sender: SelectActor[T]) |
| | 12 | case class Read [T <: WithSelectableChannel](sender: SelectActor[T]) extends SelectionEvent(sender) |
| | 13 | case class Write [T <: WithSelectableChannel](sender: SelectActor[T]) extends SelectionEvent(sender) |
| | 14 | case class Unknown[T <: WithSelectableChannel](sender: SelectActor[T]) extends SelectionEvent(sender) |
| 15 | 15 | |
| 16 | 16 | abstract class SelectActor[T <: WithSelectableChannel](ops: Int) extends Actor { |
| 17 | | def fireEvent(key: SelectionKey, listerner: T) |
| 18 | 17 | |
| 19 | 18 | private var done = false |
| | 19 | // @Note tried using ConcurrentMap here, but no success |
| 20 | 20 | private val newListeners = new LinkedBlockingQueue[T] |
| 21 | 21 | |
| … |
… |
|
| 33 | 33 | |
| 34 | 34 | def addListener(listener: T) { |
| 35 | | newListeners.put(listener) |
| | 35 | newListeners put listener |
| 36 | 36 | selector.wakeup |
| 37 | 37 | } |
| … |
… |
|
| 43 | 43 | def act { |
| 44 | 44 | while (!done) { |
| 45 | | selector.select // blocking call |
| | 45 | selector.select // blocking call, you can think it's something like actor's receive |
| 46 | 46 | |
| 47 | 47 | val keys = selector.selectedKeys.iterator |
| … |
… |
|
| 52 | 52 | if (key.isValid) { |
| 53 | 53 | val listener = key.attachment.asInstanceOf[T] |
| | 54 | val event = |
| | 55 | if (key.isReadable) { |
| | 56 | Read(this) |
| | 57 | } else if (key.isWritable) { |
| | 58 | Write(this) |
| | 59 | } else Unknown(this) |
| 54 | 60 | |
| 55 | | // Remove the key so we don't immediately loop around to |
| 56 | | // race on the same connection. |
| | 61 | // Remove the key so we don't immediately loop around to race on the same connection. |
| 57 | 62 | key.cancel |
| 58 | 63 | |
| 59 | | fireEvent(key, listener) |
| | 64 | // fire event |
| | 65 | listener ! event |
| 60 | 66 | } |
| 61 | 67 | } |
| … |
… |
|
| 73 | 79 | } |
| 74 | 80 | |
| | 81 | /** |
| | 82 | * override these methods to let me can do something according to |
| | 83 | */ |
| | 84 | protected def postRead (condition: Boolean) {} |
| | 85 | protected def postWrite(condition: Boolean) {} |
| | 86 | |
| 75 | 87 | } |
| 76 | 88 | |
-
|
r259
|
r260
|
|
| 63 | 63 | import java.nio.channels.SelectionKey |
| 64 | 64 | import org.aiotrade.httpd.EmptyHostRouter |
| 65 | | import org.aiotrade.httpd.Write |
| 66 | 65 | import org.aiotrade.httpd.HostRouter |
| 67 | 66 | import org.aiotrade.httpd.Process |
| 68 | | import org.aiotrade.httpd.Read |
| 69 | 67 | import org.aiotrade.httpd.Reply |
| 70 | 68 | import org.aiotrade.httpd.Request |
| … |
… |
|
| 128 | 126 | |
| 129 | 127 | private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_READ) { |
| 130 | | |
| 131 | | def fireEvent(key: SelectionKey, conn: ClientConnection) { |
| 132 | | // Read whatever is available. |
| 133 | | conn ! Read((hasRequest) => { |
| 134 | | // Return this connection to the read selector. |
| 135 | | addListener(conn) |
| 136 | | |
| 137 | | // Was enough input read to complete a request? |
| 138 | | if (hasRequest) { |
| 139 | | produce(conn.takeRequest) |
| 140 | | } |
| 141 | | }) |
| 142 | | } |
| | 128 | def produce(o: Request) = ConnectionManager.produce(o) |
| 143 | 129 | } |
| 144 | 130 | |
| … |
… |
|
| 203 | 189 | } |
| 204 | 190 | |
| 205 | | private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) { |
| 206 | | val intersOpts = SelectionKey.OP_WRITE |
| 207 | | |
| 208 | | def fireEvent(key: SelectionKey, conn: ClientConnection) { |
| 209 | | if (!conn.isOpen) { |
| 210 | | conn.close |
| 211 | | } |
| 212 | | |
| 213 | | debugln(" ** Sending reply!") |
| 214 | | conn ! Write(finished => { |
| 215 | | if (!finished) { |
| 216 | | addListener(conn) |
| 217 | | } else { |
| 218 | | conn.close |
| 219 | | } |
| 220 | | }) |
| 221 | | } |
| 222 | | } |
| | 191 | private object selectActor extends SelectActor[ClientConnection](SelectionKey.OP_WRITE) |
| 223 | 192 | |
| 224 | 193 | def run { |