akka.actor.FSMを永続化出来るPersistentFSM
2016-08-30
QiitaScalaAkkaactor状態の保持/状態遷移に長けたFSMと、その状態/状態遷移を永続化するAkka-Persistentの組み合わせとしてPersistentFSMがある。
状態を持つ Actor でイベントを永続化したいケース(EventSourcing など)には強力な武器となる。
ただ、FSM
と同様に普通の Actor(PersistentActor)でもbecome
を使えば状態は表現できるので使いどころは難しいかもしれない。
ドキュメントにもあるように experimental なものなので注意(v2.4.9 時点)
実装の仕方
akka.actor.Actor
の代わりにakka.persistence.fsm.PersistentFSM
を extends してやればよい。
通常の FSM は型パラメータとして State と Data にあたる型を 2 つ要求したが、PersistentFSM
では型パラメータを 3 つ、State/Data/Event に当たる型を要求する。
また、State はPersistentFSM.FSMState
を継承した型でなければならない。
trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging {
...
}
State, Data, Event を用意する
ドキュメントのサンプルにあわせてショッピングカートっぽいのを実装してみる。 サンプルコード全体はGithubにあげてある。
まず、ショッピングの対象となる商品にあたるデータ型を用意する。
case class Item(name: String, price: Int)
State
State は FSM の状態を表現する型にあたり、PersistentFSM.FSMState
を extends しておく。
identifier: String
を override する必要があるので適当に実装しておく。
sealed trait ShoppingState extends PersistentFSM.FSMState {
override def identifier: String = s"my-event: ${getClass.getSimpleName}"
}
case object Looking extends ShoppingState
case object Shopping extends ShoppingState
case object Purchased extends ShoppingState
Data
FSM の内部に保持するデータとなる。
今回はSet[Item]
とした。
case class ShoppingData private[ShoppingData] (items: Set[Item]) {
// 合計price
def price: Int = items.map(_.price).sum
}
object ShoppingData {
// 初期状態となるもの
val empty = ShoppingData(Set.empty)
}
Event
FSM の状態を変化させるイベントとなる。 状態だけでなく内部データにも影響を及ぼすイベントも用意する。
sealed trait ShoppingEvent
// 内部データ(ShoppingData)に影響する
case class AddItem(item: Item) extends ShoppingEvent
case class RemoveItem(item: Item) extends ShoppingEvent
// 状態遷移するだけ
case object Purchase extends ShoppingEvent
case object Leave extends ShoppingEvent
PersistentFSM を実装する
State/Data/Event にあたる型が用意出来たので、ようやくPersistentFSM
を実装できる。
実装全体をのせる。
class Customer(implicit val domainEventClassTag: ClassTag[ShoppingEvent])
extends PersistentFSM[ShoppingState, ShoppingData, ShoppingEvent] {
// PersistentActor#persistenceIdと同じもの
override def persistenceId: String = "example-persistence-FSM"
// 状態変化する際に適用されるメソッドで、内部データを変化させる
// `[stay|goty] applying ???`とした時に呼ばれる
override def applyEvent(domainEvent: ShoppingEvent, currentData: ShoppingData): ShoppingData = {
log.info(s"\n***applyEvent: $domainEvent, $currentData, state: $stateName")
domainEvent match {
case AddItem(item) =>
// カートにItemを追加
currentData.copy(items = currentData.items + item)
case RemoveItem(item) =>
// カートからItemを除去
currentData.copy(items = currentData.items - item)
case Purchase =>
// 購入する。カートは空になる。
println(s"price => ${currentData.price}")
ShoppingData.empty
case Leave =>
// ショッピング終了。
ShoppingData.empty
}
}
// 初期Stateと初期Data
startWith(Looking, ShoppingData.empty)
// FSMと同様。`when(現在の状態) { イベントハンドラ }`という構造
when(Looking) {
case Event(addItem: AddItem, _) =>
// Looking状態からAddItemでShopping状態に移行
gotoLogging(Shopping) applying addItem andThen {
// andThenでapplyEvent後のイベントハンドラを設定できる
case afterAddItem => log.info(s"after => $afterAddItem")
}
}
when(Shopping) {
case Event(addItem: AddItem, _) =>
// Shopping状態でAddItemしても状態は変化しないが、AddItemは適用する
stay applying addItem
case Event(removeItem: RemoveItem, _) =>
// Shopping状態でRemoveItemしても状態は変化しないが、RemoveItemは適用する
stay applying removeItem
case Event(Purchase, _) =>
// Shopping状態でPurchaseするとPurchased状態に移行
goto(Purchased) applying Purchase
}
when(Purchased) {
case Event(Leave, _) =>
// Purchased状態からLeaveすることでLooking状態に戻る
goto(Looking) applying Leave
}
// 状態遷移ハンドラ
onTransition {
case Purchased -> Looking =>
// ショッピング終了時にsnapshotをとっておく
saveStateSnapshot()
}
}
domainEventClassTag
を以下のように内部で定義しようとすると StackOverFlow が発生したので外から渡す形とした。
...
// StackOverFlowが発生してしまう
implicit val domainEventClassTag: ClassTag[ShoppingEvent] = classTag[ShoppingEvent]
...
実装自体を見ると、akka.actor.FSM
がベースになっていて、when
,onTransition
で状態遷移とそのハンドラを定義すれば良い。
ただし、FSM
とはusing
の代わりにapplying
を使用するという点が異なる
また、PersistentActor
のようにpersist
を明示的に呼ばなくてもapplyEvent
に渡ってくるdomainEvent
は自動的にpersist
されている模様。
サンプルではinitialize
を実行しているが、ver2.4.5 からは内部 API となったため不要。
andThen の注意点
カート内のItem
が 1 つの時にRemoveItem
されたらLooking
に戻したい。
イベント適用後のイベントハンドラを設定するためのandThen
という API があるためそれが使えそうなので実装してみる。
case Event(removeItem: RemoveItem, _) =>
stay applying removeItem andThen {
// currentDataに対するイベントハンドラ(`afterTransitionDo`)を設定
case ShoppingData.empty => goto(Looking)
}
しかし、実際にはandThen
内のgoto
やstay
では状態遷移させることが出来なかった...。
解決するにはwhen
に対するハンドラとしてそれを実装しなければならない。
when(Shopping) {
...
case Event(removeItem: RemoveItem, ShoppingData.empty) =>
// currentDataがemptyなら許可されていない処理
throw new IllegalStateException("ShoppingCart is Empty")
case Event(removeItem @ RemoveItem(item), ShoppingData(items)) if items.size == 1 && (items contains item) =>
// 1つしかないitemがremoveされる場合
gotoLogging(Looking) applying removeItem
case Event(removeItem: RemoveItem, _) =>
// 通常の動作
stayLogging applying removeItem
...
}
何のためのandThen
なのか...と言いたくなるが、
恐らく状態遷移を促すようなドメインイベントをメッセージとして受け取らずに内部的に状態遷移すると、永続化してあるメッセージを replay しても状態の復元が出来なくなってしまうからではないかと思われる。
今回のようなケースではwhen
のハンドラで事前に検査するしかない(はず)。
Shopping
からLooking
に戻すようなイベントを定義してself
に!
(tell
)するのも考えられるが、メッセージが直後に処理されるとは限らないため上手くいかないケースも多い。
Recovery
Akka-Persistence で大事な状態の復元。
PersistentFSM
の場合、Actor 起動時に Snapshot と journal から自動で状態を recovery してくれる。
普通のPersistentActor
と同様に、最新の Snapshot を適用した後に、その Snapshot 採取後のイベントを journal から読みだしてそれも適用してくれる。
recovery が完了したらコールバックとしてonRecoveryCompleted
が実行される。
from: https://qiita.com/petitviolet/items/604fe55dc85a4ead5c0c