/ / रोकें और RxJava 2.X में एक बूलियन गेट पर आधारित एक अवलोकन योग्य है? - एंड्रॉयड, आरएक्स-जावा, रिएक्टिव-प्रोग्रामिंग, आरएक्स-जावा 2

RxJava 2.X में एक बूलियन गेट के आधार पर एक अवलोकन योग्य को रोकें और फिर से शुरू करें? - एंड्रॉइड, आरएक्स-जावा, प्रतिक्रियाशील प्रोग्रामिंग, आरएक्स-जावा 2

मान लीजिए कि मेरे पास एक प्रोसेसर है जो कभी भी एक बटन दबाए जाने पर बूलियन मान का उत्सर्जन करता है, इसे टॉगल के रूप में सोचें।

    boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));

मैं जो करना चाहूंगा वह गेट के मूल्य को विराम देने और एक दृश्यमान अनुक्रम को फिर से शुरू करने के लिए उपयोग करता है, जो उत्सर्जित मूल्यों को रोक देता है।

मैं इसे बहुत पढ़ता हूं और जबकि अन्य भाषाओं के लिए प्रतिक्रियाशील एक्सटेंशन में यह संभव लगता है, RxJava doesn "t इसे समर्थन करने के लिए लगता है।

यहाँ एक उदाहरण है कि मुझे क्या हासिल करना हैबस हर सेकंड में एक वृद्धिशील मूल्य का उत्पादन होता है। जब मैं बटन दबाता हूं तो मैं चाहता हूं कि जब तक मैं इसे फिर से दबाऊं तब तक उत्पादन बंद हो जाए जो दो बटन प्रेस के बीच उत्सर्जित हर वस्तु का उत्पादन करना चाहिए:

Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});

क्या किसी को इस तरह से कुछ हासिल करने का तरीका पता है?

संपादित करें मैंने इसे प्राप्त करने के तरीके पर एक ब्लॉग पोस्ट लिखा है https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

उत्तर:

उत्तर № 1 के लिए 4

अब एक ऑपरेटर है valve() RxJava2Extensions लाइब्रेरी जो अनुरोधित व्यवहार करता है।


उत्तर № 2 के लिए 1

आप डिफ़ॉल्ट RxJava ऑपरेटरों का उपयोग करके इसे प्राप्त कर सकते हैं:

    final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);

gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));

जवाब के लिए 0 № 3

बस रेडीमेड का उपयोग करें Observable.window ऑपरेटर जो एक लेता है Observable<T> पैरामीटर।