petitviolet blog

    Scala - ScalazのTaskを使う上で知っておきたいこと

    2017-11-17

    QiitaScalascalaz

    この記事は何?

    scalaz の Task を使う上で知っておきたい tips とか注意点とか。 以前、こういう記事を書いた。
    [Scala]scalaz の Task は何が嬉しいのか - Qiita

    Scala の Future と同じようなこと出来る + 便利な API が生えているというイメージ。

    と書いたが、実際に使っていく上ではもう少し知っておくべきことがあるのでまとめておく。

    Task は遅延評価される

    改めて、Task は遅延評価されるもの。

    Task.nowを使うと名前の通り即時評価されるが、apply, delay, pointなどの別のファクトリを使用すると遅延評価される。 作成した Task をmapflatMapを使って合成していくことになると思うが、それらも遅延評価されて逐次実行される。

    scala 標準の Future と比較してみる。

    import scala.concurrent.Future
    
    {
      def future(i: Int) = Future {
        println(s"future sleeping: $i")
        Thread.sleep(i * 10)
        println(s"future wakeup: $i")
        i
      }
    
      val i = future(10)
      val j = future(20)
      val k = future(30)
    
      (for {
        _i <- i
        _j <- j
        _k <- k
      } yield { _i * _j * _k }).onComplete(println)
    }
    

    これを実行すると以下のような出力が得られる。

    future sleeping: 30 future sleeping: 10 future sleeping: 20 future wakeup: 10 future wakeup: 20 future wakeup: 30 Success(6000)

    Thread.sleepする前のコードが先に評価されているのがわかる。

    ↑ の Future を Task に置き換えてみると以下のようなコードになる。ほぼ同じ。

    import scalaz.concurrent.Task
    {
      def task(i: Int) = Task {
        println(s"task sleeping: $i")
        Thread.sleep(i * 10)
        println(s"task wakeup: $i")
        i
      }
    
      val i = task(10)
      val j = task(20)
      val k = task(30)
      (for {
        _i <- i
        _j <- j
        _k <- k
      } yield {_i * _j * _k }).unsafePerformAsync(println)
    }
    

    これを実行すると以下のような出力が得られる。

    task sleeping: 10 task wakeup: 10 task sleeping: 20 task wakeup: 20 task sleeping: 30 task wakeup: 30 /-(6000)

    先ほどと違ってThread.sleep前後のコードが順番に評価されている。

    これが遅延評価されていて逐次実行されるということ。 Future なら for 式の中でFuture.applyを呼ばないようにしておけ並行実行されるが、Task の場合は for 式で書くと逐次実行されてしまう。

    Task を並行実行したい

    遅延評価されて逐次実行されると困る、という場合にどうするか。 選択肢は以下の 2 つ。

    • Task.gatherUnorderedTask.reduceUnorderedを使う
    • Task.unsafeStartを使う
    • Nondeterminismを使うものについては@xuwei_k さんのコメントを参照して下さい

    Nondeterminismを使うのもありだが、ここでは触れない。

    Task.gatherUnordered か Task.reduceUnordered を使う

    複数の Task をまとめて並行実行するための API となっている。

    val i = task(10)
    val j = task(20)
    val k = task(30)
    val reducer = {
      val adder = (i1: Int) => (i2: Int) => i1 * i2
      Reducer.apply[Int, Int](identity, adder, adder)(Scalaz.intInstance)
    }
    // 順番入れ替え
    Task.reduceUnordered(List(k, i, j))(reducer).unsafePerformAsync(println)
    

    これを実行すると以下のようになる。

    task sleeping: 30 task sleeping: 10 task sleeping: 20 task wakeup: 10 task wakeup: 20 task wakeup: 30 /-(6000)

    Thread.sleepする前のコードが先に評価されているのがわかる。

    Reducerを用意するのが面倒なので、gatherUnorderdを使った方がコード的には楽。

    // 同じ結果になる
    Task.gatherUnordered(Seq(i, j, k)).map { _.foldLeft(1) { _ * _ } }.unsafePerformAsync(println)
    

    ここで問題点があって、シグネチャを見ると型パラメータが 1 つしかない。

    def gatherUnordered[A](tasks: Seq[Task[A]], exceptionCancels: Boolean = false): Task[List[A]] = ...
    def reduceUnordered[A, M](tasks: Seq[Task[A]], exceptionCancels: Boolean = false)(implicit R: Reducer[A, M]): Task[M] = ...
    

    たとえば複数のテーブルから別々にデータを取得したい場合に共通の型があればいいが、無ければAnyとかになってしまう。
    shapeless のHListを使うとか...。

    あるいは諦めてTask.unsafeStartを使ったほうが良い。

    Task.unsafeStart を使う

    これは Scala 標準の Future っぽい使い方が出来る Task を生成するための API になっている。

    def task(i: Int) = Task.unsafeStart {
      println(s"task sleeping: $i")
      Thread.sleep(i * 10)
      println(s"task wakeup: $i")
      i
    }
    val k = task(3)
    val i = task(1)
    val j = task(2)
    (for {
      _i <- i
      _k <- k
      _j <- j
    } yield {_i + _j + _k }).unsafePerformAsync(println)
    

    これを実行すると以下のようになる。

    task sleeping: 3 task sleeping: 2 task sleeping: 1 task wakeup: 1 task wakeup: 2 task wakeup: 3 /-(6)

    Future を使った時とほぼ同じような結果が得られている。

    並行実行まとめ

    やり方は以下の 2 通り。

    • Task.gatherUnorderedTask.reduceUnorderedを使う
    • Task.unsafeStartを使う

    MapReduce のようなイメージで使う場合は前者、Scala 標準の Future のように使う場合は後者、という使い分けだろうか。 通常のアプリケーション的なコードだとunsafeStartを使うことも多くなりそうだが、それなら Scala 標準の Future を使えば十分だと思う。

    トランザクションを抜けてしまう問題

    Task は遅延評価されるもの。

    その弊害(?)として、例えば RDB のトランザクション内での処理をしたい場合に Task を使うとトランザクションが閉じてしまっていたりする問題が発生する。

    擬似コードとしては以下のような感じ。

    class Transaction(var status: String = "OPEN") {
      def close() = status = "CLOSED"
    }
    def withTransaction[A](f: Transaction => Task[A]): Task[A] = {
      val ctx = new Transaction("OPEN")
      try {
        f(ctx)
      } finally { ctx.close() } // 本当はcommit/rollback的な処理
    }
    
    val t = withTransaction { ctx =>
      Task(s"transaction status: ${ctx.status}")
    }
    t.unsafePerformASync(println)
    

    実行すると

    /-(transaction status: CLOSED)

    と出力される。

    この例だとTask#unsafePerformAsyncが実行されて Task の中身が評価されるタイミングではwithTransactionのブロックを抜けてしまっていることになる。 なので、withTransactionの中で Task を実行してしまわないといけないことになる。

    val t = withTransaction { ctx =>
      Task.fromDisjunction {
        \/.fromTryCatchNonFatal {
          Task(s"transaction status: ${ctx.status }").unsafePerformSync
        }
      }
    }
    t.unsafePerformAsync(println)
    

    unsafePerformSyncを使って同期的に実行してトランザクションを確定してからブロックを抜けるようなコードになってしまった。

    トランザクションのようなローンパターンと Task は気をつけて組み合わせないといけない。

    from: https://qiita.com/petitviolet/items/181e4cce35ada5cc8669