/ / Rx-Java2 Floawable.rebatchRequests nie odbiera zleceń - rx-java, rx-java2

Rx-Java2 Floawable.rebatchRequests nie odbiera żądań - rx-java, rx-java2

Rozważ następującą stronę użytkownika:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.blockingSubscribe(v -> consumeSlowly(v));

Oczekiwany wynik po wykonaniu poprzedniego koduczy powinienem otrzymać wyjątek z pamięci, ponieważ Flowable.blockingSubscribe żąda pozycji Long.MAX_VALUE, iterator jest pozbawiony terabajta danych, a subskrybent nie jest w stanie nadążyć.


Aby rozwiązać ten problem, dodam rebatchRequests do mojego kodu:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.rebatchRequests(128)
.blockingSubscribe(v -> consumeSlowly(v));

Moim oczekiwanym rezultatem, kiedy wykonuję poniższy kod, jest to, że wszystko działa idealnie, jako dokumentacja dla Flowable.rebatchRequests stwierdza, że:

Ten operator umożliwia zapobieganie uruchamianiu nieokreślonego trybu przez żądanie (Long.MAX_VALUE) lub rekompensuje obciążenie związane z pozycjami dla małych i częstych żądań.

W praktyce otrzymuję inny wyjątek poza pamięcią. Jak korzystać z przeciwciśnienia, aby zapobiec natychmiastowemu opróżnianiu iteratora przez płyn?

Odpowiedzi:

1 dla odpowiedzi № 1

Problem polega na tym, że masz przepływ synchroniczny i blockingSubscribe umieszcza w kolejce przedmioty, które nie mogą zostać zużyte z powodu fromIterable nadal emituje w tym samym wątku. rebatchRequest nie pomaga tutaj, ponieważ jego konsument jest wciąż nieograniczony blockingSubscribe więc zawsze bierze przedmioty i konsumuje fromIterable całkowicie.

Albo nie potrzebujesz blockingSubscribe lub powinieneś rozważyć użycie blockingIterable() dla każdego, kto żąda więcej tylko wtedy, gdy bieżący wątek rzeczywiście pochłonął przedmioty. W przeciwnym razie będziesz musiał użyć blockingSubscribe(Subscriber) przeciążenie i żądanie ręczne:

source.blockingSubscribe(new Subscriber<T>() {
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
upstream = s;
s.request(1);
}

@Override public void onNext(T item) {
consumeSlowly(v);
upstream.request(1);
}

@Override public void onError(Throwable ex) {
ex.printStackTrace();
}

@Override public void onComplete() {
}
});

Zauważ jednak, że masz już Iterable które można spożywać synchronicznie z użytkownikami bez udziału RxJavy.