Scala - ScalazのTaskを使う上で知っておきたいこと
2017-11-17
QiitaScalascalazこの記事は何?
scalaz の Task を使う上で知っておきたい tips とか注意点とか。
以前、こういう記事を書いた。
[Scala]scalaz の Task は何が嬉しいのか - Qiita
Scala の Future と同じようなこと出来る + 便利な API が生えているというイメージ。
と書いたが、実際に使っていく上ではもう少し知っておくべきことがあるのでまとめておく。
Task は遅延評価される
改めて、Task は遅延評価されるもの。
Task.now
を使うと名前の通り即時評価されるが、apply
, delay
, point
などの別のファクトリを使用すると遅延評価される。
作成した Task をmap
やflatMap
を使って合成していくことになると思うが、それらも遅延評価されて逐次実行される。
scala 標準の Future と比較してみる。
import scala.concurrent.Future
{
def future(i: Int) = Future {
println(s"future sleeping: $i")
Thread.sleep(i * 10)
println(s"future wakeup: $i")
i
}
val i = future(10)
val j = future(20)
val k = future(30)
(for {
_i <- i
_j <- j
_k <- k
} yield { _i * _j * _k }).onComplete(println)
}
これを実行すると以下のような出力が得られる。
future sleeping: 30 future sleeping: 10 future sleeping: 20 future wakeup: 10 future wakeup: 20 future wakeup: 30 Success(6000)
Thread.sleep
する前のコードが先に評価されているのがわかる。
↑ の Future を Task に置き換えてみると以下のようなコードになる。ほぼ同じ。
import scalaz.concurrent.Task
{
def task(i: Int) = Task {
println(s"task sleeping: $i")
Thread.sleep(i * 10)
println(s"task wakeup: $i")
i
}
val i = task(10)
val j = task(20)
val k = task(30)
(for {
_i <- i
_j <- j
_k <- k
} yield {_i * _j * _k }).unsafePerformAsync(println)
}
これを実行すると以下のような出力が得られる。
task sleeping: 10 task wakeup: 10 task sleeping: 20 task wakeup: 20 task sleeping: 30 task wakeup: 30 /-(6000)
先ほどと違ってThread.sleep
前後のコードが順番に評価されている。
これが遅延評価されていて逐次実行されるということ。
Future なら for 式の中でFuture.apply
を呼ばないようにしておけ並行実行されるが、Task の場合は for 式で書くと逐次実行されてしまう。
Task を並行実行したい
遅延評価されて逐次実行されると困る、という場合にどうするか。 選択肢は以下の 2 つ。
Task.gatherUnordered
かTask.reduceUnordered
を使うTask.unsafeStart
を使うNondeterminism
を使うものについては@xuwei_k さんのコメントを参照して下さい
Nondeterminism
を使うのもありだが、ここでは触れない。
Task.gatherUnordered か Task.reduceUnordered を使う
複数の Task をまとめて並行実行するための API となっている。
val i = task(10)
val j = task(20)
val k = task(30)
val reducer = {
val adder = (i1: Int) => (i2: Int) => i1 * i2
Reducer.apply[Int, Int](identity, adder, adder)(Scalaz.intInstance)
}
// 順番入れ替え
Task.reduceUnordered(List(k, i, j))(reducer).unsafePerformAsync(println)
これを実行すると以下のようになる。
task sleeping: 30 task sleeping: 10 task sleeping: 20 task wakeup: 10 task wakeup: 20 task wakeup: 30 /-(6000)
Thread.sleep
する前のコードが先に評価されているのがわかる。
Reducer
を用意するのが面倒なので、gatherUnorderd
を使った方がコード的には楽。
// 同じ結果になる
Task.gatherUnordered(Seq(i, j, k)).map { _.foldLeft(1) { _ * _ } }.unsafePerformAsync(println)
ここで問題点があって、シグネチャを見ると型パラメータが 1 つしかない。
def gatherUnordered[A](tasks: Seq[Task[A]], exceptionCancels: Boolean = false): Task[List[A]] = ...
def reduceUnordered[A, M](tasks: Seq[Task[A]], exceptionCancels: Boolean = false)(implicit R: Reducer[A, M]): Task[M] = ...
たとえば複数のテーブルから別々にデータを取得したい場合に共通の型があればいいが、無ければAny
とかになってしまう。
shapeless のHListを使うとか...。
あるいは諦めてTask.unsafeStart
を使ったほうが良い。
Task.unsafeStart を使う
これは Scala 標準の Future っぽい使い方が出来る Task を生成するための API になっている。
def task(i: Int) = Task.unsafeStart {
println(s"task sleeping: $i")
Thread.sleep(i * 10)
println(s"task wakeup: $i")
i
}
val k = task(3)
val i = task(1)
val j = task(2)
(for {
_i <- i
_k <- k
_j <- j
} yield {_i + _j + _k }).unsafePerformAsync(println)
これを実行すると以下のようになる。
task sleeping: 3 task sleeping: 2 task sleeping: 1 task wakeup: 1 task wakeup: 2 task wakeup: 3 /-(6)
Future を使った時とほぼ同じような結果が得られている。
並行実行まとめ
やり方は以下の 2 通り。
Task.gatherUnordered
かTask.reduceUnordered
を使うTask.unsafeStart
を使う
MapReduce のようなイメージで使う場合は前者、Scala 標準の Future のように使う場合は後者、という使い分けだろうか。
通常のアプリケーション的なコードだとunsafeStart
を使うことも多くなりそうだが、それなら Scala 標準の Future を使えば十分だと思う。
トランザクションを抜けてしまう問題
Task は遅延評価されるもの。
その弊害(?)として、例えば RDB のトランザクション内での処理をしたい場合に Task を使うとトランザクションが閉じてしまっていたりする問題が発生する。
擬似コードとしては以下のような感じ。
class Transaction(var status: String = "OPEN") {
def close() = status = "CLOSED"
}
def withTransaction[A](f: Transaction => Task[A]): Task[A] = {
val ctx = new Transaction("OPEN")
try {
f(ctx)
} finally { ctx.close() } // 本当はcommit/rollback的な処理
}
val t = withTransaction { ctx =>
Task(s"transaction status: ${ctx.status}")
}
t.unsafePerformASync(println)
実行すると
/-(transaction status: CLOSED)
と出力される。
この例だとTask#unsafePerformAsync
が実行されて Task の中身が評価されるタイミングではwithTransaction
のブロックを抜けてしまっていることになる。
なので、withTransaction
の中で Task を実行してしまわないといけないことになる。
val t = withTransaction { ctx =>
Task.fromDisjunction {
\/.fromTryCatchNonFatal {
Task(s"transaction status: ${ctx.status }").unsafePerformSync
}
}
}
t.unsafePerformAsync(println)
unsafePerformSync
を使って同期的に実行してトランザクションを確定してからブロックを抜けるようなコードになってしまった。
トランザクションのようなローンパターンと Task は気をつけて組み合わせないといけない。
from: https://qiita.com/petitviolet/items/181e4cce35ada5cc8669