/ / Jak zebrać obserwowalny <ResultSet> do mapy - rx-java, vert.x

Jak zebrać obserwowalne <ResultSet> do mapy - rx-java, vert.x

Próbuję zebrać Observable<ResultSet> z Vertex AsyncSqlClient do HashMap.

Map<String, Integer> map = Maps.newHashMap();
asyncSQLClient
.getConnectionObservable()
.flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
.doOnCompleted(sqlConnection::close)
.doOnError(throwable -> log.error("Error while querying.", throwable)))
.flatMap(resultSet -> Observable.from(resultSet.getRows()))
.toBlocking()
.forEach(row -> map.put(row.getString("a"), row.getInteger("b")));

ale to wydaje się blokować na zawsze.

Po przeszukaniu chwili bez rezultatu, czy mógłbyś mi pomóc?

Odpowiedzi:

1 dla odpowiedzi № 1

jak skomentował @Phoenix Wang, forEach() metoda będzie blokować do momentu ukończenia Obserowalnego. co oznacza, że ​​będzie blokować na zawsze, jeśli masz Observable które emitują nieskończone przedmioty lub masz Obserwowalny, który niepoprawnie nie emituje onCompleted zasygnalizować, że Observable jest zakończone.

To może być spowodowane przez ciebie getConnectionObservable() implementacja metody, jeśli tworzy niestandardową Observable, za pomocą Observable.create() na przykład musisz zadzwonić onCompleted() po emisji wszystkich elementów.

W każdym razie powinieneś być tego świadomy toBlocking() blokuje i czeka, co może nie być odpowiednie w przypadku kodu produkcyjnego (ponieważ łamie on cel reaktywności), możesz osiągnąć ten sam cel, używając reduce():

 asyncSQLClient
.getConnectionObservable()
.flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
.doOnCompleted(sqlConnection::close)
.doOnError(throwable -> log.error("Error while querying.", throwable)))
.flatMap(resultSet -> Observable.from(resultSet.getRows()))
.reduce(Maps.newHashMap(), (map, o) -> map.put(row.getString("a"), row.getInteger("b")))
.subscribe(map -> {
//do something with map
}
);

Uwaga, nadal musisz rozwiązać onCompleted problem, ponieważ redukcja będzie również oczekiwać Observable zakończenie i wyemituje element sygnału po źródle Observable kończy.
Dodatkową opcją jest użycie scan() (wystarczy zastąpić redukcję skanem), dzięki skanowi otrzymasz emisję dla każdego elementu emitowanego ze źródłem Observable, co oznacza, że ​​z czasem gromadzisz przedmioty do mapowania.