Я працюю в додатку, ніж перетворюю один список об'єктів в інші об'єкти, для цього я використовую Observable з Rx Java.
У мене є два методи, один - це адаптер (фасад Джерсі), який має службу asyn, і цей метод споживає з іншої служби один Observable.
Мені потрібно спожити Observable і обробити кожен елемент, коли це буде завершено. Я створив один список усіх оброблених елементів і повернув новий Observable, де E - список кожного обробленого елемента.
Для обробки кожного елемента я використовую оператор flatMap, але я не знаю, як створити новий Observer, який має один тип diferente на прикладі Список усіх оброблених оператором flatmap.
Будь-яка ідея?
Дякую
Оновлення:
Цей код обробляє кожен елемент і повертає інший Observable, але я не знаю, чи це добре зроблено.
@Override
public Observable<ArrayList> getGeoJson2() {
WKTReader2 reader2 = new WKTReader2();
WKBReader wkbReader = new WKBReader();
ArrayList featureCollection = new ArrayList();
Subject<ArrayList,ArrayList> subject = PublishSubject.create();
manzanaRepository.getManzanas().map(new Func1<Manzana, SimpleFeature>() {
@Override
public SimpleFeature call(Manzana manzana) {
try {
SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null);
}catch (Exception e){
System.out.println(e.getMessage());
return null;
}
}
}).subscribe(new Subscriber<SimpleFeature>() {
@Override
public void onCompleted() {
subject.onNext(featureCollection);
subject.onCompleted();
}
@Override
public void onError(Throwable throwable) {
subject.onError(throwable);
}
@Override
public void onNext(SimpleFeature simpleFeature) {
featureCollection.add(simpleFeature);
}
});
return subject;
}
І цей код використовує спостережуваного повернення:
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/async/geom")
public void asyncGetGeom(@Suspended final AsyncResponse asyncResponse) {
Observable<ArrayList> features = service.getGeoJson2();
features.subscribe(new Observer<ArrayList>() {
@Override
public void onCompleted() {
System.out.println("Se completo la accion!!!");
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onNext(ArrayList features) {
asyncResponse.resume(features);
}
});
}
Метод onNext () ніколи не викликається !!!
Дякую
Відповіді:
1 для відповіді № 1Спробуйте замінити getGeoJson2()
з цим:
@Override
public Observable<List<SimpleFeature>> getGeoJson2() {
return manzanaRepository.getManzanas()
.map(new Func1<Manzana, SimpleFeature>() {
@Override
public SimpleFeature call(Manzana manzana) {
try {
SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null));
} catch (Exception e) {
System.out.println(e.getMessage());
return null;
}
}
})
.filter(new Func1<SimpleFeature, Boolean>() {
@Override
public Boolean call(SimpleFeature sf) {
return sf != null;
}
})
.toList();
}
Пояснення: toList()
використовується оператор, який чекає на onCompleted
з джерела спостерігається, а потім видає всі елементи, випущені джерелом, що спостерігається, у вигляді списку.
0 для відповіді № 2
проблема вашого коду полягає в тому, що ви використовуєте PublishSubject, який завершується до того, як ви почнете його використовувати
якщо ви зміните його на BehaviourSubject, він спрацює
однак, краще (і набагато коротше) реалізувати це наступним чином -
manzanaRepository.getManzanas().map(/**/).collect(/*create array*/, /*add next item*/)