/ / RxJava: Warum abonnierte gemeinsame beobachtbare Änderungen emittierte Elemente - rx-java, reaktive Programmierung, rx-java2

RxJava: Warum abonnierte gemeinsame beobachtbare Änderungen emittierte Objekte - rx-Java, reaktive Programmierung, rx-java2

Ich bin auf ein rätselhaftes Verhalten gestoßen, das ich nicht kannerklären. Ich habe das untenstehende Beispiel aus einer viel größeren Rx-Kette destilliert, also sei nicht überrascht, warum in aller Welt ich das tun würde. Ich möchte nur einen Einblick bekommen, warum dies geschieht! :)

enum class Request {
Request1,
Request2
}


fun main(args: Array<String>) {
val requestStream = PublishSubject.create<Request>()

val stateChanges = requestStream.share()

stateChanges
.delaySubscription(requestStream)
.subscribe({ println("received $it") })

// Comment this and it changes the output!
stateChanges.subscribe()

requestStream.onNext(Request.Request1)
requestStream.onNext(Request.Request2)
}

Damit. Wenn Sie das obige Programm ausführen, wird Folgendes gedruckt:

received Request1
received Request2

Aber wenn Sie kommentieren stateChanges.subscribe(), plötzlich Request1 geht verloren und es druckt nur das:

received Request2

Kannst du es erklären? Auch würde ich gerne wissen, ob es möglich ist, die obige Einstellung zu haben, um beide Elemente auch in Abwesenheit von extra zu emittieren subscribe().

Antworten:

3 für die Antwort № 1

Im Normalfall share ist bereits verbunden requestStream und wegen delaySubscription(requestStream), share bekommt es "Sekunde" Observer Vor requestStream sendet es die Anfrage1. Damit requestStream hat zwei Observers und Senden von Request1 an den ersten fügt einen weiteren hinzu Observer für den zweiten Verbraucher, der shareDaher erhält der Endteilnehmer dort Request1.

Im auskommentierten Fall share hat nicht mit verbunden requestStream dennoch, deshalb requestStream kann nur die delaySubscription. delaySubscription löst aus share was dann abonniert wird requestStream. Jedoch, PublishSubject sendet Objekte nur an den aktuellen Snapshot von Observers und kann sein erstes nicht sehen onNext ein neues hinzugefügt Observer in der Zwischenzeit. Request1 erreicht also nicht die println.

Dieser Eckfall wird nicht von PublishSubject wie es seine erfordern würde onNext sich daran erinnern Observer hat den aktuellen Artikel bereits erhalten und versucht, den aktuellen Fall erneut zu versuchen Observer geändert. Dies fügt Speicher- und Zeitaufwand hinzu.