close
文章出處

當我們說Actor生命周期的時候,我們能看到Actor能被很多種方式停掉(用ActorSystem.stop或ActorContext.stop或發送一個PoisonPill - 也有一個killgracefulstop)。

無論Actor是怎么死的,有些情況一些系統中的其他actor想要知道。讓我們舉一個Actor與數據庫交互的例子 - 我們叫它RepositoryActor。很明顯,會有一些其他actor會向這個RepositoryActor發送消息。這些有“興趣”的Actor很愿意留個eye或者看(watch)這個actor關閉時的消息。這個在Actor里面叫DeathWatch。這個用來watchunwatch的方法就是ActorContext.watchActorContext.unwatch。如果已經監視了,這些watcher會在Actor關閉時收到一個Terminated消息,并可以很舒服的加到他們的receive功能中。

不像Supervision有一個嚴格的父子繼承關系,任何Actor都可以watch任何ActorSystem中的Actor。

讓我們看下。

代碼

QUOTEREPOSITORYACTOR

1.我們的QueryRepositoryActor格言查詢Actor保存了一個quote的列表并且在收到一個QuoteRepositoryRequest時隨機返回一條。

  1. 他記錄了收到消息的個數,如果收到超過3個消息,他用PoisonPill把自己殺掉

這沒啥神奇的。

package me.rerun.akkanotes.deathwatch

import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}  
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._  
import scala.util.Random

class QuoteRepositoryActor() extends Actor with ActorLogging {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  var repoRequestCount:Int=1

  def receive = {

    case QuoteRepositoryRequest => {

      if (repoRequestCount>3){
        self!PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))

        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount=repoRequestCount+1
        sender ! quoteResponse
      }

    }

  }

}

TEACHERACTORWATCHER

一樣的,TeacherActorWatcher也沒啥神奇的,除了他創建了一個QuoteRepositoryActor并且用context.watch觀察。

package me.rerun.akkanotes.deathwatch

import akka.actor.{Terminated, Props, Actor, ActorLogging}  
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest  
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest

class TeacherActorWatcher extends Actor with ActorLogging {

  val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)


  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef)=>{
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}

測試CASE

這里會有點意思。我從來沒想過這個可以被測試。akka-testkit。我們會分析下這三個測試CASE:

1. 斷言如果觀察到已經收到Terminated消息

QuoteRepositoryActor應該在收到第四條消息時給測試case發送一條Terminated消息。前三條應該是可以的。

"A QuoteRepositoryActor" must {
    ...
    ...
    ...

    "send back a termination message to the watcher on 4th message" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //Let's watch the Actor

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectTerminated(quoteRepository)  //Expect a Terminated Message
      }
    }

2.如果沒有觀察(watched/unwatched)到則斷言沒收到Terminated消息

事實上,我們做這個只是演示下context.unwatch。如果我們移掉testProbe.watch和testProbe.unwatch這行,則測試case會運行的很正常。

    "not send back a termination message on 4th message if not watched" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //watching

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        testProbe.unwatch(quoteRepository) //not watching anymore
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectNoMsg() //Not Watching. No Terminated Message
      }
    }

3. 在TeacherActorWatcher中斷言收到了Terminated消息

我們訂閱了EventStream并通過檢查一個特殊的日志消息來斷言termination。

   "end back a termination message to the watcher on 4th message to the TeacherActor" in {

      //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
      val teacherActor=TestActorRef[TeacherActorWatcher]

      within (1000 millis) {
        (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor

        EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
          teacherActor!QuoteRequest //Send the dangerous 4th message
        }
      }
    }

EventFilter中的pattern屬性,沒啥奇怪的,需要一個正則表達式。正則pattern="""Child Actor .* Terminated"""用來匹配一條格式是Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated日志信息。

Github

與往常一樣,代碼在github。看下deathwatch的包。


文章來自微信平臺「麥芽面包」(微信掃描二維碼關注)。未經允許,禁止轉載。


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 AutoPoster 的頭像
    AutoPoster

    互聯網 - 大數據

    AutoPoster 發表在 痞客邦 留言(0) 人氣()