/ / Anhalten und Fortsetzen eines beobachtbaren Objekts basierend auf einem booleschen Gate in RxJava 2.X? - Android, RX-Java, reaktive Programmierung, RX-Java2

Pausieren und Fortsetzen einer Observablen basierend auf einem booleschen Gatter in RxJava 2.X? - Android, RX-Java, reaktive Programmierung, RX-Java2

Nehmen wir an, ich habe einen Prozessor, der einen booleschen Wert ausgibt, wenn ein Knopf gedrückt wird.

    boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));

Ich möchte den Wert des Gatters verwenden, um eine beobachtbare Sequenz anzuhalten und wieder aufzunehmen, wobei die emittierten Werte während der Pause zwischengespeichert werden.

Ich habe viel darüber gelesen, und obwohl es in reaktiven Erweiterungen für andere Sprachen möglich erscheint, scheint RxJava dies nicht zu unterstützen.

Hier ist ein Beispiel dafür, was ich erreichen möchtegibt einfach jede Sekunde einen inkrementellen Wert aus. Wenn ich die Taste drücke, möchte ich, dass die Ausgabe gestoppt wird, bis ich sie erneut drücke, was jedes Element ausgeben soll, das zwischen den beiden Tastendrücken ausgegeben wird:

Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});

Kennt jemand einen Weg, um so etwas zu erreichen?

Bearbeiten Ich habe einen Blogpost geschrieben, wie man das erreichen kann https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

Antworten:

4 für die Antwort № 1

Es gibt jetzt einen Operator valve() in der RxJava2Extensions-Bibliothek, die das angeforderte Verhalten ausführt.


1 für die Antwort № 2

Dies erreichen Sie mit den Standard-RxJava-Operatoren

    final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);

gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));

0 für die Antwort № 3

Verwenden Sie einfach das fertige Observable.window Operator, der einen nimmt Observable<T> Parameter.