petitviolet blog

    Akka-Persistenceコトハジメ

    2016-08-28

    QiitaScalaAkkaactor

    Akka-Persistenceは stateful な actor の内部状態を永続化することによって、生成時、再開時、migration 時などに状態を復旧することを可能とするもの。

    内部状態に至るまでのイベント(メッセージ)を永続化しておくことで、そのイベントを再生すれば全く同じ状態を再現させることが出来るため、CQRS/ES の文脈で最近はよく出てきている(気がする)。

    build.sbt

    まずは build.sbt に依存を追加する。 バージョンは適宜変更すること。(Release Versions) サンプルでの永続化先としてLevelDB を利用する

    build.sbt
    val akkaVersion = "2.4.9-RC2"
    
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % akkaVersion,
      "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
    
      "org.iq80.leveldb"  % "leveldb"          % "0.7",
      "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
    )
    
    fork := true
    

    Akka-Persistence をとりあえず使う

    基本的にはakka.actor.Actorと同様な使い方が出来て、akka.persistence.PersistentActorを extends していくつかメソッドを実装すれば良い。

    状態とメッセージを定義する

    まず Actor に送るためのメッセージと、Actor の内部状態となるクラス群を用意する。 今回は内部状態としてSeq[Int]を持つこととする。

    // メッセージ
    private sealed trait MyCommand
    private case object MySnapshot extends MyCommand
    private case object MyPrint extends MyCommand
    private case class AppendCommand(data: Int) extends MyCommand
    
    // 内部状態
    private case class MyState(events: Seq[Int] = Seq.empty) {
      def updated(evt: AppendCommand) = copy(evt.data +: events)
      def state = toString
      override def toString: String = events.reverse.mkString(" :: ")
    }
    

    PersistentActor を実装する

    定義した内部状態を保持し、メッセージを受け付ける Actor をPersistentActorを extends しつつ実装する。

    private class ExamplePersistentActor extends PersistentActor {
      // メッセージを永続化する際のID
      override def persistenceId = "example-id"
    
      // 内部状態
      private var state = MyState()
    
      // 状態を復元する際に実行される
      override def receiveRecover: Receive = {
        case command: AppendCommand =>
          // メッセージからの復元
          state = state.updated(command)
        case SnapshotOffer(_, snapshot: MyState) =>
          // Snapshotからの復元
          state = snapshot
      }
    
      // Actorのreceiveにあたるもの。何かしらのcommandに対する処理
      override def receiveCommand: Receive = {
        case command: AppendCommand  =>
          // メッセージを永続化している
          persist(command) { _command =>
            state = state.updated(_command)
          }
        case MySnapshot => saveSnapshot(state)
        case MyPrint    => println(state.state)
      }
    
    }
    

    実装としてはおおよそこれだけで良い。

    実行してみる

    とりあえず Actor を実行するための App を用意する。

    object PersistentActorExample extends App {
    
      val system = ActorSystem("PersistentActorExample")
      val persistentActor = system.actorOf(Props[ExamplePersistentActor], "my-example")
    
      // send messages to target actor
      persistentActor ! AppendCommand(-1)
      persistentActor ! MySnapshot
      persistentActor ! AppendCommand(3)
      persistentActor ! MyPrint
    
      Thread.sleep(1000)
      system.terminate()
    }
    

    これだけで実行すると以下のようにakka/reference.confを見ろ、というエラーメッセージが吐かれて永続化されない。

    conf を追加する

    必要っぽい conf を適当に application.conf に記述する。

    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    
    // 永続化用ファイルの置き場所
    akka.persistence.journal.leveldb.dir = "target/example/journal"
    akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
    
    // build.sbtに`fork := true`を書かないなら必要
    akka.persistence.journal.leveldb.native = off
    

    これで繰り返し実行してみると[info] -1 :: 3 :: -1 :: 3 :: -1 :: 3のように表示されて、永続化に成功していることが分かる。

    Serializer について

    ドキュメントはこちら Serialization — Akka Documentation

    先ほどの App をsbt runしてみると以下の様な warn が出力される。

    Using the default Java serializer for class [net.petitviolet.ex.persistence.practice.AppendCommand] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
    

    デフォルトで使用される Java の serializer はパフォーマンス上おすすめ出来ないとのこと。 とりあえず warn を抑制したければ、メッセージにならって application.conf を修正すればよい。

    akka.actor.warn-about-java-serializer-usage = false
    

    serializer として kryo を使う

    Java の serializer 以外の選択肢として protocol buffer や kryo が候補として挙げられる。(自作も可) protocol buffer は設定が面倒らしいので、kryo を使ってみる。

    まず build.sbt のlibraryDependenciesに kryo な serializer を追加する。

    libraryDependencies ++= Seq(
      ...
      "com.github.romix.akka" %% "akka-kryo-serialization" % "0.4.1",
      ...
    )
    

    次に application.conf に serializer 周りの設定を追加。

    akka {
      actor {
        warn-about-java-serializer-usage = true
    
        serializers {
          // `kryo`という名前で`KryoSerializer`を指定
          kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
        }
    
        serialization-bindings {
          // kryoでserializeする対象となるクラスを指定する
          "net.petitviolet.ex.persistence.practice.AppendCommand" = kryo
          "net.petitviolet.ex.persistence.practice.MyState" = kryo
        }
    
        kryo {
          type = "graph"
          idstrategy = "default"
        }
      }
    }
    

    これで kryo を使った serialize が出来る。 akka.actor.kryoの設定については以下を参照。 Configuration of akka-kryo-serialization

    from: https://qiita.com/petitviolet/items/92826357bd10aace14f5