#apache-kafka #apache-kafka-connect
#apache-kafka #apache-kafka-connect
Вопрос:
Я использую kafka connect (соединитель spoolDir) в распределенном режиме. После запуска worker, когда я отправляю запрос rest api, получен ответ с EOFException. Я проверил, что соединитель нормально работает в автономном режиме.
Http-ответ:
{"error_code":500,"message":"IO Error trying to forward REST request:
java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}
>HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null
[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]"}
В журналах сервера отображается следующая ошибка:
ERROR IO error forwarding REST request: (org.apache.kafka.connect.runtime.rest.RestClient:143)
java.util.concurrent.ExecutionException: java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}->HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null)[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]
at org.eclipse.jetty.client.util.FutureResponseListener.getResult(FutureResponseListener.java:118)
at org.eclipse.jetty.client.util.FutureResponseListener.get(FutureResponseListener.java:101)
at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:711)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:125)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:65)
at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:369)
at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:164)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
...
Ниже приведен файл свойств:
bootstrap.servers=kaas-test:443
security.protocol=SSL
ssl.truststore.location=<truststore-location>
ssl.truststore.password=<truststore-password>
ssl.keystore.location=<keystore-location>
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
producer.security.protocol=SSL
producer.ssl.truststore.location=<truststore-location>
producer.ssl.keystore.location=<keystore-location>
producer.ssl.truststore.password=<truststore-password>
producer.ssl.keystore.password=<keystore-password>
producer.ssl.key.password=<key-password>
group.id=tenant
schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
rest.port=28082
rest.hostname=localhost
plugin.path=connect-jars/confluentinc-connect-transforms-1.3.2/,connect-jars/jcustenborder-kafka-connect-spooldir-2.0.46/
Как я могу это решить?
Комментарии:
1. @Onecricket я создаю только одного работника. Добавлен файл свойств в вопросе.
2. Что, если вы измените
rest.hostname
использование внешнего имени хоста сервера?3. Кроме того
rest.listeners
, я думаю, что это предпочтительное свойство Connect4. Существуют ли вообще другие кластеры connect? В вашей ошибке указано два IP-адреса…
l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083
5. Кроме того, когда я попытался запустить соединитель с помощью docker (с теми же конфигурациями, плюс одно свойство для rest.advertised.host.name ), он работал нормально.