#kafka-consumer-api #kafka-producer-api #spring-cloud-stream
#kafka-consumer-api #kafka-producer-api #spring-cloud-stream
Вопрос:
У меня есть сценарий, в котором я вижу другое поведение. Всего 3 разных сервиса
- Первая служба прослушает из очереди Solace и передаст ее в kafka topic-1 (где транзакции включены)
- Вторая служба прослушает выше kafka topic-1 и запишет ее в другой kafka topic-2 (где у нас нет ручных коммитов, транзакции разрешены для создания в другой теме, смещение автоматической фиксации как false и изоляция.уровень установлен на read_commited) назад удалить
- Третья служба прослушает раздел kafka topic-2 и запишет его обратно в очередь Solace (где у нас нет ручных коммитов, автоматическое смещение фиксации как false и изоляция.уровень установлен на read_commited).
Теперь проблема после того, как я включил транзакцию и уровень изоляции во второй службе, я не могу читать какие-либо сообщения, если я отключил транзакцию во второй службе, я могу читать все сообщения.
- Можем ли мы включить транзакции и уровень изоляции в одном сервисе
- Как это работает, если мой сервис является просто производителем или потребителем (как EoS гарантирует для этих сервисов)
Отредактировано: Ниже показано, как выглядит мой yml
- kafka:
- binder:
- transaction:
- transaction-id-prefix:
- brokers:
- configuration:
all my consumer properties (ssl, sasl)
Обновлено (yml с spring cloud):
spring:
cloud.stream:
bindings:
input:
destination: test_input
content-type: application/json
group: test_group
output:
destination: test_output
content-type: application/json
kafka.binder:
configuration:
isolation.level: read_committed
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
ssl.truststore.location: jks
ssl.truststore.password:
ssl.endpoint.identification.algorithm: null
brokers: broker1:9092,broker2:9092,broker3:9092
auto-create-topics: false
transaction:
transaction-id-prefix: trans-2
producer:
configuration:
retries: 2000
acks: all
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
ssl.truststore.location: jks
ssl.truststore.password:
ssl.endpoint.identification.algorithm: null
Обновлено (yml с spring kafka):
spring:
kafka:
bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
consumer:
properties:
isolation.level: read_committed
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
producer:
transaction-id-prefix: trans-2
retries: 2000
acks: all
properties:
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
admin:
properties:
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
Обновлено с помощью dynamic destination
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
попробовал оба подхода для решения проблемы с решателем динамического назначения:
динамический преобразователь назначения
Комментарии:
1. Это должно работать нормально; Я попробую это.
Ответ №1:
У меня это работает нормально; все это в одном приложении, но это не будет иметь значения…
@SpringBootApplication
@EnableBinding(Channels.class)
public class So55419549Application {
public static void main(String[] args) {
SpringApplication.run(So55419549Application.class, args);
}
@Bean
public IntegrationFlow service1(MessageChannel out1) {
return IntegrationFlows.from(() -> "foo", e -> e
.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
.log(Level.INFO, m -> "s1 " m.getPayload())
.channel(out1)
.get();
}
@StreamListener("in2")
@SendTo("out2")
public String service2(String in) {
System.out.println("s2 " in);
return in.toUpperCase();
}
@StreamListener("in3")
public void service3(String in) {
System.out.println("s3 " in);
}
}
interface Channels {
@Output
MessageChannel out1();
@Input
MessageChannel in2();
@Output
MessageChannel out2();
@Input
MessageChannel in3();
}
и
spring:
cloud:
stream:
bindings:
out1:
destination: topic1
in2:
group: s2
destination: topic1
out2:
destination: topic2
in3:
group: s3
destination: topic2
kafka:
binder:
transaction:
transaction-id-prefix: tx
bindings:
in2:
consumer:
configuration:
isolation:
level: read_committed
in3:
consumer:
configuration:
isolation:
level: read_committed
kafka:
producer:
# needed again here so boot declares a TM for us
transaction-id-prefix: tx
retries: 10
acks: all
logging:
level:
org.springframework.kafka.transaction: debug
и
2019-03-29 12:57:08.345 INFO 75700 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler
: s1 foo
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6790c874, txId=txs2.topic1.0]]
s2 foo
2019-03-29 12:57:08.357 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.358 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@820ef3d, txId=txs3.topic2.0]]
s3 FOO
Редактировать
Связующее устройство не включает синхронизацию транзакций в диспетчере транзакций. В качестве обходного пути добавьте
TransactionSynchronizationManager.setActualTransactionActive(true);
для вашего @StreamListener
.
Я обнаружил ошибку в binder.
Комментарии:
1. когда я попробовал ту же конфигурацию, я получаю некоторую ошибку (Ошибка во время транзакционной операции; производитель удален из кэша; возможная причина: перезапуск брокера во время транзакции), и мой слушатель (который должен был прослушивать зафиксированные сообщения из своей предыдущей службы) не может прослушивать какие-либо сообщения. Через некоторое время сердцебиение прекращается, и оно переходит к перебалансировке. Ниже показано, как выглядит мой yml kafka: binder: транзакция: transaction-id-prefix: брокеры: конфигурация: все мои потребительские свойства (ssl, sasl)
2. Не добавляйте yaml в комментарии; вместо этого отредактируйте вопрос. Я не знаю, что еще я могу сказать; должно быть, у вас что-то не так; у меня с первой попытки все сработало нормально.
3. Как я уже сказал, я не могу больше помочь с предоставленной вами информацией; мое правильно настроенное приложение работает отлично, так что у вас, должно быть, что-то еще не так. Если вы не можете опубликовать упрощенную версию своего приложения (полную), которая демонстрирует поведение, которое вы где-то видите, я не могу помочь дальше.
4. отредактировал вопрос с помощью updated . yml. Когда я пробовал с spring cloud, я не смог получить как конфигурацию (в kafka.binder), так и транзакцию (kafka.binder). Он пытается создать несколько потоков-производителей, но суффикс транзакции для каждого потока-производителя не является порядковым номером (для начальных двух потоков я получаю порядковый номер 0,1, а для третьего потока я вижу транзакцию как добавленную с префиксом название группы название темы последующее число). и когда я попробовал ту же конфигурацию без spring cloud, она работает нормально, как и ожидалось. Если я сделал какую-либо неправильную конфигурацию, пожалуйста, поправьте меня.
5. Да; что-то не так, производитель не участвует в глобальной транзакции. Расследование…