/ / Asynchrone Bereinigung bei Terminierung einer beobachtbaren Sequenz - rx-java

Async-Bereinigung bei beobachtbarer Sequenzbeendigung

Ich muss eine asynchrone Methode (z. B. einen Bereinigungsjob) bei einem beobachtbaren Abschluss / Fehler ausführen. Auch wenn die Bereinigungsmethode fehlschlägt, sollte die beobachtbare Kette ebenfalls fehlschlagen.

Gibt es eine Standardmethode dafür?

Angenommen, ich habe eine Sequenz Observable source und eine asyncCleanup () -Methode, die ein beobachtbares Bereinigungsergebnis zurückgeben.

Die Nebenwirkungsmethoden wie doOnCompleted / doOnTerminate / doOnUnsubscribe scheinen nicht geeignet zu sein:

source.doOnUnsubscribe(()->asyncCleanup().subscribe());

Die beobachtbare Kette ist auch dann erfolgreich, wenn asyncCleanup () fehlschlägt. AsyncCleanup () sollte also Teil derselben Kette sein.

Das Beste, was ich mir ausgedacht habe, ist das Folgende:

source.onErrorResumeNext(err->asyncCleanup()
.flatMap(cleanupResult->Observable.error(err)))
.concatWith(asyncCleanup()
.flatMap(cleanupResult->Observable.empty()));

Im Fehlerfall ruft onErrorResumeNext aufasyncCleanup () und wird dem ursprünglichen Fehler zugeordnet. Im Erfolgsfall können wir mit asyncCleanup () verknüpfen, das einer leeren Sequenz zugeordnet ist. Leider funktioniert es nicht, wenn ein take () - oder ähnlicher einschränkender Operator nachgeschaltet ist, der dazu führen kann, dass das verkettete Observable nicht einmal abonniert wird.

UPDATE 01.08.2017: Die Frage bezieht sich speziell auf Sequenzobservablen. Für eine beobachtbare Einzelelementquelle ist die Lösung ganz einfach:

singleItemSource
.onErrorResumeNext(err->asyncCleanup()
.flatMap(cleanupResult->Observable.error(err)))
.flatMap(single->asyncCleanup()
.map(cleanupResult->single));

Antworten:

1 für die Antwort № 1

Die Idee, die Sie vorschlagen, bricht die Observable Vertrag, weil:

Rx-Designrichtlinien, 6.8 - Abmeldung sollte nicht werfen</ em>

Das einzige, was mir in den Sinn kommt, um Ihr Problem zu lösen, ist sich zu wenden asyncCleanup() auf den gleichen Typ der vorhergehenden Sequenz und im Erfolgsfall zum Beispiel Observable.empty () zurück, den Sie verwenden können concat Operator.

public Observable<Long> asyncCleanup() {
...
return Observable.empty();
}

//Concat asyncCleanup() to be executed after the sequence
Observable.rangeLong(1, 10)
.concatWith(asyncCleanup())
.subscribe(System.out::println, System.out::println);

Ich hoffe es hilft...


1 für die Antwort № 2

Da ist ein flatMap Überladung, mit der Sie die onXXX Ereignisse des Upstream in Observables:

Observable.range(1, 10)
.flatMap(Observable::just, e -> asyncCleanup(), () -> asyncCleanup())
.subscribe(...);

0 für die Antwort № 3

Die andere Lösung, die ich kommentierte, ist die Factory-Methode Observable.using. Hier ist ein einfaches Beispiel, wie es funktioniert:

Observable<Long> range = Observable.rangeLong(1, 10);

Observable.using(() -> range, //The resource provider
observable -> observable, //The resource
observable -> asyncCleanup() //The resource disposer
.subscribe(System.out::println, System.out::println))
.subscribe(System.out::println, System.out::println);

Der Nachteil hierbei ist, dass Sie damit umgehen sollten asyncCleanup() Erfolg und Fehler, ohne sich zu verbreiten oder zu werfen. Bei diesem Ansatz sollten Sie darüber nachdenken, sich im Fall von von Observables zu entfernen asyncCleanup() Methode.