petitviolet blog

    Akka-PersistenceのserializerとしてKryoを使う

    2017-01-23

    QiitaScalaAkkaactor

    とりあえず使えるようにする

    Akka-Persistence で Kryo をとりあえず使えるようにする設定については以前書いた。 Akka-Persistence コトハジメ#serializer として kryo を使う

    serializer 選択について

    Akka-Persistence のPersistentActor#persistで永続化するメッセージは serialize してから永続化している。 その際に使用できる Serializer を公式が提示している。 External Akka Serializers

    以下のページでも serializer 選択は大事だと強調しており、こちらでは Google Protobuf/Apache Thrift/Apache Avro が提示されている。 Picking the right serialization format

    Protobuf は何となく使うのがめんどくさそうなイメージがあったので楽そうな Kryo を使う。

    他にも JSON フォーマットな Serializer 実装としてscalapenos/staminaがある。

    serializer のカスタマイズ方法

    serializer のカスタマイズ方法は 2 パターンありそうだった。

    • akka のやり方に自分で従うパターン
    • romix/akka-kryo-serializationをがっつり使うパターン
      • kryo 以外についてはここでは触れない

    それぞれやってみる。

    serialize(persist)する型

    以下のサンプルで使う型。 Todo アプリにおいて新しくTaskを追加するRegisterメッセージを Akka-Persistence でpersistする対象とする。

    sealed trait CommandEvent
    case class Register(task: Task) extends CommandEvent
    

    Taskの定義はこんな感じでidtitlestateを持つ。 若干 DDD っぽくそれぞれにちゃんと型を用意した。

    case class Task(id: TaskId, title: TaskTitle, state: TaskState = TaskState.Todo)
    
    case class TaskId(value: String) extends AnyVal
    case class TaskTitle(value: String) extends AnyVal
    
    // enumlike
    sealed abstract class TaskState(val value: Int) extends Serializable
    object TaskState {
      case object Completed extends TaskState(1)
      case object Todo extends TaskState(0)
    
      private val values = Completed :: Todo :: Nil
      // [[TaskState]] factory
      val from: Int => TaskState = n => values.find(_.value == n).get
    }
    

    akka のやり方に従って独自 serializer を実装する

    Customizationを参照。 akka.serialization.Serializerを実装してやれば良い。

    twitter/chill を使う

    Kryo を直接使うのではなぜかうまくいかなかったので、twitter/chillを利用する。 参考:takezoux2 のブログ: chill を使ってクラス定義変更に強いシリアライズを実現

    build.sbt に依存を追加。

    libraryDependencies += "com.twitter" %% "chill-bijection" % "0.8.0"
    

    twitter/chill を使ってシンプルにRegisterSerializerを実装する。

    class RegisterKryoSerializer extends akka.serialization.Serializer {
      private val CLAZZ = classOf[Register]
    
      override def identifier: Int = 1000
    
      override def includeManifest: Boolean = true
    
      override def toBinary(o: AnyRef): Array[Byte] = {
        // これではうまくいかなかった
        // val kryo = new Kryo()
        // val baos = new ByteArrayOutputStream()
        // kryo.writeObject(new Output(baos), o)
        // baos.toByteArray
    
        KryoInjection.apply(o)
      }
    
      override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
        manifest match {
          case Some(CLAZZ) =>
            // これではうまくいかなかった
            // val kryo = new Kryo()
            // val input = new Input(new ByteArrayInputStream(bytes))
            // kryo.readObject(input, CLAZZ)
            KryoInjection.invert(bytes).get
          case _ => sys.error(s"unknown manifest: $manifest")
        }
      }
    }
    

    このようにKryoInjection#applyKryoInjection#invertを使ったらSerializerはお手軽に実装できる。 しかしこれではKryoInjectionに serialize/deserialize を任せっきりになってしまい、そもそもやりたかったカスタマイズが出来ない。 なので、次のセクションでうまくいったもう一つの方法について書く。

    コメントにも書いたように自分なりにArray[Byte]Registerの serialize/deserialize を実装することがうまく出来なかった。 その際の StackTrace には以下のようなものが出力される。

    [ERROR] swallowing exception during message send
    com.esotericsoftware.kryo.KryoException: Buffer underflow.
     at com.esotericsoftware.kryo.io.Input.require(Input.java:199)
     at com.esotericsoftware.kryo.io.Input.readString(Input.java:470)
     ...
    

    これが解決出来たら教えて下さい...。

    romix/akka-kryo-serialization を使う

    こちらの場合はcom.esotericsoftware.kryo.Serializerを実装すればよい。

    // (`accesptsNull` = false, `immutable` = true)
    abstract class KryoSerializerBase[T] extends com.esotericsoftware.kryo.Serializer[T](false, true)
    
    class RegisterKryoSerializer extends KryoSerializerBase[Register] {
      override def write(kryo: Kryo, output: Output, `object`: Register): Unit = {
        output.writeString(`object`.task.id.value)
        output.writeString(`object`.task.name.value)
        output.writeInt(`object`.task.state.value)
      }
    
      override def read(kryo: Kryo, input: Input, `type`: Class[Register]): Register = {
        val taskId = TaskId(input.readString)
        val taskTitle = TaskTitle(input.readString)
        val taskState = TaskState.from(input.readInt)
        Register(Task(taskId, taskTitle, taskState))
      }
    }
    

    ここで実装したRegisterKryoSerializerを使うためには以下の様に実行できればいいが、 kryoインスタンスをどうやって手に入れるかが難しい。

    kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
    

    ここにあるように Kryo の initializer を独自に実装して初期化処理に追加すればよい。 How to create a custom initializer for Kryo

    initializer を用意する

    extends する trait などはなく、customize(kryo: Kryo)なメソッドを持つ class を実装するだけ。

    リフレクションを使って実現されていてこの辺りにその実装がある

    kryoインスタンスのaddDefaultSerializerを実行するだけの class を用意した。

    class CustomKryoSerializerInitializer {
      def customize(kryo: Kryo) = {
        kryo.setDefaultSerializer(classOf[CompatibleFieldSerializer[Any]])
        kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
        println(s"after - ${kryo.getSerializer(classOf[Register])}")
      }
    }
    

    initializer を conf に追加する

    application.conf に追加する。 該当箇所はこんな感じ。

    extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
    
    akka {
      actor {
        serializers {
          java = "akka.serialization.JavaSerializer"
          kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
        }
        warn-about-java-serializer-usage = false
        serialization-bindings {
          "java.io.Serializable" = java
          "net.petitviolet.ex.persistence.task.actor.Register" = kryo
        }
        kryo {
          type = "graph"
          idstrategy = "default"
          kryo-trace = true
          // ここ!
          kryo-custom-serializer-init = "net.petitviolet.ex.persistence.task.model.support.CustomKryoSerializerInitializer"
        }
      }
    }
    

    これでアプリケーションを起動すると以下のようなログが出力される。

    [info] 00:00 TRACE: [kryo] Registration required: false [info] 00:00 TRACE: [kryo] Register class name: net.petitviolet.ex.persistence.task.actor.Register (net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer) [info] after - net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer@550c6251

    kryo-custom-serializer-initに用意した initializer を設定してやればcustomizeが実行されて、Registerに対する serializer を登録する事が出来た。

    感想

    Akka-Persistence を使うにあたって恐らく重要な意思決定をしなければならない Serializer の選択。 プロダクト開発においてはpersistの対象となるイベントもリファクタリングしていくことが想定されるため、 先まで見据えて Serializer は導入段階から精査してきっちり実装していくことが必要になりそう。

    ちなみにイベントのリファクタリングについての Akka 公式な記事。 Persistence - Schema Evolution 本当はここまでやりたかったが、Akka-Persistence の Serializer として Kryo は向いてないんじゃないかというところで挫折。 イベントのバージョニングに対応した Kryo な Serializer 実装はあるんだろうか...?

    from: https://qiita.com/petitviolet/items/9cfe7e30b9108620eae4