/ / rxjava opuszcza kolejne wyniki - rx-java, rx-java2

rxjava obniża wyniki końcowe - rx-java, rx-java2

Mam metodę rx do wywołania api, a osoba wywołująca tę metodę może zdarzyć się wiele razy w bardzo krótkim czasie. Tak więc metoda rx byłaby

public void apiCallWithRx() {
apiService.makeApiCall()
.subscribeOn(Schecdulars.io())
.observeOn(AndroidSchedulers.mainTread())
.subscribe(
// onNext
new onConsume(),
// onError
new onConsume(),

);
}

Metoda wywołująca może wywołać tę apiCallWithRxwiele razy w krótkim czasie .. Problem polega jednak na tym, że czasami nie mogłem uzyskać odpowiedzi z powodu spadku podczas połączenia z drugiego razu lub w określonym czasie. Ani onNext, onError ani onComplete nie są wywoływane. Zastanawiam się, czy to z powodu bufora, czy przeciwciśnienia .. Próbowałem z rxjava1 i rxjava2, są one takie same.

Byłbym bardzo wdzięczny za jakąkolwiek radę.

AKTUALIZACJA 1

Nie widziałem żadnego wyjątku przeciwciśnienia, więc nie mógł to być problem z przeciwciśnieniem.

AKTUALIZACJA 2

Proszę zignorować szczegóły, kod Rx działa przez większość czasu. Po prostu pominąłem jakiś kod do celów ilustracyjnych

AKTUALIZACJA 3

Mam BlockingQueue w tle, więc ta metoda jest wywoływana, gdy w kolejce są dostępne dane. Dane mogą być dodawane do kolejki w dowolnym momencie. I ta metoda jest nie nazywany asynchronicznie, ponieważ ta metoda jest wywoływana tylko po pierwszej odpowiedzi, a następnie sprawdź kolejkę, jeśli istnieją dane, wtedy wysyłamy drugie żądanie API.

Odpowiedzi:

0 dla odpowiedzi № 1

Observable jest domyślnie leniwy i musisz subscribe do tego dla Observable biegać.


0 dla odpowiedzi nr 2

Nie wiem dlaczego w RxJava2 subscribeOn i observerOn operator może być użyty bez określenia wątku, w którym ma zostać wykonany.

Ale przynajmniej w RxJava1, jeśli używasz tych operatorów, uruchamiasz je asynchronicznie, a następnie musisz czekać na wykonanie, aby uzyskać wynik.

Sprawdź ten test

@Test
public void testObservableAsync() throws InterruptedException {
Subscription subscription = Observable.from(numbers)
.doOnNext(increaseTotalItemsEmitted())
.subscribeOn(Schedulers.newThread())
.subscribe(number -> System.out.println("Items emitted:" + total));
System.out.println("I finish before the observable finish.  Items emitted:" + total);
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

Możesz zobaczyć tutaj inne przykłady https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java