Akka-PersistenceのserializerとしてKryoを使う
2017-01-23
QiitaScalaAkkaactorとりあえず使えるようにする
Akka-Persistence で Kryo をとりあえず使えるようにする設定については以前書いた。 Akka-Persistence コトハジメ#serializer として kryo を使う
serializer 選択について
Akka-Persistence のPersistentActor#persist
で永続化するメッセージは serialize してから永続化している。
その際に使用できる Serializer を公式が提示している。
External Akka Serializers
以下のページでも serializer 選択は大事だと強調しており、こちらでは Google Protobuf/Apache Thrift/Apache Avro が提示されている。 Picking the right serialization format
Protobuf は何となく使うのがめんどくさそうなイメージがあったので楽そうな Kryo を使う。
他にも JSON フォーマットな Serializer 実装としてscalapenos/staminaがある。
serializer のカスタマイズ方法
serializer のカスタマイズ方法は 2 パターンありそうだった。
- akka のやり方に自分で従うパターン
- romix/akka-kryo-serializationをがっつり使うパターン
- kryo 以外についてはここでは触れない
それぞれやってみる。
serialize(persist)する型
以下のサンプルで使う型。
Todo アプリにおいて新しくTask
を追加するRegister
メッセージを Akka-Persistence でpersist
する対象とする。
sealed trait CommandEvent
case class Register(task: Task) extends CommandEvent
Task
の定義はこんな感じでid
とtitle
とstate
を持つ。
若干 DDD っぽくそれぞれにちゃんと型を用意した。
case class Task(id: TaskId, title: TaskTitle, state: TaskState = TaskState.Todo)
case class TaskId(value: String) extends AnyVal
case class TaskTitle(value: String) extends AnyVal
// enumlike
sealed abstract class TaskState(val value: Int) extends Serializable
object TaskState {
case object Completed extends TaskState(1)
case object Todo extends TaskState(0)
private val values = Completed :: Todo :: Nil
// [[TaskState]] factory
val from: Int => TaskState = n => values.find(_.value == n).get
}
akka のやり方に従って独自 serializer を実装する
Customizationを参照。
akka.serialization.Serializer
を実装してやれば良い。
twitter/chill を使う
Kryo を直接使うのではなぜかうまくいかなかったので、twitter/chillを利用する。 参考:takezoux2 のブログ: chill を使ってクラス定義変更に強いシリアライズを実現
build.sbt に依存を追加。
libraryDependencies += "com.twitter" %% "chill-bijection" % "0.8.0"
twitter/chill を使ってシンプルにRegister
のSerializer
を実装する。
class RegisterKryoSerializer extends akka.serialization.Serializer {
private val CLAZZ = classOf[Register]
override def identifier: Int = 1000
override def includeManifest: Boolean = true
override def toBinary(o: AnyRef): Array[Byte] = {
// これではうまくいかなかった
// val kryo = new Kryo()
// val baos = new ByteArrayOutputStream()
// kryo.writeObject(new Output(baos), o)
// baos.toByteArray
KryoInjection.apply(o)
}
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
manifest match {
case Some(CLAZZ) =>
// これではうまくいかなかった
// val kryo = new Kryo()
// val input = new Input(new ByteArrayInputStream(bytes))
// kryo.readObject(input, CLAZZ)
KryoInjection.invert(bytes).get
case _ => sys.error(s"unknown manifest: $manifest")
}
}
}
このようにKryoInjection#apply
とKryoInjection#invert
を使ったらSerializer
はお手軽に実装できる。
しかしこれではKryoInjection
に serialize/deserialize を任せっきりになってしまい、そもそもやりたかったカスタマイズが出来ない。
なので、次のセクションでうまくいったもう一つの方法について書く。
コメントにも書いたように自分なりにArray[Byte]
とRegister
の serialize/deserialize を実装することがうまく出来なかった。
その際の StackTrace には以下のようなものが出力される。
[ERROR] swallowing exception during message send
com.esotericsoftware.kryo.KryoException: Buffer underflow.
at com.esotericsoftware.kryo.io.Input.require(Input.java:199)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:470)
...
これが解決出来たら教えて下さい...。
romix/akka-kryo-serialization を使う
こちらの場合はcom.esotericsoftware.kryo.Serializer
を実装すればよい。
// (`accesptsNull` = false, `immutable` = true)
abstract class KryoSerializerBase[T] extends com.esotericsoftware.kryo.Serializer[T](false, true)
class RegisterKryoSerializer extends KryoSerializerBase[Register] {
override def write(kryo: Kryo, output: Output, `object`: Register): Unit = {
output.writeString(`object`.task.id.value)
output.writeString(`object`.task.name.value)
output.writeInt(`object`.task.state.value)
}
override def read(kryo: Kryo, input: Input, `type`: Class[Register]): Register = {
val taskId = TaskId(input.readString)
val taskTitle = TaskTitle(input.readString)
val taskState = TaskState.from(input.readInt)
Register(Task(taskId, taskTitle, taskState))
}
}
ここで実装したRegisterKryoSerializer
を使うためには以下の様に実行できればいいが、
kryo
インスタンスをどうやって手に入れるかが難しい。
kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
ここにあるように Kryo の initializer を独自に実装して初期化処理に追加すればよい。 How to create a custom initializer for Kryo
initializer を用意する
extends する trait などはなく、customize(kryo: Kryo)
なメソッドを持つ class を実装するだけ。
リフレクションを使って実現されていてこの辺りにその実装がある。
kryo
インスタンスのaddDefaultSerializer
を実行するだけの class を用意した。
class CustomKryoSerializerInitializer {
def customize(kryo: Kryo) = {
kryo.setDefaultSerializer(classOf[CompatibleFieldSerializer[Any]])
kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
println(s"after - ${kryo.getSerializer(classOf[Register])}")
}
}
initializer を conf に追加する
application.conf に追加する。 該当箇所はこんな感じ。
extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
akka {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
warn-about-java-serializer-usage = false
serialization-bindings {
"java.io.Serializable" = java
"net.petitviolet.ex.persistence.task.actor.Register" = kryo
}
kryo {
type = "graph"
idstrategy = "default"
kryo-trace = true
// ここ!
kryo-custom-serializer-init = "net.petitviolet.ex.persistence.task.model.support.CustomKryoSerializerInitializer"
}
}
}
これでアプリケーションを起動すると以下のようなログが出力される。
[info] 00:00 TRACE: [kryo] Registration required: false [info] 00:00 TRACE: [kryo] Register class name: net.petitviolet.ex.persistence.task.actor.Register (net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer) [info] after - net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer@550c6251
kryo-custom-serializer-init
に用意した initializer を設定してやればcustomize
が実行されて、Register
に対する serializer を登録する事が出来た。
感想
Akka-Persistence を使うにあたって恐らく重要な意思決定をしなければならない Serializer の選択。
プロダクト開発においてはpersist
の対象となるイベントもリファクタリングしていくことが想定されるため、
先まで見据えて Serializer は導入段階から精査してきっちり実装していくことが必要になりそう。
ちなみにイベントのリファクタリングについての Akka 公式な記事。 Persistence - Schema Evolution 本当はここまでやりたかったが、Akka-Persistence の Serializer として Kryo は向いてないんじゃないかというところで挫折。 イベントのバージョニングに対応した Kryo な Serializer 実装はあるんだろうか...?
from: https://qiita.com/petitviolet/items/9cfe7e30b9108620eae4