Akka-Persistenceコトハジメ
2016-08-28
QiitaScalaAkkaactorAkka-Persistenceは stateful な actor の内部状態を永続化することによって、生成時、再開時、migration 時などに状態を復旧することを可能とするもの。
内部状態に至るまでのイベント(メッセージ)を永続化しておくことで、そのイベントを再生すれば全く同じ状態を再現させることが出来るため、CQRS/ES の文脈で最近はよく出てきている(気がする)。
build.sbt
まずは build.sbt に依存を追加する。 バージョンは適宜変更すること。(Release Versions) サンプルでの永続化先としてLevelDB を利用する。
val akkaVersion = "2.4.9-RC2"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"org.iq80.leveldb" % "leveldb" % "0.7",
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
)
fork := true
Akka-Persistence をとりあえず使う
基本的にはakka.actor.Actor
と同様な使い方が出来て、akka.persistence.PersistentActor
を extends していくつかメソッドを実装すれば良い。
状態とメッセージを定義する
まず Actor に送るためのメッセージと、Actor の内部状態となるクラス群を用意する。
今回は内部状態としてSeq[Int]
を持つこととする。
// メッセージ
private sealed trait MyCommand
private case object MySnapshot extends MyCommand
private case object MyPrint extends MyCommand
private case class AppendCommand(data: Int) extends MyCommand
// 内部状態
private case class MyState(events: Seq[Int] = Seq.empty) {
def updated(evt: AppendCommand) = copy(evt.data +: events)
def state = toString
override def toString: String = events.reverse.mkString(" :: ")
}
PersistentActor を実装する
定義した内部状態を保持し、メッセージを受け付ける Actor をPersistentActor
を extends しつつ実装する。
private class ExamplePersistentActor extends PersistentActor {
// メッセージを永続化する際のID
override def persistenceId = "example-id"
// 内部状態
private var state = MyState()
// 状態を復元する際に実行される
override def receiveRecover: Receive = {
case command: AppendCommand =>
// メッセージからの復元
state = state.updated(command)
case SnapshotOffer(_, snapshot: MyState) =>
// Snapshotからの復元
state = snapshot
}
// Actorのreceiveにあたるもの。何かしらのcommandに対する処理
override def receiveCommand: Receive = {
case command: AppendCommand =>
// メッセージを永続化している
persist(command) { _command =>
state = state.updated(_command)
}
case MySnapshot => saveSnapshot(state)
case MyPrint => println(state.state)
}
}
実装としてはおおよそこれだけで良い。
実行してみる
とりあえず Actor を実行するための App を用意する。
object PersistentActorExample extends App {
val system = ActorSystem("PersistentActorExample")
val persistentActor = system.actorOf(Props[ExamplePersistentActor], "my-example")
// send messages to target actor
persistentActor ! AppendCommand(-1)
persistentActor ! MySnapshot
persistentActor ! AppendCommand(3)
persistentActor ! MyPrint
Thread.sleep(1000)
system.terminate()
}
これだけで実行すると以下のようにakka/reference.confを見ろ、というエラーメッセージが吐かれて永続化されない。
conf を追加する
必要っぽい conf を適当に application.conf に記述する。
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
// 永続化用ファイルの置き場所
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
// build.sbtに`fork := true`を書かないなら必要
akka.persistence.journal.leveldb.native = off
これで繰り返し実行してみると[info] -1 :: 3 :: -1 :: 3 :: -1 :: 3
のように表示されて、永続化に成功していることが分かる。
Serializer について
ドキュメントはこちら Serialization — Akka Documentation
先ほどの App をsbt run
してみると以下の様な warn が出力される。
Using the default Java serializer for class [net.petitviolet.ex.persistence.practice.AppendCommand] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
デフォルトで使用される Java の serializer はパフォーマンス上おすすめ出来ないとのこと。 とりあえず warn を抑制したければ、メッセージにならって application.conf を修正すればよい。
akka.actor.warn-about-java-serializer-usage = false
serializer として kryo を使う
Java の serializer 以外の選択肢として protocol buffer や kryo が候補として挙げられる。(自作も可) protocol buffer は設定が面倒らしいので、kryo を使ってみる。
まず build.sbt のlibraryDependencies
に kryo な serializer を追加する。
libraryDependencies ++= Seq(
...
"com.github.romix.akka" %% "akka-kryo-serialization" % "0.4.1",
...
)
次に application.conf に serializer 周りの設定を追加。
akka {
actor {
warn-about-java-serializer-usage = true
serializers {
// `kryo`という名前で`KryoSerializer`を指定
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
// kryoでserializeする対象となるクラスを指定する
"net.petitviolet.ex.persistence.practice.AppendCommand" = kryo
"net.petitviolet.ex.persistence.practice.MyState" = kryo
}
kryo {
type = "graph"
idstrategy = "default"
}
}
}
これで kryo を使った serialize が出来る。
akka.actor.kryo
の設定については以下を参照。
Configuration of akka-kryo-serialization
from: https://qiita.com/petitviolet/items/92826357bd10aace14f5