Rozważ następującą stronę użytkownika:
Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.blockingSubscribe(v -> consumeSlowly(v));
Oczekiwany wynik po wykonaniu poprzedniego koduczy powinienem otrzymać wyjątek z pamięci, ponieważ Flowable.blockingSubscribe żąda pozycji Long.MAX_VALUE, iterator jest pozbawiony terabajta danych, a subskrybent nie jest w stanie nadążyć.
Aby rozwiązać ten problem, dodam rebatchRequests do mojego kodu:
Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.rebatchRequests(128)
.blockingSubscribe(v -> consumeSlowly(v));
Moim oczekiwanym rezultatem, kiedy wykonuję poniższy kod, jest to, że wszystko działa idealnie, jako dokumentacja dla Flowable.rebatchRequests stwierdza, że:
Ten operator umożliwia zapobieganie uruchamianiu nieokreślonego trybu przez żądanie (Long.MAX_VALUE) lub rekompensuje obciążenie związane z pozycjami dla małych i częstych żądań.
W praktyce otrzymuję inny wyjątek poza pamięcią. Jak korzystać z przeciwciśnienia, aby zapobiec natychmiastowemu opróżnianiu iteratora przez płyn?
Odpowiedzi:
1 dla odpowiedzi № 1Problem polega na tym, że masz przepływ synchroniczny i blockingSubscribe
umieszcza w kolejce przedmioty, które nie mogą zostać zużyte z powodu fromIterable
nadal emituje w tym samym wątku. rebatchRequest
nie pomaga tutaj, ponieważ jego konsument jest wciąż nieograniczony blockingSubscribe
więc zawsze bierze przedmioty i konsumuje fromIterable
całkowicie.
Albo nie potrzebujesz blockingSubscribe
lub powinieneś rozważyć użycie blockingIterable()
dla każdego, kto żąda więcej tylko wtedy, gdy bieżący wątek rzeczywiście pochłonął przedmioty. W przeciwnym razie będziesz musiał użyć blockingSubscribe(Subscriber)
przeciążenie i żądanie ręczne:
source.blockingSubscribe(new Subscriber<T>() {
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
upstream = s;
s.request(1);
}
@Override public void onNext(T item) {
consumeSlowly(v);
upstream.request(1);
}
@Override public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override public void onComplete() {
}
});
Zauważ jednak, że masz już Iterable
które można spożywać synchronicznie z użytkownikami bez udziału RxJavy.