/ / RxJava - Observables: одне Observable, ніж повернення іншого Observable - rx-java

RxJava - Observable: Наочний, ніж повернути інший Observable - rx-java

Я працюю в додатку, ніж перетворюю один список об'єктів в інші об'єкти, для цього я використовую Observable з Rx Java.

У мене є два методи, один - це адаптер (фасад Джерсі), який має службу asyn, і цей метод споживає з іншої служби один Observable.

Мені потрібно спожити Observable і обробити кожен елемент, коли це буде завершено. Я створив один список усіх оброблених елементів і повернув новий Observable, де E - список кожного обробленого елемента.

Для обробки кожного елемента я використовую оператор flatMap, але я не знаю, як створити новий Observer, який має один тип diferente на прикладі Список усіх оброблених оператором flatmap.

Будь-яка ідея?

Дякую

Оновлення:

Цей код обробляє кожен елемент і повертає інший Observable, але я не знаю, чи це добре зроблено.

@Override
public Observable<ArrayList> getGeoJson2() {
WKTReader2 reader2 = new WKTReader2();
WKBReader wkbReader = new WKBReader();
ArrayList featureCollection = new ArrayList();

Subject<ArrayList,ArrayList> subject = PublishSubject.create();

manzanaRepository.getManzanas().map(new Func1<Manzana, SimpleFeature>() {
@Override
public SimpleFeature call(Manzana manzana) {
try {
SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null);
}catch (Exception e){
System.out.println(e.getMessage());
return null;
}

}
}).subscribe(new Subscriber<SimpleFeature>() {
@Override
public void onCompleted() {
subject.onNext(featureCollection);
subject.onCompleted();
}

@Override
public void onError(Throwable throwable) {
subject.onError(throwable);
}

@Override
public void onNext(SimpleFeature simpleFeature) {
featureCollection.add(simpleFeature);
}
});

return subject;
}

І цей код використовує спостережуваного повернення:

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/async/geom")
public void asyncGetGeom(@Suspended final AsyncResponse asyncResponse) {

Observable<ArrayList> features = service.getGeoJson2();

features.subscribe(new Observer<ArrayList>() {
@Override
public void onCompleted() {

System.out.println("Se completo la accion!!!");
}

@Override
public void onError(Throwable throwable) {

System.out.println(throwable.getMessage());
}

@Override
public void onNext(ArrayList features) {
asyncResponse.resume(features);
}
});
}

Метод onNext () ніколи не викликається !!!

Дякую

Відповіді:

1 для відповіді № 1

Спробуйте замінити getGeoJson2() з цим:

@Override
public Observable<List<SimpleFeature>> getGeoJson2() {
return manzanaRepository.getManzanas()
.map(new Func1<Manzana, SimpleFeature>() {
@Override
public SimpleFeature call(Manzana manzana) {
try {
SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null));
} catch (Exception e) {
System.out.println(e.getMessage());
return null;
}
}
})
.filter(new Func1<SimpleFeature, Boolean>() {
@Override
public Boolean call(SimpleFeature sf) {
return sf != null;
}
})
.toList();
}

Пояснення: toList() використовується оператор, який чекає на onCompleted з джерела спостерігається, а потім видає всі елементи, випущені джерелом, що спостерігається, у вигляді списку.


0 для відповіді № 2

проблема вашого коду полягає в тому, що ви використовуєте PublishSubject, який завершується до того, як ви почнете його використовувати

якщо ви зміните його на BehaviourSubject, він спрацює

однак, краще (і набагато коротше) реалізувати це наступним чином -

manzanaRepository.getManzanas().map(/**/).collect(/*create array*/, /*add next item*/)