/ / Wie kann ein Observable angehalten werden, ohne die ausgestrahlten Objekte zu verlieren? - RX-Java, ReactiveX

Wie kann ein Observable pausiert werden, ohne die emittierten Objekte zu verlieren? - RX-Java, reaktivex

ich habe ein Observable das stößt jede Sekunde Zecken aus:

Observable.interval(0, 1, TimeUnit.SECONDS)
.take(durationInSeconds + 1));

Ich möchte dieses Observable anhalten, damit es keine Zahlen mehr ausgibt, und es bei Bedarf fortsetzen.

Es gibt einige Fallstricke:

  • entsprechend der Observable Javadoc, interval Der Bediener unterstützt den Gegendruck nicht
  • das RxJava Wiki über Gegendruck hat einen Abschnitt über Callstack-Blockierung als Durchflussregelungsalternative zum Gegendruck:

Eine andere Möglichkeit, eine überproduktive Observable zu handhaben, besteht darin, den Callstack zu blockieren (den Thread zu parken, der die überproduktive Observable regelt). Dies hat den Nachteil, dass es dem „reaktiven“ und nicht blockierenden Modell von Rx widerspricht. Dies kann jedoch eine praktikable Option sein, wenn sich das problematische Observable auf einem Thread befindet, der sicher blockiert werden kann. Derzeit macht RxJava keine Operatoren verfügbar, um dies zu ermöglichen.

Gibt es eine Möglichkeit, eine Pause einzulegen? interval Beobachtbar? Oder sollte ich mein eigenes "Ticking" implementieren, das mit etwas Gegendruckunterstützung beobachtet werden kann?

Antworten:

5 für die Antwort № 1

Dafür gibt es viele Möglichkeiten. Zum Beispiel können Sie immer noch verwenden interval() und behalten Sie zwei zusätzliche Zustände bei: sagen Sie Boolesches Flag "angehalten" und einen Zähler.

public static final Observable<Long> pausableInterval(
final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

final AtomicLong counter = new AtomicLong();
return Observable.interval(initial, interval, unit, scheduler)
.filter(tick -> !paused.get())
.map(tick -> counter.getAndIncrement());
}

Dann rufen Sie einfach paused.set (true / false) auf, um die Wiedergabe anzuhalten bzw. fortzusetzen

EDIT 2016-06-04

Es gibt ein kleines Problem mit der obigen Lösung. Wenn wir die beobachtbare Instanz mehrmals wiederverwenden, beginnt sie mit dem Wert beim letzten Abbestellen. Zum Beispiel:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

Während list1 erwartet wird [0,1,2,3,4], list2wird tatsächlich [5,6,7,8,9] sein. Wenn das obige Verhalten nicht erwünscht ist, muss der Observable zustandslos gemacht werden. Dies kann mit scan () erreicht werden. Die überarbeitete Version kann so aussehen:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay,
final long period, TimeUnit unit, Scheduler scheduler) {

return Observable.interval(initialDelay, period, unit, scheduler)
.filter(tick->!pause.get())
.scan((acc,tick)->acc + 1);
}

Oder wenn Sie keine Abhängigkeit von Java 8 und Lambdas wünschen, können Sie so etwas mit Java 6+ -kompatiblem Code tun:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361