#elasticsearch #apache-kafka #apache-kafka-connect
#elasticsearch #apache-kafka #apache-kafka-connect
Вопрос:
я пытаюсь создать соединитель приемника для elastic cloud. Это конфигурация моего соединителя приемника Elasticsearch (с ksqldb).
create sink connector elastic_writer with (
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='********',
'connection.username'='********',
'connection.password'='********',
'type.name'='kafka-connect',
'topics.regex'='sqlserver.dbo.*',
'schema.ignore'='true');
Когда я создаю соединитель приемника, я сначала получаю эту ошибку.
[2020-11-02 08:56:37,480] INFO Index 'sqlserver.dbo.quotations' not found in local cache; checking for existence (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,486] INFO Index 'sqlserver.dbo.quotations' not found in Elasticsearch. Error message: 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,486] INFO Requesting Elasticsearch create index 'sqlserver.dbo.quotations' (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,494] INFO Index 'sqlserver.dbo.quotations' not found in local cache; checking for existence (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,503] INFO Index 'sqlserver.dbo.quotations' not found in Elasticsearch. Error message: 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,504] WARN Failed to create index sqlserver.dbo.quotations with attempt 1/6, will attempt retry after 62 ms. Failure reason: Could not create index 'sqlserver.dbo.quotations': 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
затем он циклически повторяет все попытки, пока я, наконец, не получу следующую ошибку, и задача не будет выполнена.
[2020-11-02 08:56:40,245] ERROR WorkerSinkTask{id=ELASTIC_WRITER-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Could not create index 'sqlserver.dbo.quotations': 403 Forbidden
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndex(JestElasticsearchClient.java:451)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:421)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:374)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:131)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:679)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-02 08:56:40,246] ERROR WorkerSinkTask{id=ELASTIC_WRITER-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
Есть идеи о том, как решить эту проблему? Я уже пытался создать индексы перед созданием соединителя приемника, однако это не устранило проблему, и kafka connect выдал точно такую же ошибку.
Ответ №1:
Это исключение 403 выдается, когда соединитель приемника elastic не может подключиться к службе elastic. Проверьте настройки брандмауэра и / или фильтры, примененные при развертывании elastic cloud