/ / Metoda OnSubscribe.call wywoływana dwukrotnie - rx-java, obserwowalna

Metoda OnSubscribe.call wywoływana dwukrotnie - rx-java, obserwowalna

Tło: Szukam wywoływania usługi sieci web wewnątrzzadzwoń do metody OnSubscribe. Istnieje niestandardowa klasa subskrybentów, która jest również subskrybentem tego obserwowalnego. (Poniższy kod jest zbliżeniem tego samego)

Kwestia: Widząc, że metoda OnSubscribe.call jest wywoływana dwa razy.

Relacja między "Observable.create (...) ", subskrybuj (...) i ostatecznie konwertowanie obserwowalnego do Bblokowania nie jest dla mnie jasne.Zobacz poniżej, linia dodaje zachowanie i jakoś ponownie wywołuje OnSubscribe.call.Trzeba pamiętać, że wywołanie toBlocking ostatecznie musi zostać wywołane - tuż przed wynik powinien zostać zwrócony przez serwer WWW (serwlet / kontroler, który nie ma natury Rx).

observable.toBlocking().forEach(i-> System.out.println(i));

Kompletny kod poniżej

 public static void main(String args[]){
Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
observable.toBlocking().forEach(i-> System.out.println(i));
}

Wydajność

In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4

Odpowiadając na własne pytanie: Sposób, w jaki przetwarzałem wyniki przez forEach, był nieprawidłowy. To samo w sobie jest subskrybentem, a tym samym ponownie wywołuje metodę wywołania. Prawidłowy sposób robienia tego, co robiłem, był przez CountDownLatch.

To również wyjaśnia moje inne wątpliwości - wKontekst punktu, w którym ostatecznie czekamy na zakończenie pracy tuż przed zwróceniem uwagi na kontroler / serwlet, oznaczałby użycie zatrzasku z czasem oczekiwania.

Kod poniżej.

public static void main(String args[]) throws Exception {
CountDownLatch latch = new CountDownLatch(4);
Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
latch.countDown();

}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});

latch.await();
System.out.println(" Wait for Latch Over");
}

Odpowiedzi:

0 dla odpowiedzi № 1

toBlocking można zrzucić do subskrypcji za pomocą zatrzasku, a więc do dwóch subskrypcji.

Ale myślę, że masz rację w użyciu toBlocking().forEach()i powinien zachować tę część jako rozwiązanie.

Problemem jest więc twoja druga subskrypcja, wykonana przez połączenie subscribe. Czy jest to naprawdę potrzebne? subscribe? Jeśli chodzi tylko o rejestrowanie i nie modyfikowanie przepływu danych, możesz użyć doOnNext, doOnError i doOnCompleted zamiast.

Jeśli Twój przypadek użycia jest nieco bardziej złożony, niż jest to odzwierciedlone w pytaniu, a faktycznie musisz poczekać na dwa równoległe asynchroniczne przetwarzanie, być może musisz dołączyć i poczekać, używając CountDownLatch (lub podobny) ... Należy pamiętać, że jeśli chcesz równoległości, RxJava jest agnostykiem w tym zakresie i musisz je prowadzić za pomocą Schedulers, subscribeOn/observeOn.


2 dla odpowiedzi nr 2

To może nie być wszystko istotne od Ciebiejuż rozwiązałeś problem w twoim przypadku, ale tworzenie obserwowalnych przy użyciu surowego OnSubscribe jest generalnie podatne na błędy. Lepszym rozwiązaniem jest użycie jednej z istniejących klas pomocniczych: SyncOnSubscribe/AsyncOnSubscribe (od RxJava 1.0.15) lub AbstractOnSubscribe (do RxJava v1.1.0). Dają gotową strukturę, która zarządza obserwowalnym stanem i przeciwciśnieniem, pozostawiając cię do czynienia (głównie) z własną logiką.