/ / Пауза та відновлення спостережуваного на основі логічних воріт в RxJava 2.X? - android, rx-java, реактивне програмування, rx-java2

Призупинити і відновити спостережуваний на основі логічних воріт в RxJava 2.X? - android, rx-java, реактивне програмування, rx-java2

Скажімо, у мене є Процесор, який випускає логічне значення коли коли натискається кнопка, подумайте про це як перемикач.

    boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));

Що я хотів би зробити, це використання значення воріт для призупинення та відновлення спостережуваної послідовності, буферизація випущених значень під час призупинення.

Я багато читав у цьому, і, хоча, здається, це можливо в реактивних розширеннях для інших мов, RxJava, схоже, не підтримує його.

Ось приклад того, що я хочу досягти, цепросто виводить додаткове значення кожну секунду. Коли я натискаю кнопку, я хочу, щоб вихід зупинився, поки я не натискаю його знову, щоб виводити кожен елемент, що випускається між двома кнопками, натискає:

Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});

Хто-небудь знає спосіб досягти щось подібне?

Редагувати Я написав повідомлення в блозі про те, як це досягти https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

Відповіді:

4 для відповіді № 1

В даний час є оператор valve() у бібліотеці RxJava2Extensions, яка виконує запитувану поведінку.


1 для відповіді № 2

Це можна досягти за допомогою операторів RxJava за замовчуванням:

    final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);

gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));

0 для відповіді № 3

Просто використовуйте готові Observable.window оператор, який приймає один Observable<T> параметр