petitviolet blog

    Akka-StreamのRunnableGraphの構築パターン

    2016-10-22

    QiitaScalaAkkaactorAkka-Stream

    Akka-Streamでアプリケーションを実装する場合、 Source と Flow と Sink を組み合わせてRunnableGraphを構築し、runして実行する。

    本記事では Akka-Stream の部品のメインとなる Source, Flow, Sink を、 Akka-ActorおよびActorPublisher, ActorSubscriberを使う場合/使わない場合でシンプルに実装してみる。

    Actor を使わずに RunnableGraph を構築する

    まず、Actor を使わない Akka-Stream なアプリケーションを実装する。 コード全体はこちら AkkaStreamStandard.scala

    今回取り扱うデータ型はこれだけ。

    case class Message(value: String) extends AnyVal
    

    このMessageを追加されたら加工しながら出力する graph を実装する。

    Source

    後からデータを追加できるようにSourceQueueWithCompleteを使う。 Source.queue[T]で楽に作れて便利。

    val source: Source[Message, SourceQueueWithComplete[Message]] =
      Source.queue[Message](100, OverflowStrategy.backpressure)
    

    Flow

    Messageを受け取って値を書き換えるものと、畳み込んでStringにする 2 つの Flow。 Flow[T].mapFlow[T].foldで実装する。

    val flow: Flow[Message, Message, NotUsed] =
      Flow[Message].map { r => r.copy(value = s"(Mapped: ${r.value})") }
    
    val accumulater: Flow[Message, String, NotUsed] =
      Flow[Message].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
    

    Sink

    先程の Flow で作成したStringprintlnするだけの簡単な Sink。

    val sink: Sink[String, Future[Done]] = Sink.foreach[String] { println }
    

    Graph

    上で作ったSource, Flow, Sinkを組み合わせてRunnableGraphを構築する。 viatoでつなげるだけ。

    // simple graph
    val graph: RunnableGraph[SourceQueueWithComplete[Message]] =
      source via flow via accumulater to sink
    

    実行

    上のgraphrunを呼んだら手に入るqueueに対してofferでメッセージを詰めると stream を流れていく。 completeで終了。

    val queue: SourceQueueWithComplete[Message] = graph.run()
    
    queue offer Message("hello!")
    queue offer Message("100")
    queue offer Message("good")
    queue complete
    

    実行するとこんなログが出る。

    [info] init :: (Mapped: hello!) :: (Mapped: 100) :: (Mapped: good)

    Actor{Publisher,Subscriber}を使って RunnableGraph を構築する

    コード全体はこちら AkkaStreamPracWithActor.scala

    使用するデータ型は、 先程のMessageと名前を変えたLetterqueue.completeに相当するメッセージとしてFinishを用意した。

    case class Letter(value: String) extends AnyVal
    case object Finish
    

    Source

    Source 用の Actor を実装

    Source に使用する Actor はActorPublisher[T]を実装すれば良い。 publish するメッセージはreceive内でonNext(msg)し、onCompleteで終了する。

    class PublishActor extends ActorPublisher[Letter] {
      // publish [[Letter]] or OnComplete
      override def receive: Actor.Receive = {
        case s: String =>
          onNext(Letter(s"Nice: $s"))
        case i: Int =>
          onNext(Letter(s"Great: ${i * 100}"))
        case Finish =>
          onComplete()
      }
    }
    

    ActorPublisher を使って Source を作る

    先程のPublishActorActorRefインスタンスをActorPublisher.applyPublisher[T]にしてから、 Source.fromPublisherで Source に変換する

    // publisher actor
    val publishActorRef = system.actorOf(Props[PublishActor])
    
    // source with actor
    val source: Source[Letter, NotUsed] = {
      val publisher: Publisher[Letter] = ActorPublisher(publishActorRef)
      Source.fromPublisher(publisher)
    }
    

    Flow

    Flow 用の Actor を実装

    これは普通にActorを extends した普通の Actor。 返り値を使いたいのでsenderにメッセージを送る。

    class FlowActor extends Actor {
      // subscribe and publish
      override def receive: Actor.Receive = {
        case Letter(msg) => sender() ! Letter(s"(Mapped: $msg)")
        case any        => println(s"??? => $any")
      }
    }
    

    Actor を使って Flow を作る

    StackOverFlowを参考に実装。 先程のFlowActorActorRefインスタンスにask(?)で得られるFuture[T]Flow.mapAsync[T]で Flow にするだけ。

    // flow
    val flow: Flow[Letter, Letter, NotUsed] = {
      import scala.concurrent.duration._
      implicit val timeout: Timeout = 1.second
      val flowActor = system.actorOf(Props[FlowActor])
      def flowWithActor(reply: Letter): Future[Letter] = (flowActor ? reply).mapTo[Letter]
    
      Flow[Letter].mapAsync[Letter](3)(flowWithActor)
    }
    
    // another flow without actor
    val accumulater: Flow[Letter, String, NotUsed] =
      Flow[Letter].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
    

    Actor を使わない Flow もあわせて使ってみる。

    Sink

    Sink 用の Actor を実装

    ActorSubscriberを実装した Actor にする。 OnNextOnCompleteが送られてくるので、それをreceiveすればいい。

    class SubscribeActor extends ActorSubscriber {
      override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
    
      // just subscribe
      override def receive: Actor.Receive = {
        case OnNext(any) => println(s"subscribed: $any")
        case OnComplete  => println(s"finish process!")
      }
    }
    

    ActorSubscriber を使って Sink を作る

    先程のSubscribeActorActorRefActorSubscriber.applyしたものをSink.fromSubscriberで Source にする。

    // sink with actor
    val sink: Sink[String, NotUsed] = {
      val printActor = system.actorOf(Props[SubscribeActor])
      Sink.fromSubscriber[String](ActorSubscriber[String](printActor))
    }
    

    RunnableGraph

    これは特に変わらない。 viatoで Source, Flow, Sink をつなげればいい。

    // simple graph
    val graph: RunnableGraph[NotUsed] = source via flow via accumulater to sink
    

    実行

    RunnableGraph#runする。 そして、Source の元となったpublishActorRefに対してメッセージをtellすれば stream を流れていく。 runして即座にメッセージを流すとIllegalStateExceptionが出てしまうのでとりあえずThread.sleepしてごまかしている。

    graph.run
    // wait preparing graph
    Thread.sleep(100L)
    
    publishActorRef ! "hello!"
    publishActorRef ! 100
    publishActorRef ! "good"
    publishActorRef ! Finish
    

    実行するとこんなログが出る。

    [info] subscribed: init :: (Mapped: Nice: hello!) :: (Mapped: Great: 10000) :: (Mapped: Nice: good)

    ActorRef を使って RunnableGraph を構築する

    上記ではActorPublisher/ActorSubscriberを使って実装したが、次はSource.actorRef, Sink.actorRefを使ってみる。 コード全体はこちら AkkaStreamPracWithActorRef.scala

    Source

    Source.actorRefで Source を作る。

    // source with actorRef
    val source: Source[Message, ActorRef] = Source.actorRef[Message](65536, OverflowStrategy.fail)
    

    これだけ。 Source として使用するActorRefインスタンスを用意する必要すらない。 第一引数はメッセージバッファのサイズで、第二引数はそれが overflow した時の挙動。 ちなみにこの場合は BackPressure はサポートされていない。

    Flow

    先ほどと全く同じものを使うのでここでは省略。

    Sink

    Sink 用の Actor

    まず Sink 用の Actor を実装する。 ただprintlnするだけのシンプルな Actor。

    class SubscribeActor extends Actor {
      // just subscribe
      override def receive: Actor.Receive = {
        case Finish => println(s"finish process!")
        case any => println(s"subscribed: $any")
      }
    }
    

    Sink.actorRef で Sink を作る

    先程の Actor のActorRefインスタンスをSink.actorRefに渡すだけで良い。

    // subscriber
    val subscribeActor = system.actorOf(Props[SubscribeActor])
    // sink with actorRef
    val sink: Sink[Any, NotUsed] = Sink.actorRef(subscribeActor, onCompleteMessage = Finish)
    

    第二引数のonCompleteMessageは、Sink 内部のActorSubscriberに対してOnCompletetellされた時にこのActorRefに対してtellされる。 ActorRefSinkActor.scala

    RunnableGraph

    viatoで Source, Flow, Sink をつなげる点は同じ。 ただ、RunnableGraph[Mat]MatActorRefになる。

    // simple graph
    val graph: RunnableGraph[ActorRef] = source via flow via accumulator to sink
    

    実行

    先程のgraphに対してrunを実行するとActorRefが返ってくる。

    val sourceActorRef: ActorRef = graph.run
    

    これに対してメッセージをtellすればそのメッセージが Source として Stream を流れていく。

    // wait preparing graph
    Thread.sleep(100L)
    
    sourceActorRef ! Letter("hello!")
    sourceActorRef ! Letter("100")
    sourceActorRef ! Letter("good")
    
    // force complete upstream source
    sourceActorRef ! PoisonPill
    

    今回は途中の Flow でfoldを使っているため、上流となるsourceActorRefを終了させることで次に stream を無理やり流している。 実行結果は以下。

    [info] subscribed: init :: (Mapped: hello!) :: (Mapped: 100) :: (Mapped: good)

    まとめ

    RunnableGraph の構築に Actor を使わない, ActorPublisher/ActorSubscriber, ActorRef を使う 3 パターンで実装した。

    Actor を使うやり方だとそもそも Actor を多少知らないと使えない点ではコスト高にはなるが、 開発アーキテクチャのベースとして Actor を使用しているケースでは Stream でも統一して Actor を使えるというのは利点になるはず。

    ただ、Konrad さん(@ktosopl)StackOverFlow

    Please don't use ActorPublisher and ActorSubscriber. They're too low level and you might end up implementing them in such a way that's violating the Reactive Streams specification.

    と言っているのでActorPublisher/ActorSubscriberを使って実装するのは避けた方が良さそう。 なので、これらを使うくらいならActorRefを使うべきだが、 Akka-Stream が Actor の存在をせっかく隠してくれているので明示的に使わずにすむならそれがベターかと。 必要のないものを無理に使わない、という当たり前の感想となってしまった。

    from: https://qiita.com/petitviolet/items/744fa0d0634c5262970f