Соединитель приемника Elasticsearch выдает 403 запрещенных исключения при попытке создать индексы из тем

#elasticsearch #apache-kafka #apache-kafka-connect

#elasticsearch #apache-kafka #apache-kafka-connect

Вопрос:

я пытаюсь создать соединитель приемника для elastic cloud. Это конфигурация моего соединителя приемника Elasticsearch (с ksqldb).

 create sink connector elastic_writer with (
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='********',
'connection.username'='********',
'connection.password'='********',
'type.name'='kafka-connect',
'topics.regex'='sqlserver.dbo.*',
'schema.ignore'='true');
  

Когда я создаю соединитель приемника, я сначала получаю эту ошибку.

 [2020-11-02 08:56:37,480] INFO Index 'sqlserver.dbo.quotations' not found in local cache; checking for existence (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,486] INFO Index 'sqlserver.dbo.quotations' not found in Elasticsearch. Error message: 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,486] INFO Requesting Elasticsearch create index 'sqlserver.dbo.quotations' (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,494] INFO Index 'sqlserver.dbo.quotations' not found in local cache; checking for existence (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,503] INFO Index 'sqlserver.dbo.quotations' not found in Elasticsearch. Error message: 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,504] WARN Failed to create index sqlserver.dbo.quotations with attempt 1/6, will attempt retry after 62 ms. Failure reason: Could not create index 'sqlserver.dbo.quotations': 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
  

затем он циклически повторяет все попытки, пока я, наконец, не получу следующую ошибку, и задача не будет выполнена.

 [2020-11-02 08:56:40,245] ERROR WorkerSinkTask{id=ELASTIC_WRITER-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Could not create index 'sqlserver.dbo.quotations': 403 Forbidden
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndex(JestElasticsearchClient.java:451)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:421)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:374)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:131)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:71)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:679)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-02 08:56:40,246] ERROR WorkerSinkTask{id=ELASTIC_WRITER-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
  

Есть идеи о том, как решить эту проблему? Я уже пытался создать индексы перед созданием соединителя приемника, однако это не устранило проблему, и kafka connect выдал точно такую же ошибку.

Ответ №1:

Это исключение 403 выдается, когда соединитель приемника elastic не может подключиться к службе elastic. Проверьте настройки брандмауэра и / или фильтры, примененные при развертывании elastic cloud