blog.petitviolet.net

Akka - CircuitBreakerはどう動くのか

2016-11-28

QiitaScalaAkka

CircuitBreaker とは

原典にあたるのはMartin Fowler 氏の postになるはず。 リモートへのリクエストが失敗した時にうまいこと障害が伝搬しないようにするためのデザイン。

昨今の Microservice アーキテクチャな文脈だと考えやすい。 特定のサービスがダウンしてしまった時に、 それに依存するサービスが引きずられてダウンしてしまわないようにするもの。 Reactive なシステムを作る上で欠かせない存在になりそう。

CircuitBreaker の状態

監視対象とするサービスの状態に応じて Close, Open, Half-Open の 3 つをとる。 それぞれの状態の意味はおおざっぱに以下。

  • Close

    • 正常に動作していて、対象サービスにリクエストを送る状態
  • Open

    • 対象サービスに異常があり、リクエストを送らない状態
  • Half-Open

    • そろそろ直ったかな、とちょっとリクエストを送ろうとしてみる状態

Close が正常で Open が異常というのは何となく直感に反するが、 CircuitBreakerの語源が電気回路にあることを考慮すれば自然。

Close 状態でサービスの失敗が重なると Open に移行、一定時間後に Half-Open となり、 サービスに一度リクエストを送信して成功したら Close、失敗したら Open に戻る、という状態遷移。

Akka の CircuitBreaker

Akka も CircuitBreaker を提供している。 Circuit Breaker — Akka Documentation ちなみに実装はここにある。 この記事ではakka.pattern.CircuitBreakerを使って CircuitBreaker の動作を追って理解する。

動作を確認するためにサンプルを実装してみる

コードはgithubに置いた。

サービスにあたるものを実装

まず、障害が起こりうるサービスを Actor で用意する。

case class Message(value: String)
case object PanicMessage
case object HeavyMessage

class UnstableActor extends Actor {
  override def receive: Receive = {
    case Message(value) =>
      println(value)
      sender ! s"receive: $value"
    case PanicMessage =>
      // just fail
      sys.error("Oops...")
    case HeavyMessage =>
      // sleep over callTimeout
      Thread.sleep(2000)
      sender ! s"finish $HeavyMessage"
  }
}
  • Message(value)は正常処理出来るメッセージ
  • PanicMessageは例外を throw してしまうメッセージ
  • HeavyMessageは時間のかかる処理をするメッセージ

となっている。

CircuitBreaker を用意

implicit val system = ActorSystem(s"AkkaCircuitBreakerPrac")
val circuitBreaker = CircuitBreaker(
  system.scheduler,
  maxFailures = 2,
  callTimeout =  1.seconds,
  resetTimeout =  3.seconds)
  .onOpen(println(s"OPEN"))
  .onClose(println(s"CLOSE"))
  .onHalfOpen(println(s"HALF-OPEN"))

状態遷移時のコールバックを登録できるので標準出力に表示するようにしている。 CircuitBreaker.applyの引数それぞれの意味は以下。

  • maxFailures

    • Close 状態でmaxFailures回、サービスの実行に失敗したら Open 状態に移行する
  • callTimeout

    • サービスを実行しようとしてcallTimeout時間経過しても結果が返ってこなければ失敗とみなす
  • resetTimeout

    • Open 状態でresetTimeout時間経過したら Half-Open に移行する

CircuitBreaker の動作を見る

まずはActorRefとかの準備

val actorRef = system.actorOf(Props[UnstableActor])
implicit val dispatcher: ExecutionContext = ExecutionContext.Implicits.global
implicit val timeout = Timeout(5.seconds)

CircuitBreaker#withCircuitBreakerで監視しつつActorRefへの?を実行。 初期状態では Close になっているので正常に処理される。

// 状態はClose
circuitBreaker.withCircuitBreaker(actorRef ? Message("1"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("2"))

次に一度Failureになる処理を実行する。 maxFailureが 2 なので Close なまま。

// 1回失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? Message("3"))
// Closeのまま

2 回Failureさせて Open にする。

// `maxFailure`を超える2回の失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? HeavyMessage)
// `HeavyMesage`の処理が終わるまで待つ
Thread.sleep(3000)

// Openになっているためメッセージは処理されない
circuitBreaker.withCircuitBreaker(actorRef ? Message("4"))

resetTimeout以上の時間が経つと Half-Open に移行する。

// `resetTimeout`以上待つ
Thread.sleep(3500)

Half-Open の状態で処理に成功すると Close に戻る。

circuitBreaker.withCircuitBreaker(actorRef ? Message("5"))
// Closeに戻る

// 以下は成功する
circuitBreaker.withCircuitBreaker(actorRef ? Message("6"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("7"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("8"))

感想

CircuitBreaker が持つ Close,Half-Open,Open の 3 つの状態と、 失敗する可能性のある外部サービスへのアクセスなどをマッピングがうまく出来ていれば、 いわゆるレジリエントなシステムになる気がする。

from: https://qiita.com/petitviolet/items/777be810e3c5756a393d