Распределенный режим Kafka Connect — ошибка пересылки запроса REST

#apache-kafka #apache-kafka-connect

#apache-kafka #apache-kafka-connect

Вопрос:

Я использую kafka connect (соединитель spoolDir) в распределенном режиме. После запуска worker, когда я отправляю запрос rest api, получен ответ с EOFException. Я проверил, что соединитель нормально работает в автономном режиме.

Http-ответ:

 {"error_code":500,"message":"IO Error trying to forward REST request:
java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}
>HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null
[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]"}
  

В журналах сервера отображается следующая ошибка:

  ERROR IO error forwarding REST request:  (org.apache.kafka.connect.runtime.rest.RestClient:143)
java.util.concurrent.ExecutionException: java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}->HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null)[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]
        at org.eclipse.jetty.client.util.FutureResponseListener.getResult(FutureResponseListener.java:118)
        at org.eclipse.jetty.client.util.FutureResponseListener.get(FutureResponseListener.java:101)
        at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:711)
        at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:125)
        at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:65)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:369)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:164)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        ...
  

Ниже приведен файл свойств:

 bootstrap.servers=kaas-test:443
security.protocol=SSL
ssl.truststore.location=<truststore-location>
ssl.truststore.password=<truststore-password>
ssl.keystore.location=<keystore-location>
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>

producer.security.protocol=SSL
producer.ssl.truststore.location=<truststore-location>
producer.ssl.keystore.location=<keystore-location>
producer.ssl.truststore.password=<truststore-password>
producer.ssl.keystore.password=<keystore-password>
producer.ssl.key.password=<key-password>

group.id=tenant

schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000

rest.port=28082
rest.hostname=localhost

plugin.path=connect-jars/confluentinc-connect-transforms-1.3.2/,connect-jars/jcustenborder-kafka-connect-spooldir-2.0.46/
  

Как я могу это решить?

Комментарии:

1. @Onecricket я создаю только одного работника. Добавлен файл свойств в вопросе.

2. Что, если вы измените rest.hostname использование внешнего имени хоста сервера?

3. Кроме того rest.listeners , я думаю, что это предпочтительное свойство Connect

4. Существуют ли вообще другие кластеры connect? В вашей ошибке указано два IP-адреса… l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083

5. Кроме того, когда я попытался запустить соединитель с помощью docker (с теми же конфигурациями, плюс одно свойство для rest.advertised.host.name ), он работал нормально.