
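Failure is more like a feature of distributed systems. With Akka's fault-tolerant model, you get a clear boundary between your business logic and your failure-handling logic (supervision logic), all with very little effort. That's pretty awesome, and it is the topic of this post.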

ACTOR SUPERVISION

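Consider a method call stack where the topmost method throws an exception. What can the methods further down the stack do?

  1. Catch the exception and handle it in order to recover.
  2. Catch the exception, maybe log it, and stay quiet.
  3. Duck the exception entirely (or catch it and rethrow it).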

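Now imagine the exception is ducked all the way down to the main method without ever being handled. In that case, the program prints the exception to the console and exits.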
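To make the analogy concrete, here is a minimal plain-Scala sketch (no Akka involved) of the three choices a caller has when the method above it throws. The object and method names are hypothetical.

```scala
// Plain-Scala sketch of the three options a caller down the stack has.
// All names are hypothetical, chosen only for illustration.
object StackHandling {
  // The "top of the stack" method that always fails.
  def topOfStack(): Int = throw new IllegalStateException("boom")

  // 1. Catch the exception and recover with a fallback value.
  def recover(): Int =
    try topOfStack()
    catch { case _: IllegalStateException => -1 }

  // 2. Catch it, log it, and stay quiet by returning a neutral value.
  def logAndSwallow(log: StringBuilder): Option[Int] =
    try Some(topOfStack())
    catch {
      case e: IllegalStateException =>
        log.append(s"swallowed: ${e.getMessage}")
        None
    }

  // 3. Duck it: wrap and rethrow so a caller further down decides.
  def duck(): Int =
    try topOfStack()
    catch { case e: IllegalStateException => throw new RuntimeException("wrapped", e) }
}
```

If nothing below `duck()` catches the rethrown exception, it keeps unwinding until it reaches `main`, which is the scenario described above.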

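You can relate the same thing to threads. If a child thread throws an exception that its run or call method doesn't handle, you would expect the parent thread or the main thread to deal with it, and if nobody does, the program dies. (Strictly speaking, the JVM does not notify the parent thread automatically. An exception escaping run() ends only that thread and is passed to its UncaughtExceptionHandler. An exception thrown from a Callable's call() is handed back to whoever calls Future.get().)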
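Here is a small sketch of those two JVM behaviours, written in Scala on top of java.util.concurrent. ThreadFailures and its methods are illustrative names and do not come from the original code.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors}
import java.util.concurrent.atomic.AtomicReference

object ThreadFailures {
  // A Callable's exception is captured by the Future and rethrown to the
  // thread that calls get(), wrapped in an ExecutionException.
  def callableFailure(): Throwable = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val future = pool.submit(new Callable[Int] {
        def call(): Int = throw new IllegalArgumentException("bad input")
      })
      try { future.get(); null }
      catch { case e: ExecutionException => e.getCause }
    } finally pool.shutdown()
  }

  // A plain Thread's uncaught exception goes to its UncaughtExceptionHandler;
  // the thread that started it is not told about it.
  def runnableFailure(): Throwable = {
    val seen = new AtomicReference[Throwable]()
    val worker = new Thread(new Runnable {
      def run(): Unit = throw new IllegalStateException("worker died")
    })
    worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = seen.set(e)
    })
    worker.start()
    worker.join() // wait for the worker to finish dying
    seen.get()
  }
}
```

In both cases the parent only learns about the failure because of explicit plumbing (a Future or a handler). Akka builds that plumbing in through the parent/child relationship.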

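Let's look at actors now. Suppose a child actor created with context.actorOf fails with an exception. The parent actor (aka the supervisor) can handle any failure of its children. If it does, it can handle the failure and recover (Restart/Resume). Alternatively, it can pass the exception on (Escalate) to its own parent. Or it can simply Stop the child, and that is the end of that child. Why do I keep saying parent (aka supervisor)? Because Akka's approach to supervision is parental supervision, which means only the creator of an actor can supervise it.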

That's it! We have covered all the supervision directives.
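Here is a minimal sketch of parental supervision. It assumes Akka classic (akka-actor) is on the classpath, and SupervisingParent and SupervisedChild are hypothetical names. The actor that calls context.actorOf becomes the child's parent, and therefore its supervisor:

```scala
import akka.actor.{Actor, ActorRef, Props}

// Hypothetical child: replies with a reference to whoever created it.
class SupervisedChild extends Actor {
  def receive: Receive = {
    case "whoIsMyParent" => sender() ! context.parent
  }
}

// Hypothetical parent: creating the child with context.actorOf places it
// directly under this actor in the hierarchy, so this actor supervises it.
class SupervisingParent extends Actor {
  private val child: ActorRef = context.actorOf(Props[SupervisedChild], "child")

  def receive: Receive = {
    case "getChild" => sender() ! child
  }
}
```

Any exception thrown inside SupervisedChild is reported to SupervisingParent, and the parent's supervisorStrategy (the default one here) picks the directive.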

STRATEGY

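One thing I forgot to mention: as you already know, an Akka Actor can create child actors, and those children can create children of their own.

Now, consider the following two scenarios: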

1. OneForOneStrategy

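Your actor spawns many child actors, and each child connects to a different data source. Say you are running an app that translates an English word into multiple languages.

Suppose one child actor fails and you are fine with skipping that result in the final output. What would you do? Shut down the whole service? Of course not. You would probably want to restart or stop just the problem child, right? In Akka supervision terms this is called OneForOneStrategy: if one actor fails, only that actor is handled.

Depending on your business exceptions, you may want to react differently (Stop, Restart, Escalate, Resume) to different exceptions. To configure your own strategy, you just override the supervisorStrategy method in your Actor class.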
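Besides the decider, OneForOneStrategy also accepts optional maxNrOfRetries and withinTimeRange arguments. Once a child uses up that restart budget, Akka stops it instead of restarting it again. Below is a sketch for the translator scenario. It assumes akka-actor is on the classpath, and the exception classes and TranslatorSupervision are hypothetical:

```scala
import scala.concurrent.duration._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.{Decider, Restart, Resume, Stop}

// Hypothetical application exceptions for the translator example.
class TransientDataSourceException(msg: String) extends Exception(msg)
class BadTranslationRequest(msg: String) extends Exception(msg)

object TranslatorSupervision {
  // Each case decides what happens to the failing child only.
  val decider: Decider = {
    case _: TransientDataSourceException => Restart // reconnect with fresh state
    case _: BadTranslationRequest        => Resume  // skip the bad message, keep state
    case _: Exception                    => Stop    // give up on this language
  }

  // At most 3 restarts per child within 1 minute; past that, the child is stopped.
  val strategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute)(decider)
}
```

Inside the parent actor you would then write `override val supervisorStrategy = TranslatorSupervision.strategy`. Keeping the decider in a separate object also lets you check it without starting an ActorSystem.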

Example: declaring a OneForOneStrategy

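import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.{Restart, Stop}

// Application-defined exception (not part of Akka) used by the decider below.
class MinorRecoverableException(message: String) extends Exception(message)

class TeacherActorOneForOne extends Actor with ActorLogging {

  ...
  ...
  // Applied only to the child that failed; its siblings are left alone.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: MinorRecoverableException => Restart
    case _: Exception                 => Stop
  }
  ...
  ...
}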

2. AllForOneStrategy

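Say you are doing an external sort (one more example to prove my lack of creativity!), and each chunk is handled by a different actor. Suddenly, one actor fails with an exception. There is no point in processing the remaining chunks, because the final result will be wrong anyway. So the logical thing to do is Stop all the actors.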

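Why did I say Stop and not Restart? Because restarting wouldn't help here either: an actor's mailbox is not cleared when it restarts. So if we restarted all the actors, the remaining chunks would still be processed, which is not what we want. Recreating the actors with fresh mailboxes is the more appropriate approach here.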
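The difference between the two strategies comes down to which children a Stop or Restart reaches. Here is a dependency-free model of that for the external-sort case. It is plain Scala, not Akka's implementation, and the names are made up for illustration:

```scala
// A plain-Scala model (not Akka code) of which children a Stop or Restart
// reaches under each strategy. All names here are hypothetical.
object StrategyScope {
  sealed trait Strategy
  case object OneForOne extends Strategy
  case object AllForOne extends Strategy

  // Children that receive the Stop/Restart when `failed` throws.
  def affected(strategy: Strategy, children: Seq[String], failed: String): Seq[String] =
    strategy match {
      case OneForOne => children.filter(_ == failed) // only the failing child
      case AllForOne => children                     // the failing child and all siblings
    }
}
```

For the external sort, AllForOne combined with Stop means the healthy sorters are stopped too, so no partially sorted result is produced.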

Just like with OneForOneStrategy, you override supervisorStrategy, this time with an AllForOneStrategy.

Here is an example:

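import akka.actor.{Actor, ActorLogging}
import akka.actor.AllForOneStrategy
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop

// Application-defined exception (not part of Akka) used by the decider below.
class MajorUnRecoverableException(message: String) extends Exception(message)

class TeacherActorAllForOne extends Actor with ActorLogging {

  ...

  // With AllForOneStrategy, a Stop applies to every child, not just the failing one.
  override val supervisorStrategy = AllForOneStrategy() {
    case _: MajorUnRecoverableException => Stop
    case _: Exception                   => Escalate
  }
  ...
  ...
}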

DIRECTIVES

The constructors of both AllForOneStrategy and OneForOneStrategy accept a PartialFunction[Throwable, Directive] called Decider, which maps a Throwable to a Directive:

case _: MajorUnRecoverableException => Stop  

There are just four directives: Stop, Resume, Escalate and Restart.
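Because a Decider is just a PartialFunction, you can combine your own cases with Akka's defaultDecider using orElse. The sketch below assumes akka-actor is on the classpath, and StaleCacheException is a hypothetical application exception. As far as I know, a Throwable that no case matches (an Error, for example) is escalated to the next supervisor.

```scala
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy.{Decider, Resume}

// Hypothetical application exception, used only for illustration.
class StaleCacheException(msg: String) extends Exception(msg)

object ComposedDecider {
  // Our own rule for one exception type...
  val custom: Decider = {
    case _: StaleCacheException => Resume
  }

  // ...falling back to Akka's default mapping for everything else.
  val decider: Decider = custom orElse SupervisorStrategy.defaultDecider

  val strategy = OneForOneStrategy()(decider)
}
```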

Stop

The child actor is stopped when the exception occurs, and any messages sent to the stopped actor are forwarded to the deadLetters queue.

Resume

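The child actor ignores the message that threw the exception, keeps its current internal state, and continues processing the rest of the messages in its queue.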

Restart

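The child actor is stopped and a new instance of the actor is initialized. Processing continues with the remaining messages in the mailbox; the message that caused the failure is not processed again. The rest of the world doesn't notice, because the same ActorRef now points to the new Actor instance.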

Escalate

The supervisor ducks the failure and lets its own supervisor handle the exception.
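To show how Stop, Resume and Restart differ in what happens to the child's state and mailbox, here is a toy single-threaded model in plain Scala. It only mimics the behaviour described above (in every case the failing message itself is lost). It is not Akka code. Escalate is left out because it passes the decision up to the grandparent.

```scala
// A toy, single-threaded model of how Resume, Restart and Stop treat a
// failing child's state and mailbox. Plain Scala, not Akka code.
object DirectiveModel {
  sealed trait Directive
  case object Resume  extends Directive
  case object Restart extends Directive
  case object Stop    extends Directive

  final case class Outcome(processed: List[String], deadLetters: List[String], finalCount: Int)

  // The child counts the messages it handles; "boom" makes it fail.
  def run(mailbox: List[String], directive: Directive): Outcome = {
    var count = 0
    var processed = List.empty[String]
    var dead = List.empty[String]
    var remaining = mailbox
    while (remaining.nonEmpty) {
      val msg = remaining.head
      remaining = remaining.tail
      if (msg == "boom") {
        // The failing message itself is dropped under every directive.
        directive match {
          case Resume  => ()        // keep state, carry on with the mailbox
          case Restart => count = 0 // fresh instance, same mailbox
          case Stop =>              // remaining messages go to dead letters
            dead = remaining
            remaining = Nil
        }
      } else {
        count += 1
        processed = processed :+ msg
      }
    }
    Outcome(processed, dead, count)
  }
}
```

With the mailbox `List("a", "b", "boom", "c")`, Resume ends with a count of 3, Restart ends with 1 (only "c" is counted after the reset), and Stop leaves "c" in dead letters.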

DEFAULT STRATEGY

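What if our actor doesn't specify any strategy but does create child actors? How are their failures handled? Every Actor comes with a default strategy, which looks like this: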

override val supervisorStrategy = OneForOneStrategy() {

    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case _: Exception                    => Restart

}

So the default strategy handles four cases:

1. ACTORINITIALIZATIONEXCEPTION => STOP

When an actor cannot be initialized, it throws an ActorInitializationException, and the actor is stopped. Let's simulate this by throwing an exception in preStart:

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}  
import akka.actor.Actor  
import akka.actor.ActorLogging

object ActorInitializationExceptionApp extends App{

  val actorSystem=ActorSystem("ActorInitializationException")
  val actor=actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor")
  actor!"someMessageThatWillGoToDeadLetter"
}

class ActorInitializationExceptionActor extends Actor with ActorLogging{  
  override def preStart={
    throw new Exception("Some random exception")
  }
  def receive={
    case _=>
  }
}

Running ActorInitializationExceptionApp produces an ActorInitializationException, and every message sent afterwards goes to the deadLetters actor's queue:

Log

[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception
akka.actor.ActorInitializationException: exception during creation  
    at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
...
...
Caused by: java.lang.Exception: Some random exception  
    at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17)
...
...

[INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2. ACTORKILLEDEXCEPTION => STOP

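When an Actor is shut down with the Kill message, it throws an ActorKilledException. The default strategy stops the child when it throws this exception. At first, stopping an actor that has already been killed seems pointless. But consider this:

  1. The ActorKilledException is propagated to the supervisor. What about the lifecycle watchers (deathwatchers) of this Actor that we saw in the DeathWatch post? The watchers won't know anything until the Actor is stopped.

  2. Sending Kill to an Actor affects only that particular actor, and only its supervisor knows about it. Handling it with Stop suspends the mailbox of that actor, suspends the mailboxes of its child actors, stops the child actors, sends a Terminated to all the child actors' watchers, sends a Terminated to all of the failed Actor's own watchers, and finally stops the Actor itself.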

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}  
import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.Kill

object ActorKilledExceptionApp extends App{

  val actorSystem=ActorSystem("ActorKilledExceptionSystem")
  val actor=actorSystem.actorOf(Props[ActorKilledExceptionActor])
  actor!"something"
  actor!Kill
  actor!"something else that falls into dead letter queue"
}

class ActorKilledExceptionActor extends Actor with ActorLogging{  
  def receive={
    case message:String=> log.info (message)
  }
}

Log

The log shows that as soon as the ActorKilledException comes in, the supervisor stops the actor, and the next message goes to the deadLetters queue.

INFO  m.r.a.s.ActorKilledExceptionActor - something

ERROR akka.actor.OneForOneStrategy - Kill  
akka.actor.ActorKilledException: Kill

INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

3. DEATHPACTEXCEPTION => STOP

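In the DeathWatch post, you saw that when an Actor watches a child Actor, it is expected to handle the Terminated message in its receive. What if it doesn't? You get a DeathPactException.

The code below shows the supervisor watching the child actor but not handling the Terminated message from it.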

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}  
import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.PoisonPill  
import akka.actor.Terminated

object DeathPactExceptionApp extends App{

  val actorSystem=ActorSystem("DeathPactExceptionSystem")
  val actor=actorSystem.actorOf(Props[DeathPactExceptionParentActor])
  actor!"create_child" //Throws DeathPactException
  Thread.sleep(2000) //Wait until Stopped
  actor!"someMessage" //Message goes to DeadLetters

}

class DeathPactExceptionParentActor extends Actor with ActorLogging{

  def receive={
    case "create_child"=> {
      log.info ("creating child")
      val child=context.actorOf(Props[DeathPactExceptionChildActor])
      context.watch(child) //Watches but doesn't handle Terminated, so a DeathPactException is thrown once the child stops.
      child!"stop"
    }
    case "someMessage" => log.info ("some message")
    //Doesn't handle the Terminated message
    //case Terminated(_) =>
  }
}

class DeathPactExceptionChildActor extends Actor with ActorLogging{  
  def receive={
    case "stop"=> {
      log.info ("Actor going to stop and announce that it's terminated")
      self!PoisonPill
    }
  }
}

Log

The log tells us that the DeathPactException came in, the supervisor stopped the actor, and the next message went to the deadLetters queue.

INFO  m.r.a.s.DeathPactExceptionParentActor - creating child

INFO  m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated

ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated  
akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated

INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
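For comparison, here is a sketch of a parent that does handle Terminated, so watching the child does not end in a DeathPactException. It assumes akka-actor is on the classpath. WatchingParent and WatchedChild are hypothetical names, and the "ping" case exists only so you can check that the parent is still alive.

```scala
import akka.actor.{Actor, ActorLogging, PoisonPill, Props, Terminated}

// Hypothetical child that stops itself when told to.
class WatchedChild extends Actor {
  def receive: Receive = {
    case "stop" => self ! PoisonPill
  }
}

// Hypothetical parent that watches its child AND handles Terminated.
class WatchingParent extends Actor with ActorLogging {
  def receive: Receive = {
    case "create_child" =>
      val child = context.actorOf(Props[WatchedChild])
      context.watch(child)
      child ! "stop"
    case Terminated(ref) =>
      // Because Terminated is handled here, no DeathPactException is thrown
      // and the parent keeps running.
      log.info("Child {} terminated", ref.path)
    case "ping" => sender() ! "pong"
  }
}
```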

4. EXCEPTION => RESTART

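For all other exceptions, the default directive is to Restart the Actor. Check the following app. To prove that the Actor restarted, OtherExceptionParentActor makes the child throw an exception and immediately sends it another message. The message reaches the mailbox while the child is restarting and is processed once the restart finishes. Nice!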

package me.rerun.akkanotes.supervision

import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.ActorSystem  
import akka.actor.OneForOneStrategy  
import akka.actor.Props  
import akka.actor.SupervisorStrategy.Stop

object OtherExceptionApp extends App{

  val actorSystem=ActorSystem("OtherExceptionSystem")
  val actor=actorSystem.actorOf(Props[OtherExceptionParentActor])
  actor!"create_child"

}

class OtherExceptionParentActor extends Actor with ActorLogging{

  def receive={
    case "create_child"=> {
      log.info ("creating child")
      val child=context.actorOf(Props[OtherExceptionChildActor])

      child!"throwSomeException"
      child!"someMessage"
    }
  }
}

class OtherExceptionChildActor extends akka.actor.Actor with ActorLogging{

  override def preStart={
    log.info ("Starting Child Actor")
  }

  def receive={
    case "throwSomeException"=> {
      throw new Exception ("I'm getting thrown for no reason") 
    }
    case "someMessage" => log.info ("Restarted and printing some Message")
  }

  override def postStop={
    log.info ("Stopping Child Actor")
  }

}

Log

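  1. The exception is thrown, and we see its trace.
  2. The child restarts: stop and start are called (we'll look at the preRestart and postRestart callbacks soon).
  3. The message that was sent to the child before the restart is still processed.
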
INFO  m.r.a.s.OtherExceptionParentActor - creating child

INFO  m.r.a.s.OtherExceptionChildActor - Starting Child Actor

ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason

java.lang.Exception: I'm getting thrown for no reason  
    at me.rerun.akkanotes.supervision.OtherExceptionChildActor$$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na]
...
...    

INFO  m.r.a.s.OtherExceptionChildActor - Stopping Child Actor

INFO  m.r.a.s.OtherExceptionChildActor - Starting Child Actor

INFO  m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message
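Those four cases are exactly what Akka exposes as SupervisorStrategy.defaultDecider, wrapped in SupervisorStrategy.defaultStrategy. If you would rather stop every failing child than restart it, Akka also ships SupervisorStrategy.stoppingStrategy. Here is a short sketch, assuming akka-actor is on the classpath; DefaultStrategies and StoppingParent are hypothetical names:

```scala
import akka.actor.{Actor, SupervisorStrategy}

object DefaultStrategies {
  // What an actor gets when it does not override supervisorStrategy.
  val default: SupervisorStrategy = SupervisorStrategy.defaultStrategy
  // Same one-for-one shape, but every Exception stops the child.
  val stopping: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
}

// Hypothetical parent that opts into stopping instead of restarting.
class StoppingParent extends Actor {
  override val supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

  def receive: Receive = {
    case _ => // children would be created and managed here
  }
}
```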

ESCALATE AND RESUME

We saw examples of Stop and Restart in the default strategy. Now let's quickly look at Resume and Escalate.

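Resume just ignores the exception and moves on to the next message in the mailbox, keeping the actor's current state. It's like catching the exception and doing nothing about it.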
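Here is a sketch of Resume in action, assuming akka-actor is on the classpath (CountingChild and ResumingParent are hypothetical). The child's counter survives the failure. Under Restart, a fresh instance would start again from zero.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Resume

// Hypothetical child that keeps a counter and fails on "fail".
class CountingChild extends Actor {
  private var count = 0
  def receive: Receive = {
    case "inc"  => count += 1
    case "fail" => throw new IllegalStateException("ignored under Resume")
    case "get"  => sender() ! count
  }
}

// Hypothetical parent that resumes any failing child, so its state survives.
class ResumingParent extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Resume
  }
  private val child = context.actorOf(Props[CountingChild], "counter")

  def receive: Receive = {
    case msg => child forward msg // route everything to the child, keeping the sender
  }
}
```

Sending "inc", "inc", "fail" and then asking "get" should yield 2.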

Escalate is for exceptions that are critical and that the supervisor cannot handle, so it calls out to its own supervisor for help. Let's look at an example.

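Consider three actors: EscalateExceptionTopLevelActor, EscalateExceptionParentActor and EscalateExceptionChildActor. If the child actor throws an exception and the parent-level actor can't handle it, the parent can Escalate the exception to the top-level actor. The top-level actor can then respond with any of the directives. In our example, we just Stop. Stop immediately stops the top-level actor's child (here, EscalateExceptionParentActor). As we know, when an actor is stopped, all its children are stopped before the actor itself stops.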

package me.rerun.akkanotes.supervision

import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.ActorSystem  
import akka.actor.OneForOneStrategy  
import akka.actor.Props  
import akka.actor.SupervisorStrategy.Escalate  
import akka.actor.SupervisorStrategy.Stop  
import akka.actor.actorRef2Scala

object EscalateExceptionApp extends App {

  val actorSystem = ActorSystem("EscalateExceptionSystem")
  val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor")
  actor ! "create_parent"
}

class EscalateExceptionTopLevelActor extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.")
      Stop //Stop will stop the Actor that threw this Exception and all its children
    }
  }

  def receive = {
    case "create_parent" => {
      log.info("creating parent")
      val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor")
      parent ! "create_child" //Sending message to next level
    }
  }
}

class EscalateExceptionParentActor extends Actor with ActorLogging {

  override def preStart={
    log.info ("Parent Actor started")
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
      Escalate
    }
  }

  def receive = {
    case "create_child" => {
      log.info("creating child")
      val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor")
      child ! "throwSomeException"
    }
  }

  override def postStop = {
    log.info("Stopping parent Actor")
  }
}

class EscalateExceptionChildActor extends akka.actor.Actor with ActorLogging {

  override def preStart={
    log.info ("Child Actor started")
  }

  def receive = {
    case "throwSomeException" => {
      throw new Exception("I'm getting thrown for no reason.")
    }
  }
  override def postStop = {
    log.info("Stopping child Actor")
  }
}

Log

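As you can see in the log,

  1. The child actor throws an exception.
  2. The supervisor (EscalateExceptionParentActor) escalates the exception to its own supervisor (EscalateExceptionTopLevelActor).
  3. The directive from EscalateExceptionTopLevelActor is to Stop the actor. As part of the stop sequence, the child actor stops first.
  4. The parent actor stops next (after its watchers are notified).
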
INFO  m.r.a.s.EscalateExceptionTopLevelActor - creating parent

INFO  m.r.a.s.EscalateExceptionParentActor - Parent Actor started

INFO  m.r.a.s.EscalateExceptionParentActor - creating child

INFO  m.r.a.s.EscalateExceptionChildActor - Child Actor started

INFO  m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor

INFO  m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.

ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason.  
java.lang.Exception: I'm getting thrown for no reason.  
    at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na]
    ...
    ...

INFO  m.r.a.s.EscalateExceptionChildActor - Stopping child Actor

INFO  m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor

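Keep in mind that whatever directive is issued applies only to the immediate child that escalated. For example, if the top level issues a Restart, only the parent is restarted, and everything in its constructor/preStart runs again. If the parent actor created its children in the constructor, they are created again. However, children that the parent created while handling messages stay in the Terminated state.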

TRIVIA

Actually, you can control whether preStart is called at all. We'll see how in the next post. If you are curious, look at the postRestart method in Actor:

def postRestart(reason: Throwable): Unit = {  
  preStart()
}
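For reference, one way to stop preStart from running again on restart, and to keep the children alive, is to override both restart hooks. This mirrors a pattern from the Akka documentation. InitOnceActor and its preStartCalls counter are hypothetical and were added only to make the effect observable. The sketch assumes akka-actor is on the classpath.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, ActorLogging}

object InitOnceActor {
  // Counts preStart calls so the effect of the overrides can be observed.
  val preStartCalls = new AtomicInteger(0)
}

class InitOnceActor extends Actor with ActorLogging {
  override def preStart(): Unit = {
    InitOnceActor.preStartCalls.incrementAndGet()
    log.info("one-time initialization, e.g. creating children")
  }

  // The default preRestart stops all children and then calls postStop();
  // here we keep only the postStop() call, so children survive the restart.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    postStop()

  // The default postRestart calls preStart(); doing nothing here means the
  // initialization above runs once, not after every restart.
  override def postRestart(reason: Throwable): Unit = ()

  def receive: Receive = {
    case "fail" => throw new IllegalStateException("trigger a restart")
    case "ping" => sender() ! "pong"
  }
}
```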

CODE

As always, the code is on github.


This article comes from the WeChat account 「麥芽面包」. Do not reprint without permission.

