#java #spring #server-sent-events
Вопрос:
У меня есть этот метод, который должен извлекать данные из репозитория с помощью потоков, а затем обрабатывать эти результаты и отправлять отправленные сервером сообщения о событиях. Я смог заставить это работать с помощью простого «отправлено, подождите ()», а затем повторите, поэтому я не думаю, что это какая-либо проблема с тем, как я создаю службу исполнителей. У меня есть небольшое подозрение, что это может быть как-то связано с тем, что исполнитель работает асинхронно, но я не уверен.
@Transactional()
public UUID createNewDataStream(Requests information) {
sseEmitter emitter = new SseEmitter(86400000L);
ExecutorService sseMvcExecutor = Executors.newCachedThreadPool();
UUID id = UUID.randomUUID();
sseStreams.put(id, emitter);
List<Datastream> datastreams = sensorService
.getDatastreams(information.getSensors().get(0), information.getStart(),
information.getEnd()).collect(Collectors.toList());
sseMvcExecutor.execute(() -> {
try {
while (true) {
for (Datastream d : datastreams) {
try (Stream<Observation> observations = observationRepository
.findObservationsByDatastreamIdAndPhenomenonStartAfterAndPhenomenonEndBefore(
d.getId(), information.getStart(),
information.getEnd())) {
...dostuff...
}
}
}
}
}
}
который вызывает это исключение:
org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
at org.springframework.data.jpa.repository.query.JpaQueryExecution$StreamExecution.doExecute(JpaQueryExecution.java:341)
at org.springframework.data.jpa.repository.query.JpaQueryExecution.execute(JpaQueryExecution.java:88)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.doExecute(AbstractJpaQuery.java:155)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.execute(AbstractJpaQuery.java:143)
at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:137)
at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:121)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:159)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:138)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.repository.core.support.SurroundingTransactionDetectorMethodInterceptor.invoke(SurroundingTransactionDetectorMethodInterceptor.java:61)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:145)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy122.findObservationsByDatastreamIdAndPhenomenonStartAfterAndPhenomenonEndBefore(Unknown Source)
at edu.teco.sensordatenbankmanagementsystem.services.ObservationServiceImp.lambda$createNewDataStream$1(ObservationServiceImp.java:79)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
На этой линии
.findObservationsByDatastreamIdAndPhenomenonStartAfterAndPhenomenonEndBefore
(ObservationServiceImp.java:79)
Я уже пытался сделать класс транзакционным, что приводит к той же ошибке. Метод sensorservice.getDatastreams также является транзакционным, и мой репозиторий выглядит следующим образом
@Repository
public interface ObservationRepository extends JpaRepository<Observation, String> {
Optional<Observation> findById(String id);
Stream<Observation> findObservationsByDatastreamIdAndPhenomenonStartAfterAndPhenomenonEndBefore(
String dataStream, @Param("PhenomenonStart") LocalDateTime phenomenonStart,
@Param("PhenomenonEnd") LocalDateTime phenomenonEnd);
.
.
.
}
Репозиторий должен возвращать потоки, потому что объем возвращаемых им данных превышает стандартный размер кучи за 1 вызов, и мы подозреваем, что нам нужно обрабатывать до 20 вызовов одновременно с различным размером.
Комментарии:
1. Использование вашего потока бесполезно, так как вы напрямую включаете его в список, он ничего не добавляет. Вместо того, чтобы собирать в виде списка, вы должны либо перебирать его (либо отказаться от потоков и просто напрямую запрашивать список). У вас также есть бесконечный цикл, поэтому он будет бесконечно держать транзакцию (и, следовательно, соединение) открытой. Вы также не должны каждый раз создавать нового исполнителя, так как в конечном итоге у вас закончится память/потоки.
2. @ M. Deinum Я думаю, вы имеете в виду следующее: «Список<потоков данных> потоков данных = SensorService .getDatastreams(информация.getSensors().get(0), информация.getStart(), информация.getEnd()).сбор(Сборщики. ToList());» Который да, он непосредственно потребляется, но это также не тот, который меня интересует. Мне придется создать новый execturo, если предполагается, что все они будут работать асинхронно друг от друга, не так ли? Правка: Этот первый поток в настоящее время является держателем места, так как я больше беспокоился о том, чтобы попытаться решить возникшую проблему
3. @M. Deinum Я попытался изменить этот поток на список, и это приводит к той же проблеме, что и раньше, так что, похоже, это ничего не решает, к сожалению.
4. Ошибка, которую вы получаете, связана с новым потоком, транзакция привязана к потоку. Когда вы начинаете новую, у нее нет транзакции, и, таким образом, вы получаете эту ошибку. Однако imho ваш код имеет недостатки и остановит ваше приложение после того, как ваш пул подключений будет исчерпан (по умолчанию это 10 подключений).
5. Как бы я мог иметь несколько разных излучателей, не подключая их к разным подключениям?