Ich habe einige beobachtbare Elemente, die ich parallel ausführe, wie z localObservable
und networkObservable
. Wenn die networkObservable
Startet die Ausgabe von Elementen (ab diesem Zeitpunkt benötige ich nur diese Elemente) und verwirft dann die von ausgegebenen Elemente localObservable
(könnte sein localObservable
hat noch nicht begonnen).
Observable<Integer> localObservable =
Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
Antworten:
3 für die Antwort № 1Sie können so etwas tun:
Observable<Long> networkObservable =
Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.share();
Observable<Long> localObservable =
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.takeUntil(networkObservable);
Observable.merge(networkObservable, localObservable)
.subscribe(System.out::println);
Dies wird ausgegeben:
0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...
takeUntil
wird machen localObservable
zu stoppen und abzumelden, wenn die erste Emission von networkObservable
passiert ist, so dass die zusammengeführt Observable
wird ausstrahlen localObservable
so lange networkObservable
hat nicht begonnen, und wenn dies der Fall ist, hört es auf zu emittieren localObservable
und wechseln Sie, um nur von zu emittieren networkObservable
.
0 für die Antwort № 2
Hierfür gibt es eine einfache Lösung von einem Betreiber: AMB
Schauen Sie sich einfach die Ausgabe von System.out an.
Dokumentation: http://reactivex.io/documentation/operators/amb.html
Grundsätzlich abonnieren Sie sowohl beobachtbares als auch das, was zuerst von beobachtbarem Material ausgesendet wird. Die andere beobachtbare Person wird abgemeldet.
@Test
public void ambTest() throws Exception {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(1, 2, 3))
.doOnSubscribe(disposable -> System.out.println("connect network"))
.doOnDispose(() -> System.out.println("dispose network"));
Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(4, 5, 6))
.doOnSubscribe(disposable -> System.out.println("connect local"))
.doOnDispose(() -> System.out.println("dispose local"));
Observable<Integer> integerObservable = Observable.ambArray(network, local);
TestObserver<Integer> test = integerObservable.test();
testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
test.assertValues(4, 5, 6);
testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
test.assertValues(4, 5, 6);
}