Nehmen wir an, ich habe einen Prozessor, der einen booleschen Wert ausgibt, wenn ein Knopf gedrückt wird.
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));
Ich möchte den Wert des Gatters verwenden, um eine beobachtbare Sequenz anzuhalten und wieder aufzunehmen, wobei die emittierten Werte während der Pause zwischengespeichert werden.
Ich habe viel darüber gelesen, und obwohl es in reaktiven Erweiterungen für andere Sprachen möglich erscheint, scheint RxJava dies nicht zu unterstützen.
Hier ist ein Beispiel dafür, was ich erreichen möchtegibt einfach jede Sekunde einen inkrementellen Wert aus. Wenn ich die Taste drücke, möchte ich, dass die Ausgabe gestoppt wird, bis ich sie erneut drücke, was jedes Element ausgeben soll, das zwischen den beiden Tastendrücken ausgegeben wird:
Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});
Kennt jemand einen Weg, um so etwas zu erreichen?
Bearbeiten Ich habe einen Blogpost geschrieben, wie man das erreichen kann https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk
Antworten:
4 für die Antwort № 1Es gibt jetzt einen Operator valve()
in der RxJava2Extensions-Bibliothek, die das angeforderte Verhalten ausführt.
1 für die Antwort № 2
Dies erreichen Sie mit den Standard-RxJava-Operatoren
final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);
gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));
0 für die Antwort № 3
Verwenden Sie einfach das fertige Observable.window
Operator, der einen nimmt Observable<T>
Parameter.