Akka - CircuitBreakerはどう動くのか
2016-11-28
QiitaScalaAkkaCircuitBreaker とは
原典にあたるのはMartin Fowler 氏の postになるはず。 リモートへのリクエストが失敗した時にうまいこと障害が伝搬しないようにするためのデザイン。
昨今の Microservice アーキテクチャな文脈だと考えやすい。 特定のサービスがダウンしてしまった時に、 それに依存するサービスが引きずられてダウンしてしまわないようにするもの。 Reactive なシステムを作る上で欠かせない存在になりそう。
CircuitBreaker の状態
監視対象とするサービスの状態に応じて Close, Open, Half-Open の 3 つをとる。 それぞれの状態の意味はおおざっぱに以下。
- Close
- 正常に動作していて、対象サービスにリクエストを送る状態
- Open
- 対象サービスに異常があり、リクエストを送らない状態
- Half-Open
- そろそろ直ったかな、とちょっとリクエストを送ろうとしてみる状態
Close が正常で Open が異常というのは何となく直感に反するが、
CircuitBreaker
の語源が電気回路にあることを考慮すれば自然。
Close 状態でサービスの失敗が重なると Open に移行、一定時間後に Half-Open となり、 サービスに一度リクエストを送信して成功したら Close、失敗したら Open に戻る、という状態遷移。
Akka の CircuitBreaker
Akka も CircuitBreaker を提供している。
Circuit Breaker — Akka Documentation
ちなみに実装はここにある。
この記事ではakka.pattern.CircuitBreaker
を使って CircuitBreaker の動作を追って理解する。
動作を確認するためにサンプルを実装してみる
コードはgithubに置いた。
サービスにあたるものを実装
まず、障害が起こりうるサービスを Actor で用意する。
case class Message(value: String)
case object PanicMessage
case object HeavyMessage
class UnstableActor extends Actor {
override def receive: Receive = {
case Message(value) =>
println(value)
sender ! s"receive: $value"
case PanicMessage =>
// just fail
sys.error("Oops...")
case HeavyMessage =>
// sleep over callTimeout
Thread.sleep(2000)
sender ! s"finish $HeavyMessage"
}
}
Message(value)
は正常処理出来るメッセージPanicMessage
は例外を throw してしまうメッセージHeavyMessage
は時間のかかる処理をするメッセージ
となっている。
CircuitBreaker を用意
implicit val system = ActorSystem(s"AkkaCircuitBreakerPrac")
val circuitBreaker = CircuitBreaker(
system.scheduler,
maxFailures = 2,
callTimeout = 1.seconds,
resetTimeout = 3.seconds)
.onOpen(println(s"OPEN"))
.onClose(println(s"CLOSE"))
.onHalfOpen(println(s"HALF-OPEN"))
状態遷移時のコールバックを登録できるので標準出力に表示するようにしている。
CircuitBreaker.apply
の引数それぞれの意味は以下。
- maxFailures
- Close 状態で
maxFailures
回、サービスの実行に失敗したら Open 状態に移行する
- Close 状態で
- callTimeout
- サービスを実行しようとして
callTimeout
時間経過しても結果が返ってこなければ失敗とみなす
- サービスを実行しようとして
- resetTimeout
- Open 状態で
resetTimeout
時間経過したら Half-Open に移行する
- Open 状態で
CircuitBreaker の動作を見る
まずはActorRef
とかの準備
val actorRef = system.actorOf(Props[UnstableActor])
implicit val dispatcher: ExecutionContext = ExecutionContext.Implicits.global
implicit val timeout = Timeout(5.seconds)
CircuitBreaker#withCircuitBreaker
で監視しつつActorRef
への?
を実行。
初期状態では Close になっているので正常に処理される。
// 状態はClose
circuitBreaker.withCircuitBreaker(actorRef ? Message("1"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("2"))
次に一度Failure
になる処理を実行する。
maxFailure
が 2 なので Close なまま。
// 1回失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? Message("3"))
// Closeのまま
2 回Failure
させて Open にする。
// `maxFailure`を超える2回の失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? HeavyMessage)
// `HeavyMesage`の処理が終わるまで待つ
Thread.sleep(3000)
// Openになっているためメッセージは処理されない
circuitBreaker.withCircuitBreaker(actorRef ? Message("4"))
resetTimeout
以上の時間が経つと Half-Open に移行する。
// `resetTimeout`以上待つ
Thread.sleep(3500)
Half-Open の状態で処理に成功すると Close に戻る。
circuitBreaker.withCircuitBreaker(actorRef ? Message("5"))
// Closeに戻る
// 以下は成功する
circuitBreaker.withCircuitBreaker(actorRef ? Message("6"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("7"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("8"))
感想
CircuitBreaker が持つ Close,Half-Open,Open の 3 つの状態と、 失敗する可能性のある外部サービスへのアクセスなどをマッピングがうまく出来ていれば、 いわゆるレジリエントなシステムになる気がする。
from: https://qiita.com/petitviolet/items/777be810e3c5756a393d