/ / Wstrzymaj i wznów obserwowalne na podstawie bramki logicznej w RxJava 2.X? - android, rx-java, reaktywne programowanie, rx-java2

Zatrzymaj i wznów obserwowalne na podstawie bramki logicznej w RxJava 2.X? - android, rx-java, reaktywne programowanie, rx-java2

Powiedzmy, że mam procesor, który emituje wartość boolowską, gdy naciśnięty jest dowolny przycisk, potraktuj to jako przełączenie.

    boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));

Chciałbym użyć wartości bramki, aby wstrzymać i wznowić obserwowalną sekwencję, buforując emitowane wartości podczas pauzy.

Wczytałem to dużo i chociaż wydaje się to możliwe w reaktywnych rozszerzeniach dla innych języków, RxJava nie wydaje się go wspierać.

Oto przykład tego, co chciałbym osiągnąćpo prostu wysyła co przyrostową wartość przyrostową. Po naciśnięciu przycisku chcę, aby wyjście zatrzymało się, dopóki nie naciśnie go ponownie, co powinno wyprowadzać każdy element emitowany między dwoma naciśnięciami przycisku:

Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});

Czy ktoś wie o sposobie osiągnięcia czegoś takiego?

Edytować Napisałem post na blogu, jak to osiągnąć https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

Odpowiedzi:

4 dla odpowiedzi № 1

Teraz jest operator valve() w bibliotece RxJava2Extensions, która wykonuje żądane zachowanie.


1 dla odpowiedzi nr 2

Możesz to osiągnąć używając domyślnych operatorów RxJava:

    final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);

gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));

0 dla odpowiedzi № 3

Po prostu użyj gotowego Observable.window operator, który je bierze Observable<T> parametr.