Поток Spring Cloud для Kafka с API потребителя / производителя ровно один раз семантика с префиксом идентификатора транзакции не работает должным образом

#kafka-consumer-api #kafka-producer-api #spring-cloud-stream

#kafka-consumer-api #kafka-producer-api #spring-cloud-stream

Вопрос:

У меня есть сценарий, в котором я вижу другое поведение. Всего 3 разных сервиса

  • Первая служба прослушает из очереди Solace и передаст ее в kafka topic-1 (где транзакции включены)
  • Вторая служба прослушает выше kafka topic-1 и запишет ее в другой kafka topic-2 (где у нас нет ручных коммитов, транзакции разрешены для создания в другой теме, смещение автоматической фиксации как false и изоляция.уровень установлен на read_commited) назад удалить
  • Третья служба прослушает раздел kafka topic-2 и запишет его обратно в очередь Solace (где у нас нет ручных коммитов, автоматическое смещение фиксации как false и изоляция.уровень установлен на read_commited).

Теперь проблема после того, как я включил транзакцию и уровень изоляции во второй службе, я не могу читать какие-либо сообщения, если я отключил транзакцию во второй службе, я могу читать все сообщения.

  • Можем ли мы включить транзакции и уровень изоляции в одном сервисе
  • Как это работает, если мой сервис является просто производителем или потребителем (как EoS гарантирует для этих сервисов)

Отредактировано: Ниже показано, как выглядит мой yml

  - kafka:
   - binder:
     - transaction:
         - transaction-id-prefix:
       - brokers: 
         - configuration: 
               all my consumer properties (ssl, sasl)
  

Обновлено (yml с spring cloud):

 spring: 
  cloud.stream:
      bindings:
        input:
          destination: test_input
          content-type: application/json
          group: test_group
        output:
          destination: test_output
          content-type: application/json
      kafka.binder: 
          configuration: 
            isolation.level: read_committed
            security.protocol: SASL_SSL
            sasl.mechanism: GSSAPI
            sasl.kerberos.service.name: kafka
            ssl.truststore.location: jks
            ssl.truststore.password: 
            ssl.endpoint.identification.algorithm: null            
          brokers: broker1:9092,broker2:9092,broker3:9092
          auto-create-topics: false
          transaction:
            transaction-id-prefix: trans-2
            producer:
              configuration:
                retries: 2000
                acks: all
                security.protocol: SASL_SSL
                sasl.mechanism: GSSAPI
                sasl.kerberos.service.name: kafka
                ssl.truststore.location: jks
                ssl.truststore.password: 
                ssl.endpoint.identification.algorithm: null
  

Обновлено (yml с spring kafka):

 spring:
  kafka:
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
    consumer:
      properties:
        isolation.level: read_committed
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    producer:
      transaction-id-prefix: trans-2
      retries: 2000
      acks: all
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    admin:
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
  

Обновлено с помощью dynamic destination

 Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
  

попробовал оба подхода для решения проблемы с решателем динамического назначения:
динамический преобразователь назначения

Комментарии:

1. Это должно работать нормально; Я попробую это.

Ответ №1:

У меня это работает нормально; все это в одном приложении, но это не будет иметь значения…

 @SpringBootApplication
@EnableBinding(Channels.class)
public class So55419549Application {

    public static void main(String[] args) {
        SpringApplication.run(So55419549Application.class, args);
    }

    @Bean
    public IntegrationFlow service1(MessageChannel out1) {
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
                .log(Level.INFO, m -> "s1 "   m.getPayload())
                .channel(out1)
                .get();
    }

    @StreamListener("in2")
    @SendTo("out2")
    public String service2(String in) {
        System.out.println("s2 "   in);
        return in.toUpperCase();
    }

    @StreamListener("in3")
    public void service3(String in) {
        System.out.println("s3 "   in);
    }

}

interface Channels {

    @Output
    MessageChannel out1();

    @Input
    MessageChannel in2();

    @Output
    MessageChannel out2();

    @Input
    MessageChannel in3();

}
  

и

 spring:
  cloud:
    stream:
      bindings:
        out1:
          destination: topic1
        in2:
          group: s2
          destination: topic1
        out2:
          destination: topic2
        in3:
          group: s3
          destination: topic2
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx
        bindings:
          in2:
            consumer:
              configuration:
                isolation:
                  level: read_committed
          in3:
            consumer:
              configuration:
                isolation:
                  level: read_committed
  kafka:
    producer:
      # needed again here so boot declares a TM for us
      transaction-id-prefix: tx
      retries: 10
      acks: all
logging:
  level:
    org.springframework.kafka.transaction: debug
  

и

 2019-03-29 12:57:08.345  INFO 75700 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   
    : s1 foo
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6790c874, txId=txs2.topic1.0]]
s2 foo
2019-03-29 12:57:08.357 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.358 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@820ef3d, txId=txs3.topic2.0]]
s3 FOO
  

Редактировать

Связующее устройство не включает синхронизацию транзакций в диспетчере транзакций. В качестве обходного пути добавьте

 TransactionSynchronizationManager.setActualTransactionActive(true);
  

для вашего @StreamListener .

Я обнаружил ошибку в binder.

Комментарии:

1. когда я попробовал ту же конфигурацию, я получаю некоторую ошибку (Ошибка во время транзакционной операции; производитель удален из кэша; возможная причина: перезапуск брокера во время транзакции), и мой слушатель (который должен был прослушивать зафиксированные сообщения из своей предыдущей службы) не может прослушивать какие-либо сообщения. Через некоторое время сердцебиение прекращается, и оно переходит к перебалансировке. Ниже показано, как выглядит мой yml kafka: binder: транзакция: transaction-id-prefix: брокеры: конфигурация: все мои потребительские свойства (ssl, sasl)

2. Не добавляйте yaml в комментарии; вместо этого отредактируйте вопрос. Я не знаю, что еще я могу сказать; должно быть, у вас что-то не так; у меня с первой попытки все сработало нормально.

3. Как я уже сказал, я не могу больше помочь с предоставленной вами информацией; мое правильно настроенное приложение работает отлично, так что у вас, должно быть, что-то еще не так. Если вы не можете опубликовать упрощенную версию своего приложения (полную), которая демонстрирует поведение, которое вы где-то видите, я не могу помочь дальше.

4. отредактировал вопрос с помощью updated . yml. Когда я пробовал с spring cloud, я не смог получить как конфигурацию (в kafka.binder), так и транзакцию (kafka.binder). Он пытается создать несколько потоков-производителей, но суффикс транзакции для каждого потока-производителя не является порядковым номером (для начальных двух потоков я получаю порядковый номер 0,1, а для третьего потока я вижу транзакцию как добавленную с префиксом название группы название темы последующее число). и когда я попробовал ту же конфигурацию без spring cloud, она работает нормально, как и ожидалось. Если я сделал какую-либо неправильную конфигурацию, пожалуйста, поправьте меня.

5. Да; что-то не так, производитель не участвует в глобальной транзакции. Расследование…