/ / RxJava2 - Mehrere Observables zusammenfassen, Ausführung verhindern, wenn erstes Element gefunden wird - Reaktivprogrammierung, rx-java2

RxJava2 - Concat mehrere Observablen, verhindern die Ausführung, wenn das erste Element gefunden wurde - reaktive Programmierung, rx-java2

Ich arbeite an der Implementierung eines einfachen Datenproviders mit 2-Ebenen-Cache unter Verwendung von RxJava2

public static void main(String args[]) {
Observable.concat(getFromMemory(), getFromFileCache(), getFromNetwork())
.firstElement()
.subscribe((integer) ->
{
System.out.println("Completed with val: " + integer);
});
}

static Observable<Integer> getFromMemory() {

return Observable.create(e -> {
System.out.println("Source: Memory");
e.onNext(1);
e.onComplete();
});
}

static Observable<Integer> getFromFileCache() {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}

return Observable.create(e -> {
System.out.println("Source: FileCache");
e.onNext(2);
e.onComplete();
});
}

static Observable<Integer> getFromNetwork() {
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}

return Observable.create(e -> {
System.out.println("Source: Network");
e.onNext(3);
e.onComplete();
});
}

Ziel ist es, nach Objekten im Speicher-Cache zu suchen, wenn nicht gefunden - aus Datei lesen, wenn nicht gefunden - Netzwerk anrufen, um Ressource abzurufen. Die Cache-Aktualisierung ist in diesem Beispiel nicht wichtig.

Beim Ausführen dieses Codes wird das Konsolenprotokoll angezeigt:

Source: Memory
Source: FileCache
Source: Network
Completed with val: 1

Welcher Mittelwert für Dateicache und Netzwerkaufruf istausgeführt, obwohl der Speichercache einen Wert zurückgibt. Ich verwende rxJava 2. Welchen Operator kann ich verwenden, um Quellen zu kombinieren, aber die Ausführung bei erstem Wert abbrechen? Ich habe mit first (default) experimentiert und nehme, bisher kein Glück

Antworten:

0 für die Antwort № 1

Eigentlich ist es ein Fehler in rxJava2 - ein Upgrade auf 2.1.0 hat geholfen:

Github-Problem: https://github.com/ReactiveX/RxJava/issues/5100