निम्नलिखित उपयोगकर्ताकेस पर विचार करें:
Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.blockingSubscribe(v -> consumeSlowly(v));
अपेक्षित परिणाम जब मैं पूर्व कोड निष्पादित करता हूंक्या मुझे मेमोरी अपवाद से बाहर निकलना चाहिए, फ्लोएबल.ब्लॉकिंग सदस्यता अनुरोध लंबे समय तक .MAX_VALUE आइटम, इटरेटर डेटा के टेराबाइट से निकाला जाता है, और ग्राहक बनाए रखने में असमर्थ है।
इसे हल करने के लिए, मैं अपने कोड में rebatchRequests जोड़ता हूं:
Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.rebatchRequests(128)
.blockingSubscribe(v -> consumeSlowly(v));
मेरा अपेक्षित परिणाम जब मैं निम्नलिखित कोड निष्पादित करता हूं यह है कि सबकुछ दस्तावेज़ीकरण के रूप में पूरी तरह से काम करता है Flowable.rebatchRequests कहा गया है कि:
यह ऑपरेटर डाउनस्ट्रीम को अनुरोध (Long.MAX_VALUE) के माध्यम से असंबद्ध मोड को ट्रिगर करने या छोटे और लगातार अनुरोधों के प्रति-आइटम ओवरहेड की क्षतिपूर्ति करने की अनुमति देता है।
प्रैक्टिस में, मुझे एक और आउट-ऑफ-मेमोरी अपवाद मिलता है। मैं पुनरावर्तक को तुरंत इटरेटर को निकालने से रोकने के लिए बैकप्रेसर का उपयोग कैसे करूं?
उत्तर:
उत्तर № 1 के लिए 1समस्या यह है कि आपके पास एक तुल्यकालिक प्रवाह है और blockingSubscribe
उन वस्तुओं को कतारबद्ध करें जिन्हें तब खपत नहीं किया जा सकता है fromIterable
अभी भी एक ही धागे पर उत्सर्जित करना। rebatchRequest
यहां मदद नहीं करता है क्योंकि इसका उपभोक्ता अभी भी असंबद्ध है blockingSubscribe
इसलिए यह हमेशा सामान लेता है और उपभोग करता है fromIterable
पूरी तरह से।
या तो आपको जरूरत नहीं है blockingSubscribe
या आपको उपयोग करने पर विचार करना चाहिए blockingIterable()
प्रत्येक के लिए जो केवल तभी अनुरोध करता है जब वर्तमान धागे ने वास्तव में वस्तुओं का उपभोग किया हो। अन्यथा, आपको इसका उपयोग करना होगा blockingSubscribe(Subscriber)
अधिभार और मैन्युअल रूप से अनुरोध:
source.blockingSubscribe(new Subscriber<T>() {
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
upstream = s;
s.request(1);
}
@Override public void onNext(T item) {
consumeSlowly(v);
upstream.request(1);
}
@Override public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override public void onComplete() {
}
});
ध्यान दें कि आपके पास पहले से ही एक है Iterable
जिसे आप आरएक्सजेवा की भागीदारी के बिना प्रत्येक के साथ समकालिक रूप से उपभोग कर सकते हैं।