/ / Come può essere sospeso un osservabile senza perdere gli oggetti emessi? - rx-java, reactivex

Come può essere sospeso un osservabile senza perdere gli oggetti emessi? - rx-java, reactivex

ho un Observable che emette zecche ogni secondo:

Observable.interval(0, 1, TimeUnit.SECONDS)
.take(durationInSeconds + 1));

Mi piacerebbe mettere in pausa questo Osservabile in modo che smetta di emettere numeri, e riprenderlo su richiesta.

Ci sono alcuni trucchi:

  • secondo il Observable javadoc, interval l'operatore non supporta la contropressione
  • il wiki RxJava circa contropressione ha una sezione su Blocco del blocco delle chiamate come alternativa di controllo del flusso alla contropressione:

Un altro modo di gestire un Osservabile eccessivamente produttivo è bloccare il blocco delle chiamate (parcheggiando il filo che governa l'Osservabile eccessivamente produttivo). Questo ha lo svantaggio di andare contro il modello "reattivo" e non bloccante di Rx. Tuttavia questa può essere una valida opzione se l'Osservabile problematico si trova su un thread che può essere bloccato in modo sicuro. Attualmente RxJava non espone alcun operatore per facilitare questo.

C'è un modo per mettere in pausa un interval Osservabile? O dovrei implementare il mio "ticking" Osservabile con un supporto di backpressure?

risposte:

5 per risposta № 1

Ci sono molti modi per farlo. Ad esempio, puoi ancora usarlo interval() e mantieni due stati aggiuntivi: diciamo la bandiera booleana "in pausa" e un contatore.

public static final Observable<Long> pausableInterval(
final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

final AtomicLong counter = new AtomicLong();
return Observable.interval(initial, interval, unit, scheduler)
.filter(tick -> !paused.get())
.map(tick -> counter.getAndIncrement());
}

Quindi basta chiamare paused.set (true / false) da qualche parte per mettere in pausa / riprendere

EDIT 2016-06-04

C'è un piccolo problema con una soluzione sopra. Se riusciamo a riutilizzare l'istanza osservabile più volte, inizierà dal valore all'ultimo annullamento dell'iscrizione. Per esempio:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

Mentre la lista1 sarà attesa [0,1,2,3,4], lista2sarà in realtà [5,6,7,8,9]. Se il comportamento sopra non è desiderato, l'osservabile deve essere reso apolide. Questo può essere ottenuto dall'operatore scan (). La versione rivista può assomigliare a questo:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay,
final long period, TimeUnit unit, Scheduler scheduler) {

return Observable.interval(initialDelay, period, unit, scheduler)
.filter(tick->!pause.get())
.scan((acc,tick)->acc + 1);
}

Oppure, se non desideri la dipendenza da Java 8 e lambda, puoi fare qualcosa del genere usando il codice compatibile con Java 6+:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361