Consumer application logging duplicate records when reading from multiple partitions

#apache-kafka #spring-kafka

Question:

I am trying to consume records from a topic using spring-kafka. The records use an Avro schema. Since the topic has 2 partitions, I set the Kafka concurrency to 2 to consume the partitions in parallel. But this seems to cause a problem.

I log the record received from each partition before processing it, to verify that we are not getting duplicates (the same key arriving from another partition).

Configuration:

     @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory(){
        
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
        
        return new DefaultKafkaConsumerFactory<>(props);    
    }
    
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> 
    kafkaListenerContainerFactory() {
 
      ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      factory.setConcurrency(KAFKA_CONCURRENCY);
      factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // manual ack, committed immediately
      //factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO)
      return factory;
  }
  

Code:

 @KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {

        try {
            if (record.value().get("enrollmentType").toString().matches("ACH|VCP|CHK")) {
                prov_tin_number     = record.value().get("providerTinNumber").toString();
                enroll_type         = record.value().get("enrollmentType").toString();
                vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
                error_flag          = "";
            }

            System.out.println("coming from stream :" + prov_tin_number + " into offset " + record.offset() + " and partition " + record.partition());

            acknowledgement.acknowledge();

        } catch (Exception ex) {
            System.out.println(record);
            System.out.println(ex.getMessage());
        }
    }
  

Output from the code:

Example:

 coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1
  

From the output above, the same record appears to arrive at a different offset and partition, causing duplication on the consumer side. But that is not actually the case: when I read those records from the command line, I get the output below:

 root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config
/data/test/client-test-ssl.properties --partition 1 --offset 499553 --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"providerTinNumber":"018601027","providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages
root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config
/data/test/client-test-ssl.properties --partition 0 --offset 500428 --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"providerTinNumber":"024580061","providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages
  

So the two offsets actually hold different values. Clearly something is wrong in my code, and it is happening for multiple records, not just one.

Full Spring Boot log:

 00:26:11.507 [restartedMain] INFO  com.emerald.peconsumer.ApplicationRun - Started ApplicationRun in 2.896 seconds (JVM running for 12.571)
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.196 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.197 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:30.504 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Finished assignment for group at generation 77: {consumer-csp-prov-emerald-test-2-d2f920dc-a52a-4ed4-aa0f-1e3ef268a4fc=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@1f9e0b89, consumer-csp-prov-emerald-test-1-242f32f2-b823-4946-be1f-a6c584a0f3ce=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@91e5bc9}
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500428, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499553, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-0]
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-1]
coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1
  

Update:

I tried printing the records again, once with 1 consumer thread and once with 2 consumer threads.

Output with 2 consumer threads:

You can observe the randomness in the behaviour.

Duplicate record: the same record shows up in 2 different partitions.

First run:

 coming from stream :018601027 into offset 500428 and partition 0 <-- duplicate 
coming from stream :018601027 into offset 499553 and partition 1 <-- duplicate
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :090341206 into offset 500430 and partition 0 <-- duplicate
coming from stream :090341206 into offset 499555 and partition 1 <-- duplicate
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
  

Second run:

  coming from stream :024580061 into offset 499553 and partition 1 <-- duplicate
coming from stream :024580061 into offset 500428 and partition 0 <-- duplicate
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :113423162 into offset 500436 and partition 0 <-- duplicate
coming from stream :113423162 into offset 499561 and partition 1 <-- duplicate
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
  

Output with 1 consumer thread:

There is no duplication with 1 consumer thread; the records are printed as expected. Does this mean the spring-kafka concurrency setting is unreliable? If so, how can I scale the consumer application to process records in parallel?

 coming from stream :018601027 into offset 499553 and partition 1
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
coming from stream :024580061 into offset 500428 and partition 0
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :113424057 into offset 500437 and partition 0
  

Answer #1:

Most likely you have a thread-safety problem in the code you call from your listener; when using multiple threads, you must not store per-record state in instance fields unless you guard them with synchronization logic.

For example:

 public class NotThreadSafe {

    String someValue;

    void processRecord(ConsumerRecord<?, ?> record) {
        this.someValue = record.value().toString();
        doSomeMoreWork();
    }

    void doSomeMoreWork() {
        // ...
        doSomethingWith(this.someValue);
    }
}
  

With multiple threads, one thread can see the someValue written by another thread.
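To make that visibility problem concrete, here is a minimal, Kafka-free sketch (the class name, the latch-based interleaving, and the "record-from-…" values are all illustrative, not from the question's code). Thread A stores its value in the shared field, pauses mid-processing while thread B overwrites the field, and then reads back B's value — exactly the kind of cross-thread leak that makes two partitions appear to carry the same record:

```java
import java.util.concurrent.CountDownLatch;

public class NotThreadSafeDemo {

    static String someValue; // shared mutable field, like the listener's fields

    static String runDemo() throws InterruptedException {
        CountDownLatch written = new CountDownLatch(1);
        CountDownLatch overwritten = new CountDownLatch(1);
        final String[] seenByA = new String[1];

        Thread a = new Thread(() -> {
            someValue = "record-from-A";   // A stores "its" record in the field
            written.countDown();           // hand control to B
            try {
                overwritten.await();       // A is "busy" while B runs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            seenByA[0] = someValue;        // A reads the field back for processing
        });

        Thread b = new Thread(() -> {
            try {
                written.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            someValue = "record-from-B";   // B clobbers the shared field
            overwritten.countDown();
        });

        a.start();
        b.start();
        a.join();
        b.join();
        return seenByA[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // A ends up processing B's record — the "duplicate" seen in the logs
        System.out.println("thread A processed: " + runDemo());
    }
}
```

The latches make the interleaving deterministic (and establish happens-before, so B's write is guaranteed visible to A); in the real listener the same overwrite happens nondeterministically, which matches the randomness across runs.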

Comments:

1. You need to fix the code to make it thread-safe, or use only one thread. You could also use synchronized blocks if the shared state is only touched by a small part of the logic, but heavy synchronization can defeat the purpose of multithreading. It is better to rework the logic so there is no shared state at all. In the example above, you should pass someValue as a method parameter instead of using a field.
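A sketch of that refactor, assuming nothing beyond the question's field names (the `EnrollmentFields` holder and the method signatures are illustrative): every value derived from a record lives in locals or an immutable holder passed as a parameter, so two listener threads can never overwrite each other's in-flight data.

```java
public class ThreadSafeListenerSketch {

    // Immutable holder for the values previously kept in mutable instance fields.
    static final class EnrollmentFields {
        final String provTinNumber;
        final String enrollType;

        EnrollmentFields(String provTinNumber, String enrollType) {
            this.provTinNumber = provTinNumber;
            this.enrollType = enrollType;
        }
    }

    // Stand-in for the @KafkaListener method body: extract into locals, pass on.
    static String processRecord(String provTinNumber, String enrollType) {
        EnrollmentFields fields = new EnrollmentFields(provTinNumber, enrollType);
        return doSomeMoreWork(fields); // state travels as a parameter, not a field
    }

    static String doSomeMoreWork(EnrollmentFields fields) {
        // downstream logic only ever sees the record it was handed
        return fields.provTinNumber + "/" + fields.enrollType;
    }

    public static void main(String[] args) {
        // Two "listener threads" each see only their own record's values.
        System.out.println(processRecord("018601027", "ACH"));
        System.out.println(processRecord("024580061", "CHK"));
    }
}
```

With this shape, `setConcurrency(2)` is safe: each container thread works entirely on its own stack, so no synchronization is needed at all.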