RxJava i spring-data

0

Hej,

pytanie korzystając z ReactiveMongoRepository i RxJavy odpytuje kilka api cyklicznie o dane. Następnie korzystają z zip tworze nowy obiekt który chce zapisać w bazie, mapuje wywołując weatherRepository.save(weather) i rekord nie zapisuje się. Pytanie co robię źle?

        var cities = citiesRepository.findAll();
        Observable<Weather> callOpenWeather = Observable.interval(10, TimeUnit.SECONDS)
                .observeOn(Schedulers.computation())
                .flatMap(n -> Observable.fromPublisher(cities))
                .flatMap(openWeatherApiHandler::compute);
        Observable<Weather> callDarkSky = Observable.interval(10, TimeUnit.SECONDS)
                .observeOn(Schedulers.computation())
                .flatMap(n -> Observable.fromPublisher(cities))
                .flatMap(darkSkyApiHandler::compute);
        Observable<Weather> callWeatherBit = Observable.interval(10, TimeUnit.SECONDS)
                .observeOn(Schedulers.computation())
                .flatMap(n -> Observable.fromPublisher(cities))
                .flatMap(callWeatherBitApiHandler::compute);
        Observable.zip(callOpenWeather, callDarkSky, callWeatherBit, 
                (weather1, weather2, weather3) -> new Weather(Arrays.asList(weather1, weather2, weather3)))
                .map(weather -> weatherRepository.save(weather))
                .subscribe();

przykładowa funkcja compute

    public ObservableSource<Weather> compute(City city) {
        var response = WebClient.builder().baseUrl(darkSkyBaseUrl).build().get()
                .uri("/" + darkSkyApiKey + "/" + city.getLatitude() + "," + city.getLongitude())
                .exchange()
                .doOnError(Throwable::printStackTrace)
                .flatMap(clientResponse -> clientResponse.bodyToMono(JsonNode.class))
                .map(jsonNode -> new Weather(jsonNode, city.getName(), Weather.WeatherSupplier.DARK_SKY));
        return Observable.fromPublisher(response);
    }

gdy w funkcji compute dodam mapowanie z zapisem do bazy, rekord zapisuje się poprawnie

1

Na szybko. Tam gdzie masz save to powinno być zrobione we flatMap, a nie w map. Zobacz co zwraca save.

1 użytkowników online, w tym zalogowanych: 0, gości: 1