ho un Observable
che emette zecche ogni secondo:
Observable.interval(0, 1, TimeUnit.SECONDS)
.take(durationInSeconds + 1));
Mi piacerebbe mettere in pausa questo Osservabile in modo che smetta di emettere numeri, e riprenderlo su richiesta.
Ci sono alcuni trucchi:
- secondo il
Observable
javadoc,interval
l'operatore non supporta la contropressione - il wiki RxJava circa contropressione ha una sezione su Blocco del blocco delle chiamate come alternativa di controllo del flusso alla contropressione:
Un altro modo di gestire un Osservabile eccessivamente produttivo è bloccare il blocco delle chiamate (parcheggiando il filo che governa l'Osservabile eccessivamente produttivo). Questo ha lo svantaggio di andare contro il modello "reattivo" e non bloccante di Rx. Tuttavia questa può essere una valida opzione se l'Osservabile problematico si trova su un thread che può essere bloccato in modo sicuro. Attualmente RxJava non espone alcun operatore per facilitare questo.
C'è un modo per mettere in pausa un interval
Osservabile? O dovrei implementare il mio "ticking" Osservabile con un supporto di backpressure?
risposte:
5 per risposta № 1Ci sono molti modi per farlo. Ad esempio, puoi ancora usarlo interval()
e mantieni due stati aggiuntivi: diciamo la bandiera booleana "in pausa" e un contatore.
public static final Observable<Long> pausableInterval(
final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {
final AtomicLong counter = new AtomicLong();
return Observable.interval(initial, interval, unit, scheduler)
.filter(tick -> !paused.get())
.map(tick -> counter.getAndIncrement());
}
Quindi basta chiamare paused.set (true / false) da qualche parte per mettere in pausa / riprendere
EDIT 2016-06-04
C'è un piccolo problema con una soluzione sopra. Se riusciamo a riutilizzare l'istanza osservabile più volte, inizierà dal valore all'ultimo annullamento dell'iscrizione. Per esempio:
Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();
Mentre la lista1 sarà attesa [0,1,2,3,4], lista2sarà in realtà [5,6,7,8,9]. Se il comportamento sopra non è desiderato, l'osservabile deve essere reso apolide. Questo può essere ottenuto dall'operatore scan (). La versione rivista può assomigliare a questo:
public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay,
final long period, TimeUnit unit, Scheduler scheduler) {
return Observable.interval(initialDelay, period, unit, scheduler)
.filter(tick->!pause.get())
.scan((acc,tick)->acc + 1);
}
Oppure, se non desideri la dipendenza da Java 8 e lambda, puoi fare qualcosa del genere usando il codice compatibile con Java 6+: