Mam metodę rx do wywołania api, a osoba wywołująca tę metodę może zdarzyć się wiele razy w bardzo krótkim czasie. Tak więc metoda rx byłaby
public void apiCallWithRx() {
apiService.makeApiCall()
.subscribeOn(Schecdulars.io())
.observeOn(AndroidSchedulers.mainTread())
.subscribe(
// onNext
new onConsume(),
// onError
new onConsume(),
);
}
Metoda wywołująca może wywołać tę apiCallWithRxwiele razy w krótkim czasie .. Problem polega jednak na tym, że czasami nie mogłem uzyskać odpowiedzi z powodu spadku podczas połączenia z drugiego razu lub w określonym czasie. Ani onNext, onError ani onComplete nie są wywoływane. Zastanawiam się, czy to z powodu bufora, czy przeciwciśnienia .. Próbowałem z rxjava1 i rxjava2, są one takie same.
Byłbym bardzo wdzięczny za jakąkolwiek radę.
AKTUALIZACJA 1
Nie widziałem żadnego wyjątku przeciwciśnienia, więc nie mógł to być problem z przeciwciśnieniem.
AKTUALIZACJA 2
Proszę zignorować szczegóły, kod Rx działa przez większość czasu. Po prostu pominąłem jakiś kod do celów ilustracyjnych
AKTUALIZACJA 3
Mam BlockingQueue w tle, więc ta metoda jest wywoływana, gdy w kolejce są dostępne dane. Dane mogą być dodawane do kolejki w dowolnym momencie. I ta metoda jest nie nazywany asynchronicznie, ponieważ ta metoda jest wywoływana tylko po pierwszej odpowiedzi, a następnie sprawdź kolejkę, jeśli istnieją dane, wtedy wysyłamy drugie żądanie API.
Odpowiedzi:
0 dla odpowiedzi № 1Observable
jest domyślnie leniwy i musisz subscribe
do tego dla Observable
biegać.
0 dla odpowiedzi nr 2
Nie wiem dlaczego w RxJava2 subscribeOn i observerOn operator może być użyty bez określenia wątku, w którym ma zostać wykonany.
Ale przynajmniej w RxJava1, jeśli używasz tych operatorów, uruchamiasz je asynchronicznie, a następnie musisz czekać na wykonanie, aby uzyskać wynik.
Sprawdź ten test
@Test
public void testObservableAsync() throws InterruptedException {
Subscription subscription = Observable.from(numbers)
.doOnNext(increaseTotalItemsEmitted())
.subscribeOn(Schedulers.newThread())
.subscribe(number -> System.out.println("Items emitted:" + total));
System.out.println("I finish before the observable finish. Items emitted:" + total);
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}
Możesz zobaczyć tutaj inne przykłady https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java