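In distributed systems, failure is closer to a feature than an accident. Akka's fault-tolerance model lets you draw a clear boundary between your business logic and your failure-handling logic (the supervision logic), and it takes very little work. That's pretty great, and it's the topic of this article.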
ACTOR SUPERVISION
Imagine a method call stack where the method at the top of the stack throws an exception. What can the methods further down the stack do?
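- Catch the exception and handle it in order to recover
- Catch the exception, maybe log it, and keep quiet
- Ignore (duck) the exception, or catch it and rethrow it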
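Now imagine the exception is passed all the way up to the main method and is still not handled. In that case the program prints the exception to the console and exits.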
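You might expect the same thing to happen with threads, but the JVM does not propagate failures that way. Suppose a child thread throws an exception that its run method does not handle. The exception does not travel to the parent thread or the main thread. The child thread simply dies, and the exception is handed to that thread's UncaughtExceptionHandler (by default it is just printed to stderr). The rest of the program keeps running. If the work is a call method submitted to an ExecutorService, the exception is captured in the Future. It is rethrown, wrapped in an ExecutionException, only when someone calls get. Either way, nobody is automatically responsible for the failure.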
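Here is a minimal plain-JVM sketch of the thread case (no Akka involved). The names ThreadFailureDemo and runFailingChild are hypothetical. The child's exception lands in the handler installed on that child thread, and the parent's join() returns normally without ever seeing it.

```scala
import java.util.concurrent.atomic.AtomicReference

object ThreadFailureDemo {
  /** Starts a child thread that fails and returns what its handler saw. */
  def runFailingChild(): Throwable = {
    val seen = new AtomicReference[Throwable]()
    val child = new Thread(new Runnable {
      def run(): Unit = throw new IllegalStateException("child failed")
    })
    // Runs on the dying child thread; the parent thread is never involved.
    child.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = seen.set(e)
    })
    child.start()
    child.join() // returns normally: nothing is rethrown into this thread
    seen.get()
  }

  def main(args: Array[String]): Unit = {
    val e = runFailingChild()
    println(s"main is still running; the child died with: ${e.getMessage}")
  }
}
```

Nothing in the parent is forced to react to the failure. Akka's parental supervision fills exactly this gap.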
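Now let's look at actors. Suppose a child actor created with context.actorOf fails with an exception. The parent (the supervisor) gets to handle any failure of its children. It can handle the failure and recover (Restart/Resume), or it can pass the failure on (Escalate) to its own parent. Alternatively, it can simply Stop the child, which is the end of the story for that child. Why do I keep saying parent (supervisor)? Because Akka's approach to supervision is parental supervision: only the actor that created a child can supervise it.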
That's it! We have now covered all the supervision directives.
STRATEGY
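I forgot to mention one point. You already know that an Akka actor can create child actors, and those children can in turn create children of their own.
Now consider the following two scenarios: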
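1. OneForOneStrategy
Your actor spawns many child actors, and each child connects to a different data source. Say you are running an app that translates an English word into several languages.
Suppose one child actor fails, and you are fine with skipping its result in the final output. What would you do? Shut down the whole service? Of course not. You would probably want to restart or stop just that one faulty child, right? In Akka supervision terms this is called OneForOneStrategy: if one actor goes down, handle that actor alone.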
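Depending on your business exceptions, you may want to react differently to different exceptions (Stop, Restart, Escalate, Resume). To configure your own strategy, you just override supervisorStrategy in your Actor class.
An example declaration of OneForOneStrategy: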
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.{Restart, Stop}
class TeacherActorOneForOne extends Actor with ActorLogging {
  ...
  ...
  // MinorRecoverableException is an application-defined exception
  override val supervisorStrategy = OneForOneStrategy() {
    case _: MinorRecoverableException => Restart // only the failing child is restarted
    case _: Exception                 => Stop    // only the failing child is stopped
  }
...
...
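Below is a self-contained sketch of the same idea. MinorRecoverableException, TranslationSupervision and TranslationCoordinator are hypothetical names; the article does not show their definitions. The sketch also uses OneForOneStrategy's maxNrOfRetries and withinTimeRange parameters, which the snippet above leaves at their defaults. Keeping the strategy in an object lets you check the exception-to-directive mapping without starting an ActorSystem.

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Restart, Stop}

// Hypothetical application exception; the article does not show its definition.
class MinorRecoverableException(msg: String) extends Exception(msg)

object TranslationSupervision {
  // Restart a failing child at most 10 times within one minute; once that
  // limit is exceeded, Akka stops the child instead of restarting it again.
  val strategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: MinorRecoverableException => Restart
      case _: Exception                 => Stop
    }
}

// Hypothetical parent of the per-language translator children.
class TranslationCoordinator extends Actor with ActorLogging {
  override val supervisorStrategy: SupervisorStrategy = TranslationSupervision.strategy

  def receive: Receive = {
    case msg => log.info("received {}", msg)
  }
}
```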
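2. AllForOneStrategy
Say you are doing an external sort (yet another example proving I'm not very creative!), and each chunk is handled by a different actor. Suddenly one actor fails with an exception. There is no point in processing the rest, because the final result will be wrong anyway. So the logical thing to do is to stop all the actors.
Why did I say stop and not restart? Because restarting would not help here: an actor's mailbox is not cleared when it restarts, so if we restarted, the remaining chunks would still be processed. That's not what we want. Recreating the actors with fresh mailboxes is the more appropriate approach here.
As with OneForOneStrategy, you just override supervisorStrategy with an AllForOneStrategy.
Here is an example: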
import akka.actor.{Actor, ActorLogging}
import akka.actor.AllForOneStrategy
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop
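class TeacherActorAllForOne extends Actor with ActorLogging {
  ...
  // MajorUnRecoverableException is an application-defined exception
  override val supervisorStrategy = AllForOneStrategy() {
    case _: MajorUnRecoverableException => Stop     // stops all children, not just the failing one
    case _: Exception                   => Escalate // lets this actor's own supervisor decide
  }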
...
...
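To see the "all" in AllForOneStrategy at work, here is a minimal sketch. It assumes classic Akka 2.4 or later, where ActorSystem.terminate() replaced the 2.3-era shutdown(). ChunkSorter, ExternalSortSupervisor and AllForOneDemo are hypothetical names. Only the first worker fails, yet every worker's postStop runs, and a CountDownLatch records it.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.actor.{Actor, ActorSystem, AllForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Stop

// Hypothetical worker for one chunk of the external sort; it only knows how to fail.
class ChunkSorter(stopped: CountDownLatch) extends Actor {
  def receive: Receive = {
    case "fail" => throw new IllegalStateException("chunk is corrupt")
  }
  // Count every worker that gets stopped, not just the one that failed.
  override def postStop(): Unit = stopped.countDown()
}

object ChunkSorter {
  def props(stopped: CountDownLatch): Props = Props(new ChunkSorter(stopped))
}

class ExternalSortSupervisor(stopped: CountDownLatch, chunks: Int) extends Actor {
  // A failure in one child stops *all* children of this supervisor.
  override val supervisorStrategy: SupervisorStrategy = AllForOneStrategy() {
    case _: Exception => Stop
  }

  private val workers =
    (1 to chunks).map(i => context.actorOf(ChunkSorter.props(stopped), s"chunk-$i"))

  def receive: Receive = {
    case "fail-first" => workers.head ! "fail"
  }
}

object AllForOneDemo {
  /** True if every worker stopped although only the first one failed. */
  def allWorkersStopped(chunks: Int = 3): Boolean = {
    val system = ActorSystem("AllForOneDemo")
    val stopped = new CountDownLatch(chunks)
    try {
      val supervisor =
        system.actorOf(Props(new ExternalSortSupervisor(stopped, chunks)), "sort-supervisor")
      supervisor ! "fail-first"
      stopped.await(5, TimeUnit.SECONDS)
    } finally {
      system.terminate()
    }
  }
}
```

With OneForOneStrategy in the same place, only chunk-1 would be stopped and the latch would never reach zero.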
DIRECTIVES
The constructors of both AllForOneStrategy and OneForOneStrategy accept a PartialFunction[Throwable, Directive] called a Decider, which maps a Throwable to a Directive:
case _: MajorUnRecoverableException => Stop
There are just four directives: Stop, Resume, Escalate and Restart.
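Because a Decider is just a PartialFunction, you can write it and probe it on its own, without any actors. The sketch below imports Akka's SupervisorStrategy.Decider type alias. The exception-to-directive mapping itself is hypothetical; choosing it is an application decision that Akka leaves to you.

```scala
import akka.actor.SupervisorStrategy.{Decider, Escalate, Restart, Resume, Stop}

object Deciders {
  // Hypothetical mapping; which exception deserves which directive is up to the application.
  val translationDecider: Decider = {
    case _: ArithmeticException      => Resume   // skip the bad message, keep the actor's state
    case _: NullPointerException     => Restart  // state may be corrupt, start a fresh instance
    case _: IllegalArgumentException => Stop     // cannot continue, stop this child
    case _: Exception                => Escalate // unknown failure, let the supervisor's parent decide
  }
}
```

Note that the decider is not defined for Throwables that are not Exceptions, such as Errors.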
Stop
The child actor is stopped when the exception occurs, and any message sent to the stopped actor is forwarded to deadLetters.
Resume
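The child actor drops the message that caused the exception and continues processing the remaining messages in its mailbox. It keeps the internal state it had before the failure.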
Restart
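The child actor is stopped and a new instance is created, which then continues processing the remaining messages in the mailbox. Any state held by the old instance is lost. The rest of the world doesn't notice, because the same ActorRef now points to the new instance.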
Escalate
The supervisor ducks the failure and lets its own supervisor handle the exception.
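The practical difference between Resume and Restart is what happens to the child's state. The sketch below assumes classic Akka 2.4 or later (for ActorSystem.terminate()), and StatefulCounter, CounterSupervisor and DirectiveDemo are hypothetical names. It sends inc, inc, boom and get to a counter, where boom throws, and then asks for the count. Under Resume the counter still holds 2. Under Restart a fresh instance answers, so it reports 0. In both cases the queued get message survives, because the mailbox is not cleared.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Directive
import akka.pattern.ask
import akka.util.Timeout

// Hypothetical child that keeps a counter in a plain var.
class StatefulCounter extends Actor {
  private var count = 0
  def receive: Receive = {
    case "inc"  => count += 1
    case "boom" => throw new IllegalStateException("boom")
    case "get"  => sender() ! count
  }
}

// Supervisor that applies whatever directive it was given to its only child.
class CounterSupervisor(directive: Directive) extends Actor {
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: IllegalStateException => directive
  }
  private val child = context.actorOf(Props[StatefulCounter](), "counter")
  def receive: Receive = {
    case msg => child forward msg // keeps the original sender, so replies reach the asker
  }
}

object DirectiveDemo {
  /** Sends inc, inc, boom, get and returns the count the child reports. */
  def countAfterFailure(directive: Directive): Int = {
    val system = ActorSystem("DirectiveDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val supervisor = system.actorOf(Props(new CounterSupervisor(directive)), "supervisor")
      supervisor ! "inc"
      supervisor ! "inc"
      supervisor ! "boom" // the child fails here; this message is not retried
      Await.result((supervisor ? "get").mapTo[Int], 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```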
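DEFAULT STRATEGY
What if our actor doesn't specify any strategy but does create child actors? How are their failures handled? Every actor comes with a default strategy, equivalent to: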
override val supervisorStrategy = OneForOneStrategy() {
  case _: ActorInitializationException => Stop
  case _: ActorKilledException         => Stop
  case _: DeathPactException           => Stop
  case _: Exception                    => Restart
}
So the default strategy handles four cases:
1. ACTORINITIALIZATIONEXCEPTION => STOP
When an actor cannot be initialized, it throws an ActorInitializationException, and the actor is stopped. Let's simulate this by throwing an exception in preStart:
package me.rerun.akkanotes.supervision
import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
object ActorInitializationExceptionApp extends App{
val actorSystem=ActorSystem("ActorInitializationException")
val actor=actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor")
actor!"someMessageThatWillGoToDeadLetter"
}
class ActorInitializationExceptionActor extends Actor with ActorLogging{
override def preStart={
throw new Exception("Some random exception")
}
def receive={
case _=>
}
}
Running ActorInitializationExceptionApp produces an ActorInitializationException, and every message sent to the actor afterwards ends up in the deadLetters actor:
Log
[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
...
...
Caused by: java.lang.Exception: Some random exception
at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17)
...
...
[INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2. ACTORKILLEDEXCEPTION => STOP
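When an actor is shut down with a Kill message, it throws an ActorKilledException, and the default strategy stops the child. At first glance there seems to be no point in stopping an actor that has already been killed. But consider this:
The ActorKilledException is propagated to the supervisor. What about this actor's lifecycle watchers (deathwatchers), which we saw in the DeathWatch article? The watchers won't know anything until the actor is stopped.
Sending Kill to an actor only makes that actor's supervisor aware of the failure. Handling it with Stop does the following, in order:
- suspends the actor's mailbox
- suspends the mailboxes of its children
- stops the children
- sends Terminated to all the watchers of the children
- sends Terminated to all the watchers of the failed actor itself
- finally stops the actor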
package me.rerun.akkanotes.supervision
import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Kill
object ActorKilledExceptionApp extends App{
val actorSystem=ActorSystem("ActorKilledExceptionSystem")
val actor=actorSystem.actorOf(Props[ActorKilledExceptionActor])
actor!"something"
actor!Kill
actor!"something else that falls into dead letter queue"
}
class ActorKilledExceptionActor extends Actor with ActorLogging{
def receive={
case message:String=> log.info (message)
}
}
Log
The log shows that as soon as the ActorKilledException comes in, the supervisor stops the actor, and the next message goes to the deadLetters queue.
INFO m.r.a.s.ActorKilledExceptionActor - something
ERROR akka.actor.OneForOneStrategy - Kill
akka.actor.ActorKilledException: Kill
INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
3. DEATHPACTEXCEPTION => STOP
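In the DeathWatch article, you saw that an actor watching a child actor is expected to handle the Terminated message in its receive. What if it doesn't? You get a DeathPactException.
The code below shows a supervisor that watches its child but does not handle the Terminated message from it.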
package me.rerun.akkanotes.supervision
import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Kill
import akka.actor.PoisonPill
import akka.actor.Terminated
object DeathPactExceptionApp extends App{
val actorSystem=ActorSystem("DeathPactExceptionSystem")
val actor=actorSystem.actorOf(Props[DeathPactExceptionParentActor])
actor!"create_child" //Throws DeathPactException
Thread.sleep(2000) //Wait until Stopped
actor!"someMessage" //Message goes to DeadLetters
}
class DeathPactExceptionParentActor extends Actor with ActorLogging{
def receive={
case "create_child"=> {
log.info ("creating child")
val child=context.actorOf(Props[DeathPactExceptionChildActor])
context.watch(child) //Watches but doesn't handle the Terminated message, so a DeathPactException is thrown once the child stops.
child!"stop"
}
case "someMessage" => log.info ("some message")
//Doesn't handle the Terminated message
//case Terminated(_) =>
}
}
class DeathPactExceptionChildActor extends Actor with ActorLogging{
def receive={
case "stop"=> {
log.info ("Actor going to stop and announce that it's terminated")
self!PoisonPill
}
}
}
Log
The log shows the DeathPactException coming in. The supervisor stops the actor, and the next message goes to the deadLetters queue.
INFO m.r.a.s.DeathPactExceptionParentActor - creating child
INFO m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated
ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated
akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated
INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
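The way out of a DeathPactException is to handle Terminated. The sketch below mirrors DeathPactExceptionParentActor but adds a Terminated case. It assumes classic Akka 2.4 or later (for ActorSystem.terminate()). StoppingChild, WatchingParent and WatchDemo are hypothetical names, and the CountDownLatch exists only so the demo can observe the notification. The parent stays alive and still answers a "ping" after its child is gone.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props, Terminated}
import akka.pattern.ask
import akka.util.Timeout

class StoppingChild extends Actor {
  def receive: Receive = {
    case "stop" => self ! PoisonPill
  }
}

// Same shape as DeathPactExceptionParentActor, but Terminated is handled,
// so no DeathPactException is thrown and the parent keeps running.
class WatchingParent(childGone: CountDownLatch) extends Actor with ActorLogging {
  def receive: Receive = {
    case "create_child" =>
      val child = context.actorOf(Props[StoppingChild](), "child")
      context.watch(child)
      child ! "stop"
    case Terminated(ref) =>
      log.info("child {} terminated", ref.path)
      childGone.countDown()
    case "ping" => sender() ! "pong"
  }
}

object WatchingParent {
  def props(childGone: CountDownLatch): Props = Props(new WatchingParent(childGone))
}

object WatchDemo {
  /** True if the parent saw Terminated and still answers messages afterwards. */
  def parentSurvivesChildStop(): Boolean = {
    val system = ActorSystem("WatchDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val childGone = new CountDownLatch(1)
    try {
      val parent = system.actorOf(WatchingParent.props(childGone), "parent")
      parent ! "create_child"
      val sawTerminated = childGone.await(3, TimeUnit.SECONDS)
      val reply = Await.result(parent ? "ping", 3.seconds)
      sawTerminated && reply == "pong"
    } finally {
      system.terminate()
    }
  }
}
```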
4. EXCEPTION => RESTART
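For any other exception, the default directive is to restart the actor. Take a look at this app. To prove that the actor restarted, OtherExceptionParentActor makes the child throw an exception and immediately sends it another message. That message reaches the mailbox while the child is restarting, and it still gets processed. Nice!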
package me.rerun.akkanotes.supervision
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Stop
object OtherExceptionApp extends App{
val actorSystem=ActorSystem("OtherExceptionSystem")
val actor=actorSystem.actorOf(Props[OtherExceptionParentActor])
actor!"create_child"
}
class OtherExceptionParentActor extends Actor with ActorLogging{
def receive={
case "create_child"=> {
log.info ("creating child")
val child=context.actorOf(Props[OtherExceptionChildActor])
child!"throwSomeException"
child!"someMessage"
}
}
}
class OtherExceptionChildActor extends akka.actor.Actor with ActorLogging{
override def preStart={
log.info ("Starting Child Actor")
}
def receive={
case "throwSomeException"=> {
throw new Exception ("I'm getting thrown for no reason")
}
case "someMessage" => log.info ("Restarted and printing some Message")
}
override def postStop={
log.info ("Stopping Child Actor")
}
}
Log
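The log shows that:
- The exception is thrown, and we can see its stack trace.
- The child restarts: postStop and preStart are called (we'll look at preRestart and postRestart later).
- The message sent to the child before the restart is processed by the restarted instance.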
INFO m.r.a.s.OtherExceptionParentActor - creating child
INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor
ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason
java.lang.Exception: I'm getting thrown for no reason
at me.rerun.akkanotes.supervision.OtherExceptionChildActor$$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na]
...
...
INFO m.r.a.s.OtherExceptionChildActor - Stopping Child Actor
INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor
INFO m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message
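The four cases above are the mapping Akka ships as SupervisorStrategy.defaultDecider. Throwables that are not Exceptions (Errors, for example) are not matched by it. When a decider does not cover a failure, Akka escalates it. A common way to customize the defaults is to handle a few special cases first and fall back to defaultDecider with orElse. In the minimal sketch below, DefaultFallback and the ArithmeticException => Resume rule are hypothetical.

```scala
import akka.actor.{OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Decider, Resume}

object DefaultFallback {
  // Hypothetical special case: resume on ArithmeticException.
  private val special: Decider = {
    case _: ArithmeticException => Resume
  }

  // Everything else falls back to Akka's built-in mapping
  // (ActorInitialization/ActorKilled/DeathPact => Stop, other Exceptions => Restart).
  val decider: Decider = special orElse SupervisorStrategy.defaultDecider

  // Same shape as the default strategy, only with the extended decider.
  val strategy: SupervisorStrategy = OneForOneStrategy()(decider)
}
```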
ESCALATE AND RESUME
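We saw examples of Stop and Restart in the default strategy. Now let's quickly look at Resume and Escalate.
Resume ignores the exception and moves on to the next message in the mailbox. It's like catching an exception and doing nothing about it.
Escalate is for when the exception is fatal from the supervisor's point of view and it cannot handle it, so it asks its own supervisor for help. Let's look at an example.
Say there are three actors: EscalateExceptionTopLevelActor, EscalateExceptionParentActor and EscalateExceptionChildActor. If the child throws an exception and the parent cannot handle it, the parent can escalate it to the top-level actor. The top-level actor can then respond with whichever directive it chooses. In our example we just Stop. Stop immediately stops the top-level actor's child (here, EscalateExceptionParentActor). As we know, when an actor is stopped, all its children are stopped before the actor itself stops.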
package me.rerun.akkanotes.supervision
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop
import akka.actor.actorRef2Scala
object EscalateExceptionApp extends App {
val actorSystem = ActorSystem("EscalateExceptionSystem")
val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor")
actor ! "create_parent"
}
class EscalateExceptionTopLevelActor extends Actor with ActorLogging {
override val supervisorStrategy = OneForOneStrategy() {
case _: Exception => {
log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.")
Stop //Stop will stop the Actor that threw this Exception and all its children
}
}
def receive = {
case "create_parent" => {
log.info("creating parent")
val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor")
parent ! "create_child" //Sending message to next level
}
}
}
class EscalateExceptionParentActor extends Actor with ActorLogging {
override def preStart={
log.info ("Parent Actor started")
}
override val supervisorStrategy = OneForOneStrategy() {
case _: Exception => {
log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
Escalate
}
}
def receive = {
case "create_child" => {
log.info("creating child")
val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor")
child ! "throwSomeException"
}
}
override def postStop = {
log.info("Stopping parent Actor")
}
}
class EscalateExceptionChildActor extends akka.actor.Actor with ActorLogging {
override def preStart={
log.info ("Child Actor started")
}
def receive = {
case "throwSomeException" => {
throw new Exception("I'm getting thrown for no reason.")
}
}
override def postStop = {
log.info("Stopping child Actor")
}
}
Log
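As you can see in the log:
- The child actor throws the exception.
- The supervisor (EscalateExceptionParentActor) escalates the exception to its own supervisor (EscalateExceptionTopLevelActor).
- EscalateExceptionTopLevelActor's directive is to stop the actor. In terms of ordering, the child actor stops first.
- The parent actor stops next (after its watchers are notified).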
INFO m.r.a.s.EscalateExceptionTopLevelActor - creating parent
INFO m.r.a.s.EscalateExceptionParentActor - Parent Actor started
INFO m.r.a.s.EscalateExceptionParentActor - creating child
INFO m.r.a.s.EscalateExceptionChildActor - Child Actor started
INFO m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor
INFO m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.
ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason.
java.lang.Exception: I'm getting thrown for no reason.
at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na]
...
...
INFO m.r.a.s.EscalateExceptionChildActor - Stopping child Actor
INFO m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor
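Keep in mind that whichever directive is issued applies only to the immediate child that escalated. For example, if the top level issues a Restart, only the parent is restarted, and whatever is in its constructor or preStart runs again. If the parent creates its children in its constructor, they are created again. However, children that the parent created in response to a message remain in the Terminated state.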
TRIVIA
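As a matter of fact, you can control whether preStart gets called when an actor restarts. We'll see how in the next section. If you are curious, have a look at the postRestart method in Actor: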
def postRestart(reason: Throwable): Unit = {
preStart()
}
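Since the default postRestart simply calls preStart(), overriding postRestart is one way to make preStart a run-once initialization hook. This is a common pattern, and the sketch below is only a minimal version of it. It assumes classic Akka 2.4 or later (for ActorSystem.terminate()). It also assumes the user guardian's default strategy, which restarts a top-level actor on an Exception. InitOnceActor, PostRestartDemo and the skipOnRestart flag are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

// Hypothetical actor that records how often preStart runs across restarts.
class InitOnceActor(preStarts: AtomicInteger, skipOnRestart: Boolean) extends Actor {
  override def preStart(): Unit = preStarts.incrementAndGet()

  // Actor's default postRestart just calls preStart(). Skipping that call
  // turns preStart into a hook that runs for the first instance only.
  override def postRestart(reason: Throwable): Unit =
    if (!skipOnRestart) super.postRestart(reason)

  def receive: Receive = {
    case "boom" => throw new IllegalStateException("boom")
    case "ping" => sender() ! "pong"
  }
}

object PostRestartDemo {
  /** Fails the actor once and returns how many times preStart ran in total. */
  def preStartCount(skipOnRestart: Boolean): Int = {
    val system = ActorSystem("PostRestartDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val preStarts = new AtomicInteger(0)
    try {
      val actor = system.actorOf(Props(new InitOnceActor(preStarts, skipOnRestart)), "init-once")
      actor ! "boom"                          // the default strategy restarts the actor
      Await.result(actor ? "ping", 3.seconds) // answered by the restarted instance
      preStarts.get()
    } finally {
      system.terminate()
    }
  }
}
```

With skipOnRestart = false you get the default behaviour (preStart runs twice). With true, it runs only once. The constructor still runs for every new instance either way.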
CODE
As always, the code is on GitHub.
This article comes from the WeChat account 「麥芽面包」. Reproduction without permission is prohibited.