Потоковая передача большого набора данных в формате CSV при загрузке Spring

#spring-boot #csv #java-stream #rest

#spring-boot #csv #java-stream #остальное

Вопрос:

У меня есть приложение Spring Boot (версия 2.3.3), и мне нужно обслуживать большие наборы данных из базы данных (Postgres) через REST API в виде CSV-файлов. До сих пор обрабатывались только небольшие подмножества данных, которые можно накапливать в виде коллекций и передавать сразу. Теперь я хочу подготовить сервер к работе с большими объемами данных.

Данные извлекаются в виде прогнозов (а не сущностей как есть) с помощью запросов в репозитории CRUD.

 public interface IMeasurementsRepo extends CrudRepository<Measurement, Long> {
    @Query("SELECT .... ")
    public Iterable<IMeasurementProjection> getMeasurementHistory(...);
}
 

И проекция определяется следующим образом:

 public interface IMeasurementProjection {
    public Timestamp getTimestamp();
    public Long getSensorId();
    public Group getGroup();
    public Integer getNum();
    public Signal getSignal();
    public Double getStrength();
}
 

После извлечения каждая проекция должна быть преобразована в строку CSV-файла и отправлена клиенту. Преобразование (форматирование) не является проблемой. Мой вопрос в том, как передать запрошенные данные в виде потока.

Скорее всего, мне нужно ResponseEntity<StreamingResponseBody> или Flux , но я не знаю, как применить их в моем случае.

Любой намек или ссылка приветствуются.

Ответ №1:

Если вы используете spring-webflux, то вместе с этим r2dbc может достичь того, чего вы хотите, ниже приведены зависимости, конфигурация и рабочий образец

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
 

весенний веб-поток

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
 

application.properties

 spring.r2dbc.url=r2dbc:postgresql://172.17.0.2:5432/streaming-db-server
spring.r2dbc.username=postgres
spring.r2dbc.password=ungabunga
 

Вот пример кода:

 package com.example.postgres.batchinsertandstreaming.demo;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import javax.persistence.Id;


@SpringBootApplication
public class PostgresBatchStreamingDataApplication {

    public static void main(String[] args) {
        SpringApplication.run(PostgresBatchStreamingDataApplication.class, args);
    }

}

@RequestMapping("streaming")
@RestController
class StreamingSalesRecord {

    private final StreamingSaleRecordService service;

    StreamingSalesRecord(StreamingSaleRecordService service) {
        this.service = service;
    }

    @GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> get() {
        return service.all();
    }
}

@Service
class StreamingSaleRecordService {

    private final StreamingSalesRecordRepository repository;

    StreamingSaleRecordService(StreamingSalesRecordRepository repository) {
        this.repository = repository;
    }

    public Flux<String> all() {
        return repository.findAll()
                .map(object -> String.join(",", new String[]{object.getId().toString(), object.getCountry(), object.getRegion()}))
                .map(object -> object   "n");
    }
}

@Repository
interface StreamingSalesRecordRepository extends ReactiveCrudRepository<SalesRecordReactive, Long> {
}

@Getter
@Setter
@Table(value = "sales_record")
class SalesRecordReactive {

    @Id
    private Long id;

    private String region;

    private String country;
}
 

Комментарии:

1. Файлы CSV обычно имеют столбец заголовка.

2. почему потоковые данные будут иметь заголовок, он говорит либо о потоке строк со значениями csv, либо о ndjson

3. Я интерпретировал цель как обслуживание больших файлов CSV через API, а файлы CSV обычно имеют столбцы заголовка. Потоковая передача была просто способом избежать исключений из памяти. Если файл CSV без заголовков приемлем для потребителя, очевидно, что ваше решение работает.