#java #spring-boot #exception #events #redis-streams
#java #spring-boot #исключение #Мероприятия #redis-потоки
Вопрос:
У меня есть продюсер в проекте spring boot, который создает два события для одной и той же группы потребителей redis, а потребитель (другой проект spring boot) использует то же самое. Потребитель redis может использовать только один тип события. Я вижу приведенное ниже исключение при использовании события 2-го типа :-
Исключение, замеченное в потребительском приложении spring :-
2021-01-06 19:14:22,580 ERROR [SimpleAsyncTaskExecutor-1] DefaultStreamMessageListenerContainer$LoggingErrorHandler: Unexpected error occurred in scheduled task.
java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:132)
at org.springframework.data.redis.core.StreamObjectMapper.map(StreamObjectMapper.java:158)
at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:377)
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$2(DefaultStreamMessageListenerContainer.java:232)
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138)
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123)
at java.lang.Thread.run(Thread.java:748)
Задача потока Redis :-
@Component
public class RedisStreamTask<T> {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Async
public void publish(final StreamingEvent<T> event) {
final ObjectRecord<String, T> record = StreamRecords.newRecord()
.ofObject(event.getMessage())
.withStreamKey(event.getKey());
final RecordId recordId = redisTemplate
.opsForStream()
.add(record);
}
}
Производитель в 1-м приложении spring :-
final StreamingEvent<MyBean> streamingEvent = new StreamingEvent<>();
streamingEvent.setKey("CONSUMER_GROUP_KEY");
streamingEvent.setMessage(myBean);
redisStreamTask.publish(streamingEvent);
Потребитель во 2-м приложении spring :-
imports...
@Component
public class Consumer implements StreamListener<String, ObjectRecord<String, MyBean>> {
private AtomicInteger atomicInteger = new AtomicInteger(0);
private final Logger log = LoggerFactory.getLogger(Consumer.class);
@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;
@Autowired
public RedisConfig config;
@Autowired
private MyService myService;
@Override
public void onMessage(ObjectRecord<String, MyBean> record) {
record.getValue();
tempCreateFile.log(record.getValue());
try {
myService.saveEvent(record.getValue());
} catch (Exception e) {
log.error("Error while storing redis events", e);
}
atomicInteger.incrementAndGet();
}
@Scheduled(fixedRate = 10000)
public void showPublishedEventsSoFar() {
log.info("Total Consumed :: " atomicInteger.get());
}
@PostConstruct
public void afterPropertiesSet() throws Exception {
try {
log.info("Create Consumer group: {}", config.getConsumerGroupName());
redisTemplate.opsForStream().createGroup(config.getStreamName(), config.getConsumerGroupName()).block();
} catch (RedisSystemException e) {
if (e.getRootCause().getClass().equals(RedisBusyException.class)) {
} else if (e.getRootCause().getClass().equals(RedisCommandExecutionException.class)) {
log.info("STREAM - Stream does not yet exist, creating empty stream: " config.getStreamName());
redisTemplate.opsForStream().add( config.getStreamName(), Collections.singletonMap("", "")).block();
redisTemplate.opsForStream().createGroup(config.getStreamName(), config.getConsumerGroupName()).block();
} else {
e.printStackTrace();
throw e;
}
}
}
}
Пожалуйста, дайте мне знать, если требуется какая-либо дополнительная информация. Буду признателен за любую помощь в решении этой проблемы!
Ответ №1:
У меня была эта ошибка, и оказалось, что это проблема десериализации, связанная с тем, что объект записи потока _class
не соответствует точно между производителем и потребителем (поскольку он использует полное имя класса). Я смог это исправить, добавив @TypeAlias
объект на стороне производителя, чтобы он соответствовал пути к тому же объекту на стороне потребителя:
@TypeAlias("com.example.consumer.model.MyStreamRecord")
public class MyStreamRecord {
...
Мне также пришлось использовать a RedisTemplate<String,String>
вместо <String,MyStreamRecord>
как на стороне производителя, так и на стороне потребителя из-за того, как Redis обрабатывает сопоставления объекта с хэшем по умолчанию (подробнее здесь и здесь ).