注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

分享,态度 ·~~

—— 十年太长,五年;如果可以回到五年前,你最想对那时候的自己说什么?

 
 
 

日志

 
 

Scala Actors: A Short Tutorial  

2009-09-04 14:26:09|  分类: IT/Net |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

Created by phaller on 2007-05-24. Updated: 2008-08-28, 09:46

Introduction
With the advent of multi-core processors concurrent programming is becoming indispensable. Scala's primary concurrency construct is actors. Actors are basically concurrent processes that communicate by exchanging messages. Actors can also be seen as a form of active objects where invoking a method corresponds to sending a message.

The Scala Actors library provides both asynchronous and synchronous message sends (the latter are implemented by exchanging several asynchronous messages). Moreover, actors may communicate using futures where requests are handled asynchronously, but return a representation (the future) that allows to await the reply.

This tutorial is mainly designed as a walk-through of several complete example programs that can be readily compiled and run using Scala 2.4 or newer.
First Example
Our first example consists of two actors that exchange a bunch of messages and then terminate. The first actor sends "ping" messages to the second actor, which in turn sends "pong" messages back (for each received "ping" message one "pong" message).

We start off by defining the messages that are sent and received by our actors. In this case, we can use singleton objects (in more advanced programs, messages are usually parameterized). Since we want to use pattern matching, each message is a case object:

case object Ping
case object Pong
case object Stop
The ping actor starts the message exchange by sending a Ping message to the pong actor. The Pong message is the response from the pong actor. When the ping actor has sent a certain number of Ping messages, it sends a Stop message to the pong actor.

All classes, objects and traits of the Scala actors library reside in the scala.actors package. From this package we import the Actor class that we are going to extend to define our custom actors. Furthermore, we import all members of the Actor object since it contains many useful actor operations:

import scala.actors.Actor
import scala.actors.Actor._
Actors are normal objects that are created by instantiating subclasses of the Actor class. We define the behavior of ping actors by subclassing Actor and implementing its abstract act method:

class Ping(count: int, pong: Actor) extends Actor {
  def act() {
    var pingsLeft = count - 1
    pong ! Ping
    while (true) {
      receive {
        case Pong =>
          if (pingsLeft % 1000 == 0)
            Console.println("Ping: pong")
          if (pingsLeft > 0) {
            pong ! Ping
            pingsLeft -= 1
          } else {
            Console.println("Ping: stop")
            pong ! Stop
            exit()
          }
      }
    }
  }
}
The number of Ping messages to be sent and the pong actor are passed as arguments to the constructor. The call to the receive method inside the infinite loop suspends the actor until a Pong message is sent to the actor. In that case the message is removed from the actor's mailbox and the corresponding action on the right side of the arrow is executed.

In the case where pingsLeft is greater than zero we send a Ping message to pong using the send operator !, and decrement the pingsLeft counter. If the pingsLeft counter has reached zero, we send a Stop message to pong, and terminate the execution of the current actor by calling exit().

The class for our pong actor is defined similarly:

class Pong extends Actor {
  def act() {
    var pongCount = 0
    while (true) {
      receive {
        case Ping =>
          if (pongCount % 1000 == 0)
            Console.println("Pong: ping "+pongCount)
          sender ! Pong
          pongCount = pongCount + 1
        case Stop =>
          Console.println("Pong: stop")
          exit()
      }
    }
  }
}
There is one interesting point to notice, however. When receiving a Ping message, a Pong message is sent to the sender actor, which is not defined anywhere in our class! In fact, it is a method of the Actor class. Using sender, one can refer to the actor that sent the message that the current actor last received. This avoids having to explicitly pass the sender as arguments to messages.

After having defined our actor classes, we are now ready to create a Scala application that uses them:

object pingpong extends Application {
  val pong = new Pong
  val ping = new Ping(100000, pong)
  ping.start
  pong.start
}
Analogous to Java threads, actors have to be started by calling their start method.

Let's run it!
The complete example is included in the Scala distribution under doc/scala-devel/scala/examples/actors/pingpong.scala. Here is how you compile and run it:

$ scalac pingpong.scala
$ scala -cp . examples.actors.pingpong
Pong: ping 0
Ping: pong
Pong: ping 1000
Ping: pong
Pong: ping 2000
...
Ping: stop
Pong: stop
Make it Thread-less!
Actors are executed on a thread pool. Initially, there are 4 worker threads. The thread pool grows if all worker threads are blocked but there are still remaining tasks to be processed. Ideally, the size of the thread pool corresponds to the number of processor cores of the machine.

When actors call thread-blocking operations, such as receive (or even wait), the worker thread that is executing the current actor (self) is blocked. This means basically that the actor is represented as a blocked thread. Depending on the number of actors you want to use, you might want to avoid this, since most JVMs cannot handle more than a few thousand threads on standard hardware.

Thread-blocking operations can be avoided by using react to wait for new messages (the event-based pendant of receive). However, there is a (usually small) price to pay: react never returns. In practice, this means that at the end of a reaction to a message, one has to call some function that contains the rest of the actor's computation. Note that using react inside a while loop does not work! However, since loops are common there is special library support for it in form of a loop function. It can be used like this:

loop {
  react {
    case A => ...
    case B => ...
  }
}
Note that react calls can be nested. This allows to receive several messages in sequence, like this:

react {
  case A => ...
  case B => react { // if we get a B we also want a C
    case C => ...
  }
}

To make our ping and pong actors thread-less, it suffices to simply replace while (true) with loop, and receive with react. For example, here is the modified act method of our pong actor:

def act() {
  var pongCount = 0
  loop {
    react {
      case Ping =>
        if (pongCount % 1000 == 0)
          Console.println("Pong: ping "+pongCount)
        sender ! Pong
        pongCount = pongCount + 1
      case Stop =>
        Console.println("Pong: stop")
        exit()
    }
  }
}
Second Example
In this example, we are going to write an abstraction of producers which provide a standard iterator interface to retrieve a sequence of produced values.

Specific producers are defined by implementing an abstract produceValues method. Individual values are generated using the produce method. Both methods are inherited from class Producer. For example, a producer that generates the values contained in a tree in pre-order can be defined like this:

class PreOrder(n: Tree) extends Producer[int] {
  def produceValues = traverse(n)
  def traverse(n: Tree) {
    if (n != null) {
      produce(n.elem)
      traverse(n.left)
      traverse(n.right)
    }
  }
}

Producers are implemented in terms of two actors, a producer actor, and a coordinator actor. Here is how we can implement the producer actor:

abstract class Producer[T] {
  protected def produceValues: unit

  protected def produce(x: T) {
    coordinator ! Some(x)
    receive { case Next => }
  }

  private val producer: Actor = actor {
    receive {
      case Next =>
        produceValues
        coordinator ! None
    }
  }
  ...
}
Note how the producer actor is defined! This time we did not bother to create an extra subclass of Actor and implement its act method. Instead, we simply define the actor's behavior inline using the actor function. Arguably, this is much more concise! Moreover, actors defined using actor are started automatically--no need to invoke their start method!

So, how does the producer work? When receiving a Next message, it runs the (abstract) produceValues method, which, in turn, calls the produce method. This results in sending a sequence of values, wrapped in Some messages, to the coordinator. The sequence is terminated by a None message. Some and None are the two cases of Scala's standard Option class.

The coordinator synchronizes requests from clients and values coming from the producer. We can implement it like this:

private val coordinator: Actor = actor {
  loop {
    react {
      case Next =>
        producer ! Next
        reply {
          receive { case x: Option[_] => x }
        }
      case Stop => exit('stop)
    }
  }
}
Note that inside the handler for the Next message, we use reply to return a received Option value to some requesting actor. We are going to explain this in the next section, so stay tuned...

The Iterator Interface
We want to make producers usable as normal iterators. For this, we implement an iterator method that returns--surprise!--an iterator. Its hasNext and next methods send messages to the coordinator actor to accomplish their task. Take a look:

def iterator = new Iterator[T] {
  private var current: Any = Undefined
  private def lookAhead = {
    if (current == Undefined) current = coordinator !? Next
    current
  }

  def hasNext: boolean = lookAhead match {
    case Some(x) => true
    case None => { coordinator ! Stop; false }
  }

  def next: T = lookAhead match {
    case Some(x) => current = Undefined; x.asInstanceOf[T]
  }
}

We use a private lookAhead method to implement the iterator logic. As long as the next value has not yet been looked-up, the current variable has value Undefined which is simply a place-holder object:

private val Undefined = new Object
The interesting bit is in the lookAhead method. When the current value is Undefined it means we have to get hold of the next value. For this, we use the synchronous message send operator !?. It sends the Next message to the coordinator, but instead of returning like a normal (asynchronous) message send, it waits for a reply from the coordinator. The reply is the return value of !?. A message that was sent using !? is replied to using reply. Note that simply sending a message to sender does not work! That's because !? waits to receive a message from a private reply channel instead of the mailbox. This is necessary to distinguish "true" replies from "fake" ones resulting from old messages that happen to be in the mailbox.

The producers example is also included in the Scala distribution under doc/scala-devel/scala/examples/actors/producers.scala.

【from http://www.scala-lang.org/node/242

  评论这张
 
阅读(644)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017