petitviolet blog

    RxJava(RxAndroid)でのBackpressureについて

    2015-07-21

    QiitaAndroidRxJava

    RxJava の Android バインディングであるRxAndroidを使い始めて Backpressure ではまったので整理 retrolambdaで lambda 式使っていますが、使っていなければ適宜newするコードに置き換えてください

    Backpressure

    Backpressure · ReactiveX/RxJava Wiki

    通信処理に RxJava 使った時に、

    System.err﹕ rx.exceptions.MissingBackpressureException
    

    みたいなエラーを吐いてしまうことがあったのでその対策

    問題のあるコード

    通信部分は例えばこんな感じ

    client.java
    public Observable<Item> get() {
        return Observable.create(subscriber -> {
            subscriber.onStart();
            List<Object> items = // 通信
            for (int i = 0; i < items.size(); i++) {
                Item item = // parseする
                subscriber.onNext(Item);
            }
            subscriber.onCompleted();
        });
    }
    

    何か通信して取ってきてパースしてItemオブジェクトにしてからsubscriber#onNextで subscriber に返す、という流れ これをsubscribeする側はこんな感じ

    mainactivity.java
    private void fetchItems(String url) {
    	new Client(url).get()
        	    .subscribeOn(Schedulers.io())
    	        .observeOn(AndroidSchedulers.mainThread())
        	    .subscribe(
        	    		this::setResult,
        	    		Throwable::printStackTrace,
        	    		() -> Log.d(TAG, "complete"));
    }
    
    private void setResult(Item item) {
        // 通信結果を何かする
    }
    

    io スレッドで通信してメインスレッドでsetResultメソッドを呼ぶ、という流れ これだと上にも上げたようなエラーの stacktrace が表示された

    W/System.err﹕ rx.exceptions.MissingBackpressureException
    W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
    W/System.err﹕ at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    W/System.err﹕ at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
    

    ワーカースレッドから送られてくるデータにメインスレッドでの処理が間に合ってないようなイメージだと思う

    解決策

    RxJava Wikiにもあるようにいくつか解決策があった

    new Client(url).get()
    		.onBackpressureBuffer() // ここ
       	    .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            // .onBackpressureBuffer() ここだとダメ
       	    .subscribe(/** なにか処理 **/);
    

    この例のようにonBackpressureBufferやその他onBackpressureDroponBackpressureBlockなどをメソッドチェーンで call するといい感じに完了できる

    • onBackpressureBuffer - ワーカースレッドからの通知をバッファリングする
    • onBackpressureDrop - メインスレッドでの処理中にはワーカースレッドから通知せずに捨てる
    • onBackpressureBlock - メインスレッドの処理が終わるまでワーカースレッド処理を一時停止させる

    理解がやや曖昧ですが、こんな感じのはず observeOnより前に call しなければいけない点に注意しなければいけない

    from: https://qiita.com/petitviolet/items/f6d9395d40ee0bbef94b