Скажімо, у мене є Процесор, який випускає логічне значення коли коли натискається кнопка, подумайте про це як перемикач.
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));
Що я хотів би зробити, це використання значення воріт для призупинення та відновлення спостережуваної послідовності, буферизація випущених значень під час призупинення.
Я багато читав у цьому, і, хоча, здається, це можливо в реактивних розширеннях для інших мов, RxJava, схоже, не підтримує його.
Ось приклад того, що я хочу досягти, цепросто виводить додаткове значення кожну секунду. Коли я натискаю кнопку, я хочу, щоб вихід зупинився, поки я не натискаю його знову, щоб виводити кожен елемент, що випускається між двома кнопками, натискає:
Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});
Хто-небудь знає спосіб досягти щось подібне?
Редагувати Я написав повідомлення в блозі про те, як це досягти https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk
Відповіді:
4 для відповіді № 1В даний час є оператор valve()
у бібліотеці RxJava2Extensions, яка виконує запитувану поведінку.
1 для відповіді № 2
Це можна досягти за допомогою операторів RxJava за замовчуванням:
final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);
gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));
0 для відповіді № 3
Просто використовуйте готові Observable.window
оператор, який приймає один Observable<T>
параметр