當我們說Actor生命周期的時候,我們能看到Actor能被很多種方式停掉(用ActorSystem.stop或ActorContext.stop或發送一個PoisonPill - 也有一個kill和gracefulstop)。
無論Actor是怎么死的,有些情況一些系統中的其他actor想要知道。讓我們舉一個Actor與數據庫交互的例子 - 我們叫它RepositoryActor。很明顯,會有一些其他actor會向這個RepositoryActor發送消息。這些有“興趣”的Actor很愿意留個eye或者看(watch)這個actor關閉時的消息。這個在Actor里面叫DeathWatch。這個用來watch和unwatch的方法就是ActorContext.watch和ActorContext.unwatch。如果已經監視了,這些watcher會在Actor關閉時收到一個Terminated消息,并可以很舒服的加到他們的receive功能中。
不像Supervision有一個嚴格的父子繼承關系,任何Actor都可以watch任何ActorSystem中的Actor。
讓我們看下。
代碼
QUOTEREPOSITORYACTOR
1.我們的QueryRepositoryActor格言查詢Actor保存了一個quote的列表并且在收到一個QuoteRepositoryRequest時隨機返回一條。
- 他記錄了收到消息的個數,如果收到超過3個消息,他用PoisonPill把自己殺掉
這沒啥神奇的。
package me.rerun.akkanotes.deathwatch
import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._
import scala.util.Random
class QuoteRepositoryActor() extends Actor with ActorLogging {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
var repoRequestCount:Int=1
def receive = {
case QuoteRepositoryRequest => {
if (repoRequestCount>3){
self!PoisonPill
}
else {
//Get a random Quote from the list and construct a response
val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))
log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
repoRequestCount=repoRequestCount+1
sender ! quoteResponse
}
}
}
}
TEACHERACTORWATCHER
一樣的,TeacherActorWatcher也沒啥神奇的,除了他創建了一個QuoteRepositoryActor并且用context.watch觀察。
package me.rerun.akkanotes.deathwatch
import akka.actor.{Terminated, Props, Actor, ActorLogging}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest
class TeacherActorWatcher extends Actor with ActorLogging {
val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
context.watch(quoteRepositoryActor)
def receive = {
case QuoteRequest => {
quoteRepositoryActor ! QuoteRepositoryRequest
}
case Terminated(terminatedActorRef)=>{
log.error(s"Child Actor {$terminatedActorRef} Terminated")
}
}
}
測試CASE
這里會有點意思。我從來沒想過這個可以被測試。akka-testkit。我們會分析下這三個測試CASE:
1. 斷言如果觀察到已經收到Terminated消息
QuoteRepositoryActor應該在收到第四條消息時給測試case發送一條Terminated消息。前三條應該是可以的。
"A QuoteRepositoryActor" must {
...
...
...
"send back a termination message to the watcher on 4th message" in {
val quoteRepository=TestActorRef[QuoteRepositoryActor]
val testProbe=TestProbe()
testProbe.watch(quoteRepository) //Let's watch the Actor
within (1000 millis) {
var receivedQuotes = List[String]()
(1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
receiveWhile() {
case QuoteRepositoryResponse(quoteString) => {
receivedQuotes = receivedQuotes :+ quoteString
}
}
receivedQuotes.size must be (3)
println(s"receiveCount ${receivedQuotes.size}")
//4th message
quoteRepository!QuoteRepositoryRequest
testProbe.expectTerminated(quoteRepository) //Expect a Terminated Message
}
}
2.如果沒有觀察(watched/unwatched)到則斷言沒收到Terminated消息
事實上,我們做這個只是演示下context.unwatch。如果我們移掉testProbe.watch和testProbe.unwatch這行,則測試case會運行的很正常。
"not send back a termination message on 4th message if not watched" in {
val quoteRepository=TestActorRef[QuoteRepositoryActor]
val testProbe=TestProbe()
testProbe.watch(quoteRepository) //watching
within (1000 millis) {
var receivedQuotes = List[String]()
(1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
receiveWhile() {
case QuoteRepositoryResponse(quoteString) => {
receivedQuotes = receivedQuotes :+ quoteString
}
}
testProbe.unwatch(quoteRepository) //not watching anymore
receivedQuotes.size must be (3)
println(s"receiveCount ${receivedQuotes.size}")
//4th message
quoteRepository!QuoteRepositoryRequest
testProbe.expectNoMsg() //Not Watching. No Terminated Message
}
}
3. 在TeacherActorWatcher中斷言收到了Terminated消息
我們訂閱了EventStream并通過檢查一個特殊的日志消息來斷言termination。
"end back a termination message to the watcher on 4th message to the TeacherActor" in {
//This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
val teacherActor=TestActorRef[TeacherActorWatcher]
within (1000 millis) {
(1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor
EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
teacherActor!QuoteRequest //Send the dangerous 4th message
}
}
}
EventFilter中的pattern屬性,沒啥奇怪的,需要一個正則表達式。正則pattern="""Child Actor .* Terminated"""用來匹配一條格式是Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated日志信息。
Github
與往常一樣,代碼在github。看下deathwatch的包。
文章來自微信平臺「麥芽面包」(微信掃描二維碼關注)。未經允許,禁止轉載。
文章列表
![]() |
不含病毒。www.avast.com |