/ / Rx-Java2 Floawable.rebatchRequests अनुरोधों को रीबैच नहीं करता है - आरएक्स-जावा, आरएक्स-जावा 2

आरएक्स-जावा 2 Floawable.rebatchRequests अनुरोधों को रीबैच नहीं करता है - आरएक्स-जावा, आरएक्स-जावा 2

निम्नलिखित उपयोगकर्ताकेस पर विचार करें:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.blockingSubscribe(v -> consumeSlowly(v));

अपेक्षित परिणाम जब मैं पूर्व कोड निष्पादित करता हूंक्या मुझे मेमोरी अपवाद से बाहर निकलना चाहिए, फ्लोएबल.ब्लॉकिंग सदस्यता अनुरोध लंबे समय तक .MAX_VALUE आइटम, इटरेटर डेटा के टेराबाइट से निकाला जाता है, और ग्राहक बनाए रखने में असमर्थ है।


इसे हल करने के लिए, मैं अपने कोड में rebatchRequests जोड़ता हूं:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.rebatchRequests(128)
.blockingSubscribe(v -> consumeSlowly(v));

मेरा अपेक्षित परिणाम जब मैं निम्नलिखित कोड निष्पादित करता हूं यह है कि सबकुछ दस्तावेज़ीकरण के रूप में पूरी तरह से काम करता है Flowable.rebatchRequests कहा गया है कि:

यह ऑपरेटर डाउनस्ट्रीम को अनुरोध (Long.MAX_VALUE) के माध्यम से असंबद्ध मोड को ट्रिगर करने या छोटे और लगातार अनुरोधों के प्रति-आइटम ओवरहेड की क्षतिपूर्ति करने की अनुमति देता है।

प्रैक्टिस में, मुझे एक और आउट-ऑफ-मेमोरी अपवाद मिलता है। मैं पुनरावर्तक को तुरंत इटरेटर को निकालने से रोकने के लिए बैकप्रेसर का उपयोग कैसे करूं?

उत्तर:

उत्तर № 1 के लिए 1

समस्या यह है कि आपके पास एक तुल्यकालिक प्रवाह है और blockingSubscribe उन वस्तुओं को कतारबद्ध करें जिन्हें तब खपत नहीं किया जा सकता है fromIterable अभी भी एक ही धागे पर उत्सर्जित करना। rebatchRequest यहां मदद नहीं करता है क्योंकि इसका उपभोक्ता अभी भी असंबद्ध है blockingSubscribe इसलिए यह हमेशा सामान लेता है और उपभोग करता है fromIterable पूरी तरह से।

या तो आपको जरूरत नहीं है blockingSubscribe या आपको उपयोग करने पर विचार करना चाहिए blockingIterable() प्रत्येक के लिए जो केवल तभी अनुरोध करता है जब वर्तमान धागे ने वास्तव में वस्तुओं का उपभोग किया हो। अन्यथा, आपको इसका उपयोग करना होगा blockingSubscribe(Subscriber) अधिभार और मैन्युअल रूप से अनुरोध:

source.blockingSubscribe(new Subscriber<T>() {
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
upstream = s;
s.request(1);
}

@Override public void onNext(T item) {
consumeSlowly(v);
upstream.request(1);
}

@Override public void onError(Throwable ex) {
ex.printStackTrace();
}

@Override public void onComplete() {
}
});

ध्यान दें कि आपके पास पहले से ही एक है Iterable जिसे आप आरएक्सजेवा की भागीदारी के बिना प्रत्येक के साथ समकालिक रूप से उपभोग कर सकते हैं।