blog.petitviolet.net

Akka-PersistenceのserializerとしてKryoを使う

2017-01-23

QiitaScalaAkkaactor

とりあえず使えるようにする

Akka-Persistence で Kryo をとりあえず使えるようにする設定については以前書いた。 Akka-Persistence コトハジメ#serializer として kryo を使う

serializer 選択について

Akka-Persistence のPersistentActor#persistで永続化するメッセージは serialize してから永続化している。 その際に使用できる Serializer を公式が提示している。 External Akka Serializers

以下のページでも serializer 選択は大事だと強調しており、こちらでは Google Protobuf/Apache Thrift/Apache Avro が提示されている。 Picking the right serialization format

Protobuf は何となく使うのがめんどくさそうなイメージがあったので楽そうな Kryo を使う。

他にも JSON フォーマットな Serializer 実装としてscalapenos/staminaがある。

serializer のカスタマイズ方法

serializer のカスタマイズ方法は 2 パターンありそうだった。

  • akka のやり方に自分で従うパターン
  • romix/akka-kryo-serializationをがっつり使うパターン

    • kryo 以外についてはここでは触れない

それぞれやってみる。

serialize(persist)する型

以下のサンプルで使う型。 Todo アプリにおいて新しくTaskを追加するRegisterメッセージを Akka-Persistence でpersistする対象とする。

sealed trait CommandEvent
case class Register(task: Task) extends CommandEvent

Taskの定義はこんな感じでidtitlestateを持つ。 若干 DDD っぽくそれぞれにちゃんと型を用意した。

case class Task(id: TaskId, title: TaskTitle, state: TaskState = TaskState.Todo)

case class TaskId(value: String) extends AnyVal
case class TaskTitle(value: String) extends AnyVal

// enumlike
sealed abstract class TaskState(val value: Int) extends Serializable
object TaskState {
  case object Completed extends TaskState(1)
  case object Todo extends TaskState(0)

  private val values = Completed :: Todo :: Nil
  // [[TaskState]] factory
  val from: Int => TaskState = n => values.find(_.value == n).get
}

akka のやり方に従って独自 serializer を実装する

Customizationを参照。 akka.serialization.Serializerを実装してやれば良い。

twitter/chill を使う

Kryo を直接使うのではなぜかうまくいかなかったので、twitter/chillを利用する。 参考:takezoux2 のブログ: chill を使ってクラス定義変更に強いシリアライズを実現

build.sbt に依存を追加。

libraryDependencies += "com.twitter" %% "chill-bijection" % "0.8.0"

twitter/chill を使ってシンプルにRegisterSerializerを実装する。

class RegisterKryoSerializer extends akka.serialization.Serializer {
  private val CLAZZ = classOf[Register]

  override def identifier: Int = 1000

  override def includeManifest: Boolean = true

  override def toBinary(o: AnyRef): Array[Byte] = {
    // これではうまくいかなかった
    // val kryo = new Kryo()
    // val baos = new ByteArrayOutputStream()
    // kryo.writeObject(new Output(baos), o)
    // baos.toByteArray

    KryoInjection.apply(o)
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    manifest match {
      case Some(CLAZZ) =>
        // これではうまくいかなかった
        // val kryo = new Kryo()
        // val input = new Input(new ByteArrayInputStream(bytes))
        // kryo.readObject(input, CLAZZ)
        KryoInjection.invert(bytes).get
      case _ => sys.error(s"unknown manifest: $manifest")
    }
  }
}

このようにKryoInjection#applyKryoInjection#invertを使ったらSerializerはお手軽に実装できる。 しかしこれではKryoInjectionに serialize/deserialize を任せっきりになってしまい、そもそもやりたかったカスタマイズが出来ない。 なので、次のセクションでうまくいったもう一つの方法について書く。

コメントにも書いたように自分なりにArray[Byte]Registerの serialize/deserialize を実装することがうまく出来なかった。 その際の StackTrace には以下のようなものが出力される。

[ERROR] swallowing exception during message send
com.esotericsoftware.kryo.KryoException: Buffer underflow.
 at com.esotericsoftware.kryo.io.Input.require(Input.java:199)
 at com.esotericsoftware.kryo.io.Input.readString(Input.java:470)
 ...

これが解決出来たら教えて下さい…。

romix/akka-kryo-serialization を使う

こちらの場合はcom.esotericsoftware.kryo.Serializerを実装すればよい。

// (`accesptsNull` = false, `immutable` = true)
abstract class KryoSerializerBase[T] extends com.esotericsoftware.kryo.Serializer[T](false, true)

class RegisterKryoSerializer extends KryoSerializerBase[Register] {
  override def write(kryo: Kryo, output: Output, `object`: Register): Unit = {
    output.writeString(`object`.task.id.value)
    output.writeString(`object`.task.name.value)
    output.writeInt(`object`.task.state.value)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[Register]): Register = {
    val taskId = TaskId(input.readString)
    val taskTitle = TaskTitle(input.readString)
    val taskState = TaskState.from(input.readInt)
    Register(Task(taskId, taskTitle, taskState))
  }
}

ここで実装したRegisterKryoSerializerを使うためには以下の様に実行できればいいが、 kryoインスタンスをどうやって手に入れるかが難しい。

kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])

ここにあるように Kryo の initializer を独自に実装して初期化処理に追加すればよい。 How to create a custom initializer for Kryo

initializer を用意する

extends する trait などはなく、customize(kryo: Kryo)なメソッドを持つ class を実装するだけ。

リフレクションを使って実現されていてこの辺りにその実装がある

kryoインスタンスのaddDefaultSerializerを実行するだけの class を用意した。

class CustomKryoSerializerInitializer {
  def customize(kryo: Kryo) = {
    kryo.setDefaultSerializer(classOf[CompatibleFieldSerializer[Any]])
    kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
    println(s"after - ${kryo.getSerializer(classOf[Register])}")
  }
}

initializer を conf に追加する

application.conf に追加する。 該当箇所はこんな感じ。

extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]

akka {
  actor {
    serializers {
      java = "akka.serialization.JavaSerializer"
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    warn-about-java-serializer-usage = false
    serialization-bindings {
      "java.io.Serializable" = java
      "net.petitviolet.ex.persistence.task.actor.Register" = kryo
    }
    kryo {
      type = "graph"
      idstrategy = "default"
      kryo-trace = true
      // ここ!
      kryo-custom-serializer-init = "net.petitviolet.ex.persistence.task.model.support.CustomKryoSerializerInitializer"
    }
  }
}

これでアプリケーションを起動すると以下のようなログが出力される。

[info] 00:00 TRACE: [kryo] Registration required: false [info] 00:00 TRACE: [kryo] Register class name: net.petitviolet.ex.persistence.task.actor.Register (net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer) [info] after - net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer@550c6251

kryo-custom-serializer-initに用意した initializer を設定してやればcustomizeが実行されて、Registerに対する serializer を登録する事が出来た。

感想

Akka-Persistence を使うにあたって恐らく重要な意思決定をしなければならない Serializer の選択。 プロダクト開発においてはpersistの対象となるイベントもリファクタリングしていくことが想定されるため、 先まで見据えて Serializer は導入段階から精査してきっちり実装していくことが必要になりそう。

ちなみにイベントのリファクタリングについての Akka 公式な記事。 Persistence - Schema Evolution 本当はここまでやりたかったが、Akka-Persistence の Serializer として Kryo は向いてないんじゃないかというところで挫折。 イベントのバージョニングに対応した Kryo な Serializer 実装はあるんだろうか…?

from: https://qiita.com/petitviolet/items/9cfe7e30b9108620eae4