RxJava(RxAndroid)でのBackpressureについて
2015-07-21
QiitaAndroidRxJavaRxJava の Android バインディングであるRxAndroidを使い始めて Backpressure ではまったので整理
retrolambdaで lambda 式使っていますが、使っていなければ適宜new
するコードに置き換えてください
Backpressure
Backpressure · ReactiveX/RxJava Wiki
通信処理に RxJava 使った時に、
System.err﹕ rx.exceptions.MissingBackpressureException
みたいなエラーを吐いてしまうことがあったのでその対策
問題のあるコード
通信部分は例えばこんな感じ
client.java
public Observable<Item> get() {
return Observable.create(subscriber -> {
subscriber.onStart();
List<Object> items = // 通信
for (int i = 0; i < items.size(); i++) {
Item item = // parseする
subscriber.onNext(Item);
}
subscriber.onCompleted();
});
}
何か通信して取ってきてパースしてItem
オブジェクトにしてからsubscriber#onNext
で subscriber に返す、という流れ
これをsubscribe
する側はこんな感じ
mainactivity.java
private void fetchItems(String url) {
new Client(url).get()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
this::setResult,
Throwable::printStackTrace,
() -> Log.d(TAG, "complete"));
}
private void setResult(Item item) {
// 通信結果を何かする
}
io スレッドで通信してメインスレッドでsetResult
メソッドを呼ぶ、という流れ
これだと上にも上げたようなエラーの stacktrace が表示された
W/System.err﹕ rx.exceptions.MissingBackpressureException
W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
W/System.err﹕ at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
W/System.err﹕ at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
ワーカースレッドから送られてくるデータにメインスレッドでの処理が間に合ってないようなイメージだと思う
解決策
RxJava Wikiにもあるようにいくつか解決策があった
new Client(url).get()
.onBackpressureBuffer() // ここ
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// .onBackpressureBuffer() ここだとダメ
.subscribe(/** なにか処理 **/);
この例のようにonBackpressureBuffer
やその他onBackpressureDrop
、onBackpressureBlock
などをメソッドチェーンで call するといい感じに完了できる
onBackpressureBuffer
- ワーカースレッドからの通知をバッファリングするonBackpressureDrop
- メインスレッドでの処理中にはワーカースレッドから通知せずに捨てるonBackpressureBlock
- メインスレッドの処理が終わるまでワーカースレッド処理を一時停止させる
理解がやや曖昧ですが、こんな感じのはず
observeOn
より前に call しなければいけない点に注意しなければいけない
from: https://qiita.com/petitviolet/items/f6d9395d40ee0bbef94b