/ / RxJava async subscribe will lost message - rx-java2

RxJava async subscribe perdra un message - rx-java2

J'étais en train d'apprendre comment fonctionne l'abonnement asynchrone RxJava. Mais un problème m'a confondu.

@Test public void testCreateAsync() throws InterruptedException {
Observable<String> observable = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (!emitter.isDisposed()) {
int finalI = i;
new Thread(() -> emitter.onNext("value_" + finalI)).start();
}
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
});

observable.subscribe(System.out::println);
}

Le code ci-dessus fonctionne correctement et imprimez value_1 à value_9. Mais quand j'ajoute un sommeil avant de m'abonner, le dernier message value_9 ne sera pas imprimé, comme ceci:

@Test public void testCreateAsync() throws InterruptedException {
...
Thread.sleep(3000);
observable.subscribe(System.out::println);
}

Toute discussion sur cette question est appréciée.

ps: la version java est 1.8, et la version RxJava est 2.1.1.

Réponses:

0 pour la réponse № 1

Il y a deux problèmes:

  1. appel onNext à partir de différents threads simultanément
  2. onComplete peut s'exécuter avant que les threads précédents puissent émettre leur valeur, ce qui empêche onNext émissions.

Le sommeil n'a pas d'effet particulier, l'exécution répétée du code ci-dessus entraînera parfois le comportement.

(Je ne sais pas ce que vous vouliez réaliser avec cette configuration).