/ / rxjava-Schalter beobachtbar, wenn der zweite beobachtbare Start Elemente ausgibt - rx-java

rxjava switch beobachtbar, wenn der zweite beobachtbare Start Items ausstrahlt - rx-java

Ich habe einige beobachtbare Elemente, die ich parallel ausführe, wie z localObservable und networkObservable. Wenn die networkObservable Startet die Ausgabe von Elementen (ab diesem Zeitpunkt benötige ich nur diese Elemente) und verwirft dann die von ausgegebenen Elemente localObservable (könnte sein localObservable hat noch nicht begonnen).

Observable<Integer> localObservable =
Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());

Antworten:

3 für die Antwort № 1

Sie können so etwas tun:

 Observable<Long> networkObservable =
Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.share();
Observable<Long> localObservable =
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.takeUntil(networkObservable);

Observable.merge(networkObservable, localObservable)
.subscribe(System.out::println);

Dies wird ausgegeben:

0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...

takeUntil wird machen localObservable zu stoppen und abzumelden, wenn die erste Emission von networkObservable passiert ist, so dass die zusammengeführt Observable wird ausstrahlen localObservable so lange networkObservable hat nicht begonnen, und wenn dies der Fall ist, hört es auf zu emittieren localObservable und wechseln Sie, um nur von zu emittieren networkObservable.


0 für die Antwort № 2

Hierfür gibt es eine einfache Lösung von einem Betreiber: AMB

Schauen Sie sich einfach die Ausgabe von System.out an.

Dokumentation: http://reactivex.io/documentation/operators/amb.html

Grundsätzlich abonnieren Sie sowohl beobachtbares als auch das, was zuerst von beobachtbarem Material ausgesendet wird. Die andere beobachtbare Person wird abgemeldet.

@Test
public void ambTest() throws Exception {
TestScheduler testScheduler = new TestScheduler();

Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(1, 2, 3))
.doOnSubscribe(disposable -> System.out.println("connect network"))
.doOnDispose(() -> System.out.println("dispose network"));

Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(4, 5, 6))
.doOnSubscribe(disposable -> System.out.println("connect local"))
.doOnDispose(() -> System.out.println("dispose local"));

Observable<Integer> integerObservable = Observable.ambArray(network, local);

TestObserver<Integer> test = integerObservable.test();

testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);

test.assertValues(4, 5, 6);

testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

test.assertValues(4, 5, 6);
}