ActorでCircuitBreakerを実装してみた話
2016-12-05
QiitaScalaAkkaactor実装してみる理由
ReactiveSystem や Microservices 等の文脈で登場する CircuitBreaker について以前調べた。
CircuitBreaker は内部にClose, Open, Half-Openの 3 つの状態を持ち、さらに非同期処理の実行とその監視という 2 つの責務を持つ。
状態の管理と責務の分割には Actor が適しているんじゃないかと思ったのがきっかけ。
リポジトリはここ。
petitviolet/supervisor
Actor 同士の supervise している状態に似ている(と感じた)のでsupervisorという名前にしてある。
気分が乗ったので MavenCentral にも公開している。
使い方
まずは CircuitBreaker にあたるものを作成する。
Supervisorというakka.actor.Actorがあり、それを普通にActorSystem#actorOfでActorRefを作成する。
// 事前準備
implicit val system = ActorSystem(s"SupervisorPrac")
implicit val dispatcher: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool(1))
// `Supervisor`なactorRefを作成
import scala.concurrent.duration._
val supervisorActor = system.actorOf(Supervisor.props(
maxFailCount = 2,
runTimeout = 1000.milliseconds,
resetWait = 3000.milliseconds
))
外部サービス呼び出しを模倣するscala.util.Futureを CircuitBreaker で監視しつつ実行して結果を取り出す。
// 何かしらの非同期処理
val future = Future.apply { val i = Random.nextInt(2000); Thread.sleep(i); i }
// `Execute`にいれて`supervisorActor`に送りつける
supervisorActor ? Execute(future) onComplete {
case Success(x) => println(s"success => $x")
case Failure(t) => println(s"fail => $t")
}
実行してみると
- 成功時は
success => 348 - 失敗時は
fail => java.util.concurrent.TimeoutException: Futures timed out after [1000 milliseconds]
といった感じのログが出力される。
akka.pattern.CircuitBreaker#withCircuitBreakerと同じようにscala.util.Futureを監視対象として、
専用のメッセージ(Execute)に入れてSupervisorなActorRefに対して送ると CircuitBreaker らしく動く。
どうやって作るか
大きく、状態の管理とFutureの監視に分けられる。
状態の管理
Actorが持つbecomeを使う。
akka.actor.FSMを使うのも良さそうだが、まずは普通のActorでやってみる。
実装の全体はこのへん。
抜き出すには量が多いのでリンク先を参照してもらいたい。
基本的には状態としてCloseとOpenにあたるreceiveがあれば良くて、ざっくりと以下の 2 つを用意すれば良い。
- 受け取った
Futureから値を取り出すreceive - 例外を投げ続ける
receive
Futureの失敗回数を数えておき、Supervisorの初期化時に設定したmaxFailCountやrunTimeoutに応じて状態を変更する。
OpenからHalf-Openへの遷移にはakka.actor.Schedulerが使える。
Schedulerで指定されたresetWait時間後に自分自身にHalf-Openへ戻るためのメッセージを送信するように設定しておけば、処理したタイミングでHalf-Openになれる。
Half-OpenはClose状態と同じreceiveでまかなえるが、次にFutureが失敗したら即座にOpenに戻るように仕込んでおく。
無駄な処理をさせない {#lazy-evaluation}
Supervisorの状態がOpenの時に外部サービス呼び出しなどを行っても無駄になってしまう。
そこで、Executeメッセージとして渡されるFutureを実行してしまわないように遅延評価する。
コンストラクタは call-by-name な=> Future[T]にしておいて内部的には() => Future[T]な関数として扱うようにした。
object 側でapplyとunapplyを用意して便利に扱えるようにしておく。実装
Future の監視
FutureがrunTimeoutで指定した時間内に成功するかどうかはAwait.resultでブロックしてFutureから値を取り出す。
そのブロックするのはSupervisor自身ではなくて、その子アクターに委譲している。
子アクターのreceiveを抜粋するとこんな感じ。
override def receive: Actor.Receive = {
case Run =>
log.debug(s"ExecutorActor: $message")
Try { Await.result(message.run.apply, timeout) } match {
case Success(result) =>
respondSuccessToParent(originalSender, result)
case Failure(t) =>
message match {
case ExecuteWithFallback(_, fallback) =>
respondSuccessToParent(originalSender, fallback.apply)
case _ =>
respondFailureToParent(originalSender, t)
}
}
成功と失敗を区別しつつ親アクターとなるSupervisorにメッセージを送り返す。
失敗していたらSupervisor側でその回数を記録しておき、状態遷移判定を行えば良い。
テストを書く
受け取ったメッセージによって状態が変わり、タイムアウトなど時間経過を含むActorのテストを書くには Testkit が非常に便利だった。
Testing Actor Systems — Akka Documentation
実際に使ってSupervisorのテストを書いた。
テストしたい対象をTestActorRefで wrap すれば自由に内部状態にアクセス出来るようになるため、以下のように書ける。
val supervisor = TestActorRef.apply[Supervisor[_]](Supervisor.props(maxFailCount, runTimeout, resetWait))
supervisor.underlyingActor.becomeOpen()
supervisor.underlyingActor.state shouldBe Open
普段は Actor そのものではなくてActorRefとなることで内部状態がカプセル化されているが、
Testkit のおかげでテストは非常に楽に書けた。
問題点
Supervisorが出来ることはFutureを監視するだけなのでシンプルだが、使い方には注意点がある。
上で説明したように、監視対象とするためのExecuteのコンストラクタ(apply)は call-by-name なパラメータとしてあるが、
Execute#applyの前にFuture.applyを呼んでいると外部サービス呼び出し等の非同期処理が走り出してしまう。
従って、Supervisorの状態がOpenなら完全に無駄になってしまう処理を実行しようとすることになる。
つまり、これと
val future = Future.apply { ??? }
supervisor ? Execute(future)
これが違う。
supervisor ? Execute(Future.apply { ??? })
上のように使用すると無駄な処理が走るが、下なら何も起きない。 テストとして書くとこんな感じ?
解決策は、Futureそのものを拡張してOpenなら実行しないようにしなければならない。
Akka の CircuitBreaker も同じような問題点がある(はず)。
そうなると Hystrix のように Command という形で外部サービス呼び出しと監視をまとめて実装する方が安全になりそう。
from: https://qiita.com/petitviolet/items/79c63859036e872dc0af