blog.petitviolet.net

Scala - scalazのTaskは何が嬉しいのか

2017-09-30

QiitaScalascalaz

Task?

scalaz/scalazにあるクラス。
scalaz/Task.scala

これはscalaz/Future.scalaを wrap したクラスとなっている。

Future?

scala.concurrent.Futureと同じく非同期処理を表現するクラスになっているが、
違いとしてはmapflatMapExecutionContextを必要とせず新しいタスクを生成しないようになっている点。
これによってスレッドの再利用性が向上しているとのこと。

こんな感じ。

(for {
  i <- Future.apply(100)
  j <- Future.now(200)
  k <- Future.delay(300)
} yield i + j + k).unsafePerformAsync(println)
// 600

monadic な API を備えているので for 式が書ける。
Future自体は非同期タスクを定義・記述だけとなっていて、実行するにはunsafePerformAsyncunsafePerformSyncを呼び出す必要がある。

Futureのインスタンスを作成するのにapply, now, delayを使っているが、applyだと別スレッドでの評価、nowは正格評価でdelayは遅延評価、みたいなイメージで使い分ける。

Scala 標準のFutureapplyは遅延評価となっているが、Scalaz のものは異なる。
さらに大きな違いとして例外処理機構は持っていない。
そのため、nowでもdelayでも評価して例外が発生するとそれは例外になる。

Future.now { sys.error("error"); 1 }.unsafePerformAsync(println)
// Exception in thread "main" java.lang.RuntimeException: error
Future.delay { sys.error("error"); 1 }.unsafePerformAsync(println)
// Exception in thread "main" java.lang.RuntimeException: error

ちなみにapplyだと闇に消える…。

Future.apply { sys.error("error"); 300 }.unsafePerformAsync(println)
// 何も表示されない

Futureの実装クラスはこの辺にあって、例外っぽいものを表現する型はない。

じゃあ例外処理はどうするの…?となった時にTaskが登場する。

Task?

改めて。
scalaz.concurrent.Futureに例外処理を組み込んだもの。
といいつつ Scala 標準Futureのようなものではなく、Future[Throwable \/ A]として例外を保持するようになっている。

class Task[+A](val get: Future[Throwable \/ A]) {
  ...

実装はこのあたり

さっきと同じようなプログラムをTaskで書いてみると以下のようになる

(for {
  i <- Task.apply(100)
  j <- Task.now(200)
  k <- Task.delay(300)
} yield i + j + k).unsafePerformAsync(println)
// \/-(600)

Futureと同じように書けるが、結果は\/-になっているのが分かる。
Futureと違う点として、apply, delayにおいて例外が起きるようにしてみると、-\/として結果を受け取る事もできるが、nowだと引数を正格評価するのでその時点で例外が発生する。

Task.apply { sys.error("error"); 100 }.unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: error)

Task.delay { sys.error("error"); 200 }.unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: error)

Task.now { sys.error("error"); 300 }.unsafePerformAsync(println)
// Exception in thread "main" java.lang.RuntimeException: error

Task の嬉しいところ

Future 的操作

Scala のFutureのようにコールバックや例外処理の設定を付与できる。

Task.delay { sys.error("error") }.onFinish { errOpt: Option[Throwable] =>
  Task.now {
    errOpt match {
      case Some(err) => println(s"fail! ${err.getMessage}")
      case None => println("success!")
    }
  }
}.unsafePerformAsync(println)
// fail! error
// -\/(java.lang.RuntimeException: error)

Task { sys.error("fail") }.handle { case t: Throwable => "success" }.unsafePerformAsync(println)
// \/-(success)

FutureonCompleteと違ってコールバックが設定されたTaskが返ってくるのが地味に嬉しい。

実行方法の種類

すでに上でunsafePerformAsyncというメソッドを叩いているが、名前の通り async かつ危険な実行という感じ。
\/で返ってくるのだから unsafe じゃないような気もするが…。

Taskの実行方法はざっくり以下。

  • unsafePerformAsync

    • 非同期実行で結果は\/に包まれる
  • unsafePerformSync

    • 同期実行
  • unsafePerformSyncAttempt

    • 同期実行で結果は\/に包まれる
  • unsafePerformSyncFor

    • 同期実行でタイムアウト設定可能
  • unsafePerformSyncAttemptFor

    • 同期実行でタイムアウト設定可能で結果は\/に包まれる
Task.apply { 100 }.unsafePerformAsync(println)
// \/-(100)
Task.apply { sys.error("fail"); 100 }.unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: fail)

println(Task.apply { 100 }.unsafePerformSync)
// 100
println(Task.apply { sys.error("fail"); 100 }.unsafePerformSync)
// Exception in thread "main" java.lang.RuntimeException: fail

println(Task.apply { 100 }.unsafePerformSyncAttempt)
// \/-(100)
println(Task.apply { sys.error("fail"); 100 }.unsafePerformSyncAttempt)
// -\/(java.lang.RuntimeException: fail)

import scala.concurrent.duration._
val d = 10.millis
println(Task { 100 }.unsafePerformSyncFor(d))
// 100
println(Task { Thread.sleep(1000); 100 }.unsafePerformSyncFor(d))
// Exception in thread "main" java.util.concurrent.TimeoutException: Timed out after 10 milliseconds

println(Task { 100 }.unsafePerformSyncAttemptFor(d))
// \/-(100)
println(Task { Thread.sleep(1000); 100 }.unsafePerformSyncAttemptFor(d))
// -\/(java.util.concurrent.TimeoutException: Timed out after 10 milliseconds)

安全に使うなら、unsafePerformAsyncunsafePerformSyncAttemptForのどちらかを使うことになりそう。
ちなみにタイムアウトはtimedメソッドでも設定できる。

Task { Thread.sleep(100); 100 }.timed(10.millis).unsafePerformAsync(println)
// -\/(java.util.concurrent.TimeoutException: Timed out after 10 milliseconds)

スケジューリングやリトライ出来る

retryTask.scheduleで設定可能。

Task { 100 }.retry(List.fill(2)(10.millis)).unsafePerformAsync(println)
// \/-(100)
Task { sys.error("fail"); 100 }.retry(List.fill(2)(10.millis)).unsafePerformAsync(println)
// -\/(java.lang.RuntimeException: fail)

Task.schedule(100, 10.millis).unsafePerformAsync(println)
// \/-(100)

豊富な API が便利。

Scala の Future に変換する

実際問題、開発の基盤として Scalaz いれてTaskを全面で使っていくにしても他のライブラリとの兼ね合いでscala.concurrent.Futureを使いたいシーンも多いはず。
そういう時にはVerizon/deloreanみたいな変換ライブラリもあるので、合わせて入れておくと便利かもしれない。delorean/package.scala at master · Verizon/delorean

まとめ

Scalaz のTask便利。 Scala のFutureと同じようなこと出来る + 便利な API が生えているというイメージ。 ただ、関数型っぽく明示的な実行をキックしない限り走り出さないのでそこは少し慣れが必要かもしれない。

あわせて読みたい Scalaz の Task を使う上で知っておきたいこと - Qiita

from: https://qiita.com/petitviolet/items/18b1108ab904172baf61