/ / RxJava: perché l'abbonamento alle modifiche osservabili condivise emette elementi - rx-java, programmazione reattiva, rx-java2

RxJava: perché iscriversi alle modifiche osservabili condivise elementi emessi - rx-java, programmazione reattiva, rx-java2

Mi sono imbattuto in un comportamento sconcertante che non posso "tspiegare. Ho distillato l'esempio di seguito da una catena rx molto più grande, quindi non stupitevi perché mai avrei fatto questo. Voglio solo avere un'idea del perché questo sta accadendo! :)

enum class Request {
Request1,
Request2
}


fun main(args: Array<String>) {
val requestStream = PublishSubject.create<Request>()

val stateChanges = requestStream.share()

stateChanges
.delaySubscription(requestStream)
.subscribe({ println("received $it") })

// Comment this and it changes the output!
stateChanges.subscribe()

requestStream.onNext(Request.Request1)
requestStream.onNext(Request.Request2)
}

Così. se esegui il programma sopra, stamperà:

received Request1
received Request2

Ma se commentate stateChanges.subscribe(), ad un tratto Request1 si perde e stampa solo questo:

received Request2

Puoi spiegarlo? Inoltre vorrei sapere se è possibile avere la configurazione sopra per emettere entrambi gli elementi anche in assenza di extra subscribe().

risposte:

3 per risposta № 1

Nel caso normale, share è già collegato a requestStream e a causa di delaySubscription(requestStream), share ottiene il secondo Observer prima requestStream invia la richiesta1. Così requestStream ha due Observerse l'invio di Request1 al primo ne aggiunge un altro Observer al secondo consumatore, il share, quindi, il sottoscrittore finale riceve Request1.

Nel caso commentato, share non è collegato a requestStream tuttavia, quindi requestStream posso solo avvisare il delaySubscription. delaySubscription trigger share a cui quindi si abbona requestStream. Però, PublishSubject emette elementi solo nell'istantanea corrente di Observerse non riesce a vederne prima onNext aggiunto un nuovo Observer nel frattempo. Pertanto, Request1 non raggiunge il println.

Questo caso d'angolo non è gestito da PublishSubject come richiederebbe il suo onNext per ricordare quale Observer ha già ricevuto l'oggetto corrente e continua a riprovare nel caso in cui l'insieme di corrente Observer cambiato. Questo aggiunge memoria e tempo in eccesso.