blog.petitviolet.net

RxJava(RxAndroid)でのBackpressureについて

2015-07-21

QiitaAndroidRxJava

RxJava の Android バインディングであるRxAndroidを使い始めて Backpressure ではまったので整理 retrolambdaで lambda 式使っていますが、使っていなければ適宜newするコードに置き換えてください

Backpressure

Backpressure · ReactiveX/RxJava Wiki

通信処理に RxJava 使った時に、

System.err﹕ rx.exceptions.MissingBackpressureException

みたいなエラーを吐いてしまうことがあったのでその対策

問題のあるコード

通信部分は例えばこんな感じ

Client.java
public Observable<Item> get() {
    return Observable.create(subscriber -> {
        subscriber.onStart();
        List<Object> items = // 通信
        for (int i = 0; i < items.size(); i++) {
            Item item = // parseする
            subscriber.onNext(Item);
        }
        subscriber.onCompleted();
    });
}

何か通信して取ってきてパースしてItemオブジェクトにしてからsubscriber#onNextで subscriber に返す、という流れ これをsubscribeする側はこんな感じ

MainActivity.java
private void fetchItems(String url) {
	new Client(url).get()
    	    .subscribeOn(Schedulers.io())
	        .observeOn(AndroidSchedulers.mainThread())
    	    .subscribe(
    	    		this::setResult,
    	    		Throwable::printStackTrace,
    	    		() -> Log.d(TAG, "complete"));
}

private void setResult(Item item) {
    // 通信結果を何かする
}

io スレッドで通信してメインスレッドでsetResultメソッドを呼ぶ、という流れ これだと上にも上げたようなエラーの stacktrace が表示された

W/System.err﹕ rx.exceptions.MissingBackpressureException
W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
W/System.err﹕ at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
W/System.err﹕ at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)

ワーカースレッドから送られてくるデータにメインスレッドでの処理が間に合ってないようなイメージだと思う

解決策

RxJava Wikiにもあるようにいくつか解決策があった

new Client(url).get()
		.onBackpressureBuffer() // ここ
   	    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // .onBackpressureBuffer() ここだとダメ
   	    .subscribe(/** なにか処理 **/);

この例のようにonBackpressureBufferやその他onBackpressureDroponBackpressureBlockなどをメソッドチェーンで call するといい感じに完了できる

  • onBackpressureBuffer - ワーカースレッドからの通知をバッファリングする
  • onBackpressureDrop - メインスレッドでの処理中にはワーカースレッドから通知せずに捨てる
  • onBackpressureBlock - メインスレッドの処理が終わるまでワーカースレッド処理を一時停止させる

理解がやや曖昧ですが、こんな感じのはず observeOnより前に call しなければいけない点に注意しなければいけない

from: https://qiita.com/petitviolet/items/f6d9395d40ee0bbef94b