Kafka и Spring Batch — Как читать ТОЛЬКО незафиксированные сообщения из той же темы?

#java #spring #apache-kafka #spring-batch #spring-kafka

#java #spring #apache-kafka #spring-batch #spring-kafka

Вопрос:

Я работаю над небольшим пакетом с Spring batch и Kafka, который считывает данные json из темы Kafka, преобразует их в объект Student, изменяет значение и отправляет его обратно в тему Kafka. Все работает нормально, но моя единственная проблема в том, что мой потребитель ВСЕГДА читает с самого начала темы. Мне нужно, чтобы это считывалось из последнего неиспользованного сообщения. Я уже добавил эти свойства :

 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
 

Но, похоже, это не работает, при запуске потребителя он обрабатывает все сообщения. У кого-нибудь есть идея, как это сделать с помощью Spring Batch и Kafka, пожалуйста? Это мой код :

BatchStudent.java :

 @SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {
    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .writer(writer())
                .processor(customItemProcessor())
                .reader(reader())
                .build();
    }
}

 

app.yml

 spring.batch.initialize-schema: always

#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student

#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092

#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student
 

Student.java

 @Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}
 

CustomProcessor.java

 @NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentRecieved) {
        final Student studentSent = new Student();
        studentSent.setId(studentRecieved.getId());
        studentSent.setCount(200);
        return studentSent;
    }
}
 

Спасибо за вашу помощь!

Комментарии:

1. что произойдет, когда вы установите автоматическое смещение-сброс: последняя версия?

2. То же поведение, и я думаю, что его значение по умолчанию — latest

3. Если вы устанавливаете groupId на случайное значение (предположительно, при каждом запуске потребителя), то как он должен получать смещения от предыдущего запуска? Они будут сохранены под совершенно другим идентификатором группы.

Ответ №1:

Все работает нормально, но моя единственная проблема в том, что мой потребитель ВСЕГДА читает с самого начала темы. Мне нужно, чтобы это считывалось из последнего неиспользованного сообщения.

Spring Batch 4.3 представил способ использования записей из смещения, хранящихся в Kafka. Я говорил об этой функции в своем выступлении на Spring One в прошлом году: что нового в Spring Batch 4.3 ?. Вы можете настроить программу чтения kafka с пользовательским начальным смещением в каждом разделе, используя setPartitionOffsets:

 Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.
 

Вы можете найти полный пример в этом тестовом примере.

Комментарии:

1. Привет @Mahmoud Ben Hassine. Я думаю, что есть еще одна проблема. ParitionsOffset всегда инициализируется последним прочитанным смещением. После повторного запуска ЗАДАНИЯ.. Считыватель начинает чтение с того же смещения, а не с нуля. Я знаю, что могу установить их с помощью настройки, но у меня нет этой информации в текущем контексте нового ЗАДАНИЯ. Мне нужно только вызвать KafkaConsumer и получить смещение, а затем увеличить его на 1.

2. Смещение сохраняется в контексте выполнения spring batch и в kafka. Вы сами выбираете, откуда его читать при перезапуске неудачного экземпляра задания. По умолчанию перезапуск заставит задание считывать данные из последнего смещения, сохраненного в контексте выполнения spring batch. Если вы хотите перезапустить со смещения, сохраненного в Kafka, вы можете передать пустую карту partitionOffset .

3. Я понял вашу точку зрения, и для перезагрузки я тоже могу это сделать. Проблема связана с новым экземпляром задания. В принципе, он не проходит эту проверку в методе Open в KafkaItemReader, если (this.saveState amp;amp; ExecutionContext.containsKey(TOPIC_PARTITION_OFFSETS)) . Он начинает чтение с последнего смещения в смещении разделов карты. Единственный вариант, который я должен получить для этого смещения, — это использовать KafkaOffset или я должен установить в каком-то глобальном контексте.

4. Также он не будет запускаться с последнего смещения, сохраненного в контексте выполнения автоматически. Я следил за вашими тестовыми примерами, нам нужно будет установить PartitionOffset с разделами разделов после получения из контекста выполнения. В принципе, KafkaItemReader необходимо повторно инициализировать.