Akka-StreamのRunnableGraphの構築パターン
2016-10-22
QiitaScalaAkkaactorAkka-StreamAkka-Streamでアプリケーションを実装する場合、
Source と Flow と Sink を組み合わせてRunnableGraph
を構築し、run
して実行する。
本記事では Akka-Stream の部品のメインとなる Source, Flow, Sink を、 Akka-ActorおよびActorPublisher, ActorSubscriberを使う場合/使わない場合でシンプルに実装してみる。
Actor を使わずに RunnableGraph を構築する
まず、Actor を使わない Akka-Stream なアプリケーションを実装する。 コード全体はこちら AkkaStreamStandard.scala
今回取り扱うデータ型はこれだけ。
case class Message(value: String) extends AnyVal
このMessage
を追加されたら加工しながら出力する graph を実装する。
Source
後からデータを追加できるようにSourceQueueWithComplete
を使う。
Source.queue[T]
で楽に作れて便利。
val source: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](100, OverflowStrategy.backpressure)
Flow
Message
を受け取って値を書き換えるものと、畳み込んでString
にする 2 つの Flow。
Flow[T].map
とFlow[T].fold
で実装する。
val flow: Flow[Message, Message, NotUsed] =
Flow[Message].map { r => r.copy(value = s"(Mapped: ${r.value})") }
val accumulater: Flow[Message, String, NotUsed] =
Flow[Message].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
Sink
先程の Flow で作成したString
をprintln
するだけの簡単な Sink。
val sink: Sink[String, Future[Done]] = Sink.foreach[String] { println }
Graph
上で作ったSource
, Flow
, Sink
を組み合わせてRunnableGraph
を構築する。
via
とto
でつなげるだけ。
// simple graph
val graph: RunnableGraph[SourceQueueWithComplete[Message]] =
source via flow via accumulater to sink
実行
上のgraph
にrun
を呼んだら手に入るqueue
に対してoffer
でメッセージを詰めると stream を流れていく。
complete
で終了。
val queue: SourceQueueWithComplete[Message] = graph.run()
queue offer Message("hello!")
queue offer Message("100")
queue offer Message("good")
queue complete
実行するとこんなログが出る。
[info] init :: (Mapped: hello!) :: (Mapped: 100) :: (Mapped: good)
Actor{Publisher,Subscriber}を使って RunnableGraph を構築する
コード全体はこちら AkkaStreamPracWithActor.scala
使用するデータ型は、
先程のMessage
と名前を変えたLetter
とqueue.complete
に相当するメッセージとしてFinish
を用意した。
case class Letter(value: String) extends AnyVal
case object Finish
Source
Source 用の Actor を実装
Source に使用する Actor はActorPublisher[T]
を実装すれば良い。
publish するメッセージはreceive
内でonNext(msg)
し、onComplete
で終了する。
class PublishActor extends ActorPublisher[Letter] {
// publish [[Letter]] or OnComplete
override def receive: Actor.Receive = {
case s: String =>
onNext(Letter(s"Nice: $s"))
case i: Int =>
onNext(Letter(s"Great: ${i * 100}"))
case Finish =>
onComplete()
}
}
ActorPublisher を使って Source を作る
先程のPublishActor
のActorRef
インスタンスをActorPublisher.apply
でPublisher[T]
にしてから、
Source.fromPublisher
で Source に変換する
// publisher actor
val publishActorRef = system.actorOf(Props[PublishActor])
// source with actor
val source: Source[Letter, NotUsed] = {
val publisher: Publisher[Letter] = ActorPublisher(publishActorRef)
Source.fromPublisher(publisher)
}
Flow
Flow 用の Actor を実装
これは普通にActor
を extends した普通の Actor。
返り値を使いたいのでsender
にメッセージを送る。
class FlowActor extends Actor {
// subscribe and publish
override def receive: Actor.Receive = {
case Letter(msg) => sender() ! Letter(s"(Mapped: $msg)")
case any => println(s"??? => $any")
}
}
Actor を使って Flow を作る
StackOverFlowを参考に実装。
先程のFlowActor
のActorRef
インスタンスにask
(?
)で得られるFuture[T]
をFlow.mapAsync[T]
で Flow にするだけ。
// flow
val flow: Flow[Letter, Letter, NotUsed] = {
import scala.concurrent.duration._
implicit val timeout: Timeout = 1.second
val flowActor = system.actorOf(Props[FlowActor])
def flowWithActor(reply: Letter): Future[Letter] = (flowActor ? reply).mapTo[Letter]
Flow[Letter].mapAsync[Letter](3)(flowWithActor)
}
// another flow without actor
val accumulater: Flow[Letter, String, NotUsed] =
Flow[Letter].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
Actor を使わない Flow もあわせて使ってみる。
Sink
Sink 用の Actor を実装
ActorSubscriber
を実装した Actor にする。
OnNext
とOnComplete
が送られてくるので、それをreceive
すればいい。
class SubscribeActor extends ActorSubscriber {
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
// just subscribe
override def receive: Actor.Receive = {
case OnNext(any) => println(s"subscribed: $any")
case OnComplete => println(s"finish process!")
}
}
ActorSubscriber を使って Sink を作る
先程のSubscribeActor
のActorRef
をActorSubscriber.apply
したものをSink.fromSubscriber
で Source にする。
// sink with actor
val sink: Sink[String, NotUsed] = {
val printActor = system.actorOf(Props[SubscribeActor])
Sink.fromSubscriber[String](ActorSubscriber[String](printActor))
}
RunnableGraph
これは特に変わらない。
via
とto
で Source, Flow, Sink をつなげればいい。
// simple graph
val graph: RunnableGraph[NotUsed] = source via flow via accumulater to sink
実行
RunnableGraph#run
する。
そして、Source の元となったpublishActorRef
に対してメッセージをtell
すれば stream を流れていく。
run
して即座にメッセージを流すとIllegalStateException
が出てしまうのでとりあえずThread.sleep
してごまかしている。
graph.run
// wait preparing graph
Thread.sleep(100L)
publishActorRef ! "hello!"
publishActorRef ! 100
publishActorRef ! "good"
publishActorRef ! Finish
実行するとこんなログが出る。
[info] subscribed: init :: (Mapped: Nice: hello!) :: (Mapped: Great: 10000) :: (Mapped: Nice: good)
ActorRef を使って RunnableGraph を構築する
上記ではActorPublisher
/ActorSubscriber
を使って実装したが、次はSource.actorRef
, Sink.actorRef
を使ってみる。
コード全体はこちら
AkkaStreamPracWithActorRef.scala
Source
Source.actorRef
で Source を作る。
// source with actorRef
val source: Source[Message, ActorRef] = Source.actorRef[Message](65536, OverflowStrategy.fail)
これだけ。
Source として使用するActorRef
インスタンスを用意する必要すらない。
第一引数はメッセージバッファのサイズで、第二引数はそれが overflow した時の挙動。
ちなみにこの場合は BackPressure はサポートされていない。
Flow
先ほどと全く同じものを使うのでここでは省略。
Sink
Sink 用の Actor
まず Sink 用の Actor を実装する。
ただprintln
するだけのシンプルな Actor。
class SubscribeActor extends Actor {
// just subscribe
override def receive: Actor.Receive = {
case Finish => println(s"finish process!")
case any => println(s"subscribed: $any")
}
}
Sink.actorRef で Sink を作る
先程の Actor のActorRef
インスタンスをSink.actorRef
に渡すだけで良い。
// subscriber
val subscribeActor = system.actorOf(Props[SubscribeActor])
// sink with actorRef
val sink: Sink[Any, NotUsed] = Sink.actorRef(subscribeActor, onCompleteMessage = Finish)
第二引数のonCompleteMessage
は、Sink 内部のActorSubscriber
に対してOnComplete
がtell
された時にこのActorRef
に対してtell
される。
ActorRefSinkActor.scala
RunnableGraph
via
とto
で Source, Flow, Sink をつなげる点は同じ。
ただ、RunnableGraph[Mat]
のMat
がActorRef
になる。
// simple graph
val graph: RunnableGraph[ActorRef] = source via flow via accumulator to sink
実行
先程のgraph
に対してrun
を実行するとActorRef
が返ってくる。
val sourceActorRef: ActorRef = graph.run
これに対してメッセージをtell
すればそのメッセージが Source として Stream を流れていく。
// wait preparing graph
Thread.sleep(100L)
sourceActorRef ! Letter("hello!")
sourceActorRef ! Letter("100")
sourceActorRef ! Letter("good")
// force complete upstream source
sourceActorRef ! PoisonPill
今回は途中の Flow でfold
を使っているため、上流となるsourceActorRef
を終了させることで次に stream を無理やり流している。
実行結果は以下。
[info] subscribed: init :: (Mapped: hello!) :: (Mapped: 100) :: (Mapped: good)
まとめ
RunnableGraph の構築に Actor を使わない, ActorPublisher/ActorSubscriber, ActorRef を使う 3 パターンで実装した。
Actor を使うやり方だとそもそも Actor を多少知らないと使えない点ではコスト高にはなるが、 開発アーキテクチャのベースとして Actor を使用しているケースでは Stream でも統一して Actor を使えるというのは利点になるはず。
ただ、Konrad さん(@ktosopl)がStackOverFlowで
Please don't use ActorPublisher and ActorSubscriber. They're too low level and you might end up implementing them in such a way that's violating the Reactive Streams specification.
と言っているのでActorPublisher
/ActorSubscriber
を使って実装するのは避けた方が良さそう。
なので、これらを使うくらいならActorRef
を使うべきだが、
Akka-Stream が Actor の存在をせっかく隠してくれているので明示的に使わずにすむならそれがベターかと。
必要のないものを無理に使わない、という当たり前の感想となってしまった。
from: https://qiita.com/petitviolet/items/744fa0d0634c5262970f