petitviolet blog

    akka.actor.FSMを永続化出来るPersistentFSM

    2016-08-30

    QiitaScalaAkkaactor

    状態の保持/状態遷移に長けたFSMと、その状態/状態遷移を永続化するAkka-Persistentの組み合わせとしてPersistentFSMがある。

    状態を持つ Actor でイベントを永続化したいケース(EventSourcing など)には強力な武器となる。 ただ、FSMと同様に普通の Actor(PersistentActor)でもbecomeを使えば状態は表現できるので使いどころは難しいかもしれない。

    ドキュメントにもあるように experimental なものなので注意(v2.4.9 時点)

    実装の仕方

    akka.actor.Actorの代わりにakka.persistence.fsm.PersistentFSMを extends してやればよい。

    通常の FSM は型パラメータとして State と Data にあたる型を 2 つ要求したが、PersistentFSMでは型パラメータを 3 つ、State/Data/Event に当たる型を要求する。 また、State はPersistentFSM.FSMStateを継承した型でなければならない。

    trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging {
      ...
    }
    

    State, Data, Event を用意する

    ドキュメントのサンプルにあわせてショッピングカートっぽいのを実装してみる。 サンプルコード全体はGithubにあげてある。

    まず、ショッピングの対象となる商品にあたるデータ型を用意する。

    case class Item(name: String, price: Int)
    

    State

    State は FSM の状態を表現する型にあたり、PersistentFSM.FSMStateを extends しておく。 identifier: Stringを override する必要があるので適当に実装しておく。

    sealed trait ShoppingState extends PersistentFSM.FSMState {
      override def identifier: String = s"my-event: ${getClass.getSimpleName}"
    }
    case object Looking extends ShoppingState
    case object Shopping extends ShoppingState
    case object Purchased extends ShoppingState
    

    Data

    FSM の内部に保持するデータとなる。 今回はSet[Item]とした。

    case class ShoppingData private[ShoppingData] (items: Set[Item]) {
      // 合計price
      def price: Int = items.map(_.price).sum
    }
    object ShoppingData {
      // 初期状態となるもの
      val empty = ShoppingData(Set.empty)
    }
    

    Event

    FSM の状態を変化させるイベントとなる。 状態だけでなく内部データにも影響を及ぼすイベントも用意する。

    sealed trait ShoppingEvent
    // 内部データ(ShoppingData)に影響する
    case class AddItem(item: Item) extends ShoppingEvent
    case class RemoveItem(item: Item) extends ShoppingEvent
    // 状態遷移するだけ
    case object Purchase extends ShoppingEvent
    case object Leave extends ShoppingEvent
    

    PersistentFSM を実装する

    State/Data/Event にあたる型が用意出来たので、ようやくPersistentFSMを実装できる。 実装全体をのせる。

    class Customer(implicit val domainEventClassTag: ClassTag[ShoppingEvent])
      extends PersistentFSM[ShoppingState, ShoppingData, ShoppingEvent] {
    
      // PersistentActor#persistenceIdと同じもの
      override def persistenceId: String = "example-persistence-FSM"
    
      // 状態変化する際に適用されるメソッドで、内部データを変化させる
      // `[stay|goty] applying ???`とした時に呼ばれる
      override def applyEvent(domainEvent: ShoppingEvent, currentData: ShoppingData): ShoppingData = {
        log.info(s"\n***applyEvent: $domainEvent, $currentData, state: $stateName")
        domainEvent match {
          case AddItem(item) =>
            // カートにItemを追加
            currentData.copy(items = currentData.items + item)
          case RemoveItem(item) =>
            // カートからItemを除去
            currentData.copy(items = currentData.items - item)
          case Purchase =>
            // 購入する。カートは空になる。
            println(s"price => ${currentData.price}")
            ShoppingData.empty
          case Leave =>
            // ショッピング終了。
            ShoppingData.empty
        }
      }
    
      // 初期Stateと初期Data
      startWith(Looking, ShoppingData.empty)
    
      // FSMと同様。`when(現在の状態) { イベントハンドラ }`という構造
      when(Looking) {
        case Event(addItem: AddItem, _) =>
          // Looking状態からAddItemでShopping状態に移行
          gotoLogging(Shopping) applying addItem andThen {
            // andThenでapplyEvent後のイベントハンドラを設定できる
            case afterAddItem => log.info(s"after => $afterAddItem")
          }
      }
    
      when(Shopping) {
        case Event(addItem: AddItem, _) =>
          // Shopping状態でAddItemしても状態は変化しないが、AddItemは適用する
          stay applying addItem
    
        case Event(removeItem: RemoveItem, _) =>
          // Shopping状態でRemoveItemしても状態は変化しないが、RemoveItemは適用する
            stay applying removeItem
    
        case Event(Purchase, _) =>
          // Shopping状態でPurchaseするとPurchased状態に移行
          goto(Purchased) applying Purchase
      }
    
      when(Purchased) {
        case Event(Leave, _) =>
          // Purchased状態からLeaveすることでLooking状態に戻る
          goto(Looking) applying Leave
      }
    
      // 状態遷移ハンドラ
      onTransition {
        case Purchased -> Looking =>
          // ショッピング終了時にsnapshotをとっておく
          saveStateSnapshot()
      }
    
    }
    

    domainEventClassTagを以下のように内部で定義しようとすると StackOverFlow が発生したので外から渡す形とした。

      ...
      // StackOverFlowが発生してしまう
      implicit val domainEventClassTag: ClassTag[ShoppingEvent] = classTag[ShoppingEvent]
      ...
    

    実装自体を見ると、akka.actor.FSMがベースになっていて、when,onTransitionで状態遷移とそのハンドラを定義すれば良い。 ただし、FSMとはusingの代わりにapplyingを使用するという点が異なる また、PersistentActorのようにpersistを明示的に呼ばなくてもapplyEventに渡ってくるdomainEventは自動的にpersistされている模様。

    サンプルではinitializeを実行しているが、ver2.4.5 からは内部 API となったため不要。

    andThen の注意点

    カート内のItemが 1 つの時にRemoveItemされたらLookingに戻したい。

    イベント適用後のイベントハンドラを設定するためのandThenという API があるためそれが使えそうなので実装してみる。

    case Event(removeItem: RemoveItem, _) =>
      stay applying removeItem andThen {
        // currentDataに対するイベントハンドラ(`afterTransitionDo`)を設定
        case ShoppingData.empty => goto(Looking)
      }
    

    しかし、実際にはandThen内のgotostayでは状態遷移させることが出来なかった...。 解決するにはwhenに対するハンドラとしてそれを実装しなければならない。

    when(Shopping) {
      ...
      case Event(removeItem: RemoveItem, ShoppingData.empty) =>
        // currentDataがemptyなら許可されていない処理
        throw new IllegalStateException("ShoppingCart is Empty")
    
      case Event(removeItem @ RemoveItem(item), ShoppingData(items)) if items.size == 1 && (items contains item) =>
        // 1つしかないitemがremoveされる場合
        gotoLogging(Looking) applying removeItem
    
      case Event(removeItem: RemoveItem, _) =>
        // 通常の動作
        stayLogging applying removeItem
      ...
    }
    

    何のためのandThenなのか...と言いたくなるが、 恐らく状態遷移を促すようなドメインイベントをメッセージとして受け取らずに内部的に状態遷移すると、永続化してあるメッセージを replay しても状態の復元が出来なくなってしまうからではないかと思われる。

    今回のようなケースではwhenのハンドラで事前に検査するしかない(はず)。 ShoppingからLookingに戻すようなイベントを定義してself!(tell)するのも考えられるが、メッセージが直後に処理されるとは限らないため上手くいかないケースも多い。

    Recovery

    Akka-Persistence で大事な状態の復元。 PersistentFSMの場合、Actor 起動時に Snapshot と journal から自動で状態を recovery してくれる。 普通のPersistentActorと同様に、最新の Snapshot を適用した後に、その Snapshot 採取後のイベントを journal から読みだしてそれも適用してくれる。 recovery が完了したらコールバックとしてonRecoveryCompletedが実行される。

    from: https://qiita.com/petitviolet/items/604fe55dc85a4ead5c0c