Akka_Note_1           

Akka_Note_1


-----------


1 Ping-Pong Example

import akka.actor._

/** Base type of every message exchanged by the ping-pong actors.
  *
  * Sealed so that the complete message protocol is the three cases declared
  * right below it and cannot be extended from another file.
  */
sealed trait gMessage

/** Sent to `Ping` to reset its move counter and serve the first `move`. */
final case class startGame() extends gMessage

/** Sent by `Ping` when the game is over; `Pong` stops itself on receipt. */
final case class stopGame() extends gMessage

/** A single hit of the ball; each player answers it with another `move`. */
final case class move() extends gMessage

/** Serving side of the ping-pong game.
  *
  * Counts its own moves and ends the game once more than 100 of them have
  * been recorded.
  *
  * @param pong the actor that receives the opening `move`
  */
class Ping(pong: ActorRef) extends Actor {

  // Number of moves this actor has made in the current game.
  var count: Int = 0

  /** Records one more move and prints the running total. */
  def increate: Unit = {
    count = count + 1
    println(s"move : $count")
  }

  def receive: Receive = {
    case _: startGame =>
      // Reset the counter, serve, then count the serve itself.
      count = 0
      pong ! move()
      increate

    case _: move if count > 100 =>
      // Limit reached: tell the opponent to stop, then stop this actor.
      sender ! stopGame()
      context.stop(self)

    case _: move =>
      increate
      sender ! move()
  }
}

/** Returning side of the ping-pong game.
  *
  * Answers every `move` with a `move` of its own and stops itself when it
  * receives `stopGame`.
  */
class Pong() extends Actor {

  def receive: Receive = {
    case _: move =>
      // Reply first, then log, in that order.
      sender ! move()
      println("fuck !")

    case _: stopGame =>
      context.stop(self)
  }
}



/** Entry point of the ping-pong example. */
object MMain {

  /** Creates the "pingpong" actor system, spawns both players and starts the game.
    *
    * Declared with an explicit `: Unit =` result type; the original procedure
    * syntax (`def main(...) { ... }`) is deprecated.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("pingpong")

    // Pong takes no constructor arguments; Ping needs a reference to its opponent.
    val pongActor = system.actorOf(Props[Pong], name = "pong")
    val pingActor = system.actorOf(Props(new Ping(pongActor)), name = "ping")

    pingActor ! startGame()

    // NOTE(review): nothing in this example terminates `system`, so the JVM
    // presumably keeps running after both actors have stopped — confirm and
    // shut the system down with the call appropriate for the Akka version in use.
  }
}

-----------


2 a "future" example

--1

// See http://www.scala-lang.org/api/current/#scala.concurrent.package
// for the API details.

// See http://docs.scala-lang.org/overviews/core/futures.html
// for more examples.

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random
import scala.util.{ Success, Failure }

/** Future example: two independent searches of a word list run in parallel
  * and are combined with a for-comprehension.
  */
object MMain {

  /** Word list scanned by both futures. */
  private val WordsFile = "/etc/dictionaries-common/words"

  /** Returns the character offset of the first occurrence of `word` in the
    * word list, or -1 when it is absent.
    *
    * The file handle is always closed, including when the search throws.
    *
    * @param word  text to search for
    * @param label prefix of the thread-name trace line printed before searching
    */
  private def firstIndexOf(word: String, label: String): Int = {
    val source = scala.io.Source.fromFile(WordsFile)
    try {
      println(label + " - Thread Name : " + Thread.currentThread().getName)
      source.toSeq.indexOfSlice(word)
    } finally source.close()
  }

  /** Runs the example and waits for the result to be reported.
    *
    * @param args command-line arguments (unused)
    * @throws java.util.concurrent.TimeoutException if the searches take longer than 30 seconds
    */
  def main(args: Array[String]): Unit = {

    println("1-Main - Thread Name : " + Thread.currentThread().getName)

    // Both futures are created before the for-comprehension, so the two
    // searches start immediately and run concurrently.
    val firstAardvark: Future[Int] = Future(firstIndexOf("aardvark", "First AardVark"))
    val firstZebra: Future[Int] = Future(firstIndexOf("zebra", "First Source"))

    val animalRange: Future[Int] = for {
      aardvark <- firstAardvark
      zebra <- firstZebra
    } yield zebra - aardvark

    println("2- Main - Thread Name : " + Thread.currentThread().getName)

    // `onSuccess` is deprecated; `map` yields a future that completes only
    // after the report has been printed, so it can be awaited below.
    val report: Future[Unit] = animalRange.map { result =>
      if (result > 50000) {
        println(s"${result}It too long between aardvark & zebra")
        println("onSuccess - Thread Name : " + Thread.currentThread().getName)
      }
    }.recover {
      // Surface failures (for example a missing word list) instead of dropping them.
      case scala.util.control.NonFatal(e) =>
        Console.err.println(s"Could not compute the animal range: $e")
    }

    // Block the main thread until the report is done, rather than sleeping for
    // a fixed time and hoping the worker threads finish before the JVM exits.
    Await.ready(report, 30.seconds)

  }
}

Output ::

1-Main - Thread Name : main
2- Main - Thread Name : main
First Source - Thread Name : ForkJoinPool-1-worker-3
First AardVark - Thread Name : ForkJoinPool-1-worker-5
797292It too long between aardvark & zebra
onSuccess - Thread Name : ForkJoinPool-1-worker-3


-----------

Promise


    
    // Creating a Future through a Promise: a Future instance is always
    // associated with one (and only one) Promise instance.
    
    // Build a Promise.
    val taxcut: Promise[TaxCut] = Promise[TaxCut]()
    // Obtain the Future that belongs to the Promise.
    val taxcutF: Future[TaxCut] =taxcut.future
    
    // The returned Future may not be the same object as the Promise, but calling
    // `future` on the same Promise always returns the same object, which keeps
    // the Promise/Future relationship one-to-one.
    
    // Completing the promise:
    // to fulfil it, call `success` and pass the result everyone is waiting for.
    taxcut.success(TaxCut(20))
    // After this the Promise cannot be written again; trying to do so throws an exception.
    // At this point the associated Future also completes successfully, so registered
    // callbacks start to run; likewise, if the Future was mapped, the mapping
    // function runs now.
    
    // In general, completing the Promise and handling the returned Future happen on
    // different threads. You may well create the Promise and immediately hand its
    // Future to the caller while another thread is still computing the value.



example : 

    /** Result of a kept campaign promise.
      *
      * Declared `final`: case classes are not meant to be subclassed.
      *
      * @param reduction size of the tax cut, in percentage points
      */
    final case class TaxCut(reduction: Int)


/** Shows a Promise being completed from a task other than the one that
  * handed out its Future.
  */
object Government {

  /** Starts a background task that either keeps or breaks the pledge and
    * returns immediately.
    *
    * The task sleeps for two seconds and then, depending on
    * `Random.nextBoolean()`, completes the promise with `TaxCut(20)` or fails
    * it with an `Exception("no money")`.
    *
    * @return the Future side of the promise the background task completes
    */
  def redeemCampaginPledge(): Future[TaxCut] = {
    val pledge = Promise[TaxCut]()

    // Future.apply schedules this body on the implicit ExecutionContext, so it
    // does not run on the caller's thread.
    Future {
      // Looked up on every use, exactly like the inline calls it replaces.
      def worker: String = Thread.currentThread().getName

      println(s"[Thread :$worker  ] : starting the new legislative period")
      Thread.sleep(2000)

      val taxesReduced = Random.nextBoolean()
      if (!taxesReduced) {
        pledge.failure(new Exception("no money"))
        println(s"[Thread :$worker  ] :We did not reduce the taxes ! www .....")
      } else {
        pledge.success(TaxCut(20))
        println(s"[Thread :$worker  ] : We reduced the taxes ! You must reelect us !!! 1111")
      }
    }

    pledge.future
  }
}



/** Promise example: the caller only holds the Future and reacts once the
  * background task completes the promise.
  */
object MMain {

  /** Requests the pledge, registers a completion callback and keeps the main
    * thread alive long enough for the callback to run.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // The main thread only obtains the Future reference; the computation runs
    // in the task started by Government, not on the caller's thread.
    val taxCutF = Government.redeemCampaginPledge()

    println(s"[Thread :${Thread.currentThread().getName}  ] : Now that they're elected,let's see if they remember their promises ...")

    taxCutF.onComplete {
      case Success(TaxCut(reduction)) =>
        println(s"[Thread : ${Thread.currentThread().getName} ]A miracle ! They really cut our taxes by $reduction percentage points")
      case Failure(ex) =>
        println(s"[Thread : ${Thread.currentThread().getName} ]They broke their promises! Agian ! Because of a ${ex.getMessage}")
    }

    // The pledge is resolved after about two seconds; four seconds leaves room
    // for the callback to print before the final message.
    Thread.sleep(4000)

    println(s"[Thread :${Thread.currentThread().getName}  ] : they are government !!")
  }
}


Output :

一种可能:

[Thread :main  ] : Now that they're elected,let's see if they remember their promises ...
[Thread :ForkJoinPool-1-worker-5  ] : starting the new legislative period
[Thread :ForkJoinPool-1-worker-5  ] :We did not reduce the taxes ! www .....
[Thread : ForkJoinPool-1-worker-3 ]They broke their promises! Agian ! Because of a no money
[Thread :main  ] : they are government !!

另一种可能:
[Thread :main  ] : Now that they're elected,let's see if they remember their promises ...
[Thread :ForkJoinPool-1-worker-5  ] : starting the new legislative period
[Thread :ForkJoinPool-1-worker-5  ] : We reduced the taxes ! You must reelect us !!! 1111
[Thread : ForkJoinPool-1-worker-3 ]A miracle ! They really cut our taxes by 20 percentage points
[Thread :main  ] : they are government !!

一句话总结:Promise 只是许下的承诺 p,承诺的兑现过程在另一个线程 [Future { ... p.success / p.failure ... }] 里异步执行;结果写入 p 之后,注册在 p.future 上的 onComplete 回调才会被触发。






pre>