petitviolet blog

    Scala - scalazのTaskは何が嬉しいのか

    2017-09-30

    QiitaScalascalaz

    Task?

    scalaz/scalazにあるクラス。
    scalaz/Task.scala

    これはscalaz/Future.scalaを wrap したクラスとなっている。

    Future?

    scala.concurrent.Futureと同じく非同期処理を表現するクラスになっているが、
    違いとしてはmapflatMapExecutionContextを必要とせず新しいタスクを生成しないようになっている点。
    これによってスレッドの再利用性が向上しているとのこと。

    こんな感じ。

    (for {
      i <- Future.apply(100)
      j <- Future.now(200)
      k <- Future.delay(300)
    } yield i + j + k).unsafePerformAsync(println)
    // 600
    

    monadic な API を備えているので for 式が書ける。
    Future自体は非同期タスクを定義・記述だけとなっていて、実行するにはunsafePerformAsyncunsafePerformSyncを呼び出す必要がある。

    Futureのインスタンスを作成するのにapply, now, delayを使っているが、applyだと別スレッドでの評価、nowは正格評価でdelayは遅延評価、みたいなイメージで使い分ける。

    Scala 標準のFutureapplyは遅延評価となっているが、Scalaz のものは異なる。
    さらに大きな違いとして例外処理機構は持っていない。
    そのため、nowでもdelayでも評価して例外が発生するとそれは例外になる。

    Future.now { sys.error("error"); 1 }.unsafePerformAsync(println)
    // Exception in thread "main" java.lang.RuntimeException: error
    Future.delay { sys.error("error"); 1 }.unsafePerformAsync(println)
    // Exception in thread "main" java.lang.RuntimeException: error
    

    ちなみにapplyだと闇に消える...。

    Future.apply { sys.error("error"); 300 }.unsafePerformAsync(println)
    // 何も表示されない
    

    Futureの実装クラスはこの辺にあって、例外っぽいものを表現する型はない。

    じゃあ例外処理はどうするの...?となった時にTaskが登場する。

    Task?

    改めて。
    scalaz.concurrent.Futureに例外処理を組み込んだもの。
    といいつつ Scala 標準Futureのようなものではなく、Future[Throwable \/ A]として例外を保持するようになっている。

    class Task[+A](val get: Future[Throwable \/ A]) {
      ...
    

    実装はこのあたり

    さっきと同じようなプログラムをTaskで書いてみると以下のようになる

    (for {
      i <- Task.apply(100)
      j <- Task.now(200)
      k <- Task.delay(300)
    } yield i + j + k).unsafePerformAsync(println)
    // \/-(600)
    

    Futureと同じように書けるが、結果は\/-になっているのが分かる。
    Futureと違う点として、apply, delayにおいて例外が起きるようにしてみると、-\/として結果を受け取る事もできるが、nowだと引数を正格評価するのでその時点で例外が発生する。

    Task.apply { sys.error("error"); 100 }.unsafePerformAsync(println)
    // -\/(java.lang.RuntimeException: error)
    
    Task.delay { sys.error("error"); 200 }.unsafePerformAsync(println)
    // -\/(java.lang.RuntimeException: error)
    
    Task.now { sys.error("error"); 300 }.unsafePerformAsync(println)
    // Exception in thread "main" java.lang.RuntimeException: error
    

    Task の嬉しいところ

    Future 的操作

    Scala のFutureのようにコールバックや例外処理の設定を付与できる。

    Task.delay { sys.error("error") }.onFinish { errOpt: Option[Throwable] =>
      Task.now {
        errOpt match {
          case Some(err) => println(s"fail! ${err.getMessage}")
          case None => println("success!")
        }
      }
    }.unsafePerformAsync(println)
    // fail! error
    // -\/(java.lang.RuntimeException: error)
    
    Task { sys.error("fail") }.handle { case t: Throwable => "success" }.unsafePerformAsync(println)
    // \/-(success)
    

    FutureonCompleteと違ってコールバックが設定されたTaskが返ってくるのが地味に嬉しい。

    実行方法の種類

    すでに上でunsafePerformAsyncというメソッドを叩いているが、名前の通り async かつ危険な実行という感じ。
    \/で返ってくるのだから unsafe じゃないような気もするが...。

    Taskの実行方法はざっくり以下。

    • unsafePerformAsync
      • 非同期実行で結果は\/に包まれる
    • unsafePerformSync
      • 同期実行
    • unsafePerformSyncAttempt
      • 同期実行で結果は\/に包まれる
    • unsafePerformSyncFor
      • 同期実行でタイムアウト設定可能
    • unsafePerformSyncAttemptFor
      • 同期実行でタイムアウト設定可能で結果は\/に包まれる
    Task.apply { 100 }.unsafePerformAsync(println)
    // \/-(100)
    Task.apply { sys.error("fail"); 100 }.unsafePerformAsync(println)
    // -\/(java.lang.RuntimeException: fail)
    
    println(Task.apply { 100 }.unsafePerformSync)
    // 100
    println(Task.apply { sys.error("fail"); 100 }.unsafePerformSync)
    // Exception in thread "main" java.lang.RuntimeException: fail
    
    println(Task.apply { 100 }.unsafePerformSyncAttempt)
    // \/-(100)
    println(Task.apply { sys.error("fail"); 100 }.unsafePerformSyncAttempt)
    // -\/(java.lang.RuntimeException: fail)
    
    import scala.concurrent.duration._
    val d = 10.millis
    println(Task { 100 }.unsafePerformSyncFor(d))
    // 100
    println(Task { Thread.sleep(1000); 100 }.unsafePerformSyncFor(d))
    // Exception in thread "main" java.util.concurrent.TimeoutException: Timed out after 10 milliseconds
    
    println(Task { 100 }.unsafePerformSyncAttemptFor(d))
    // \/-(100)
    println(Task { Thread.sleep(1000); 100 }.unsafePerformSyncAttemptFor(d))
    // -\/(java.util.concurrent.TimeoutException: Timed out after 10 milliseconds)
    

    安全に使うなら、unsafePerformAsyncunsafePerformSyncAttemptForのどちらかを使うことになりそう。
    ちなみにタイムアウトはtimedメソッドでも設定できる。

    Task { Thread.sleep(100); 100 }.timed(10.millis).unsafePerformAsync(println)
    // -\/(java.util.concurrent.TimeoutException: Timed out after 10 milliseconds)
    

    スケジューリングやリトライ出来る

    retryTask.scheduleで設定可能。

    Task { 100 }.retry(List.fill(2)(10.millis)).unsafePerformAsync(println)
    // \/-(100)
    Task { sys.error("fail"); 100 }.retry(List.fill(2)(10.millis)).unsafePerformAsync(println)
    // -\/(java.lang.RuntimeException: fail)
    
    Task.schedule(100, 10.millis).unsafePerformAsync(println)
    // \/-(100)
    

    豊富な API が便利。

    Scala の Future に変換する

    実際問題、開発の基盤として Scalaz いれてTaskを全面で使っていくにしても他のライブラリとの兼ね合いでscala.concurrent.Futureを使いたいシーンも多いはず。
    そういう時にはVerizon/deloreanみたいな変換ライブラリもあるので、合わせて入れておくと便利かもしれない。delorean/package.scala at master · Verizon/delorean

    まとめ

    Scalaz のTask便利。 Scala のFutureと同じようなこと出来る + 便利な API が生えているというイメージ。 ただ、関数型っぽく明示的な実行をキックしない限り走り出さないのでそこは少し慣れが必要かもしれない。

    あわせて読みたい Scalaz の Task を使う上で知っておきたいこと - Qiita

    from: https://qiita.com/petitviolet/items/18b1108ab904172baf61