Сбой соединителя приемника Kafka MQTT с помощью HashMap не поддерживается

#apache-kafka #mqtt #apache-kafka-connect

Вопрос:

Вот моя конфигурация разъема раковины Кафки-MQTT

 name=anonymous
confluent.topic.bootstrap.servers= localhost:9092
connector.class=io.confluent.connect.mqtt.MqttSinkConnector
confluent.topic.replication.factor=1
tasks.max=1
mqtt.server.uri=tcp://127.0.0.1:1883
topics=mqtt
                     
 

Когда я запускаю Kafka-подключиться, это не удается с этим сообщением

     [2021-07-22 17:35:37,565] ERROR WorkerSinkTask{id=anonymous-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.util.HashMap is not a supported type. (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
java.lang.UnsupportedOperationException: java.util.HashMap is not a supported type.
    at io.confluent.connect.mqtt.SinkConverter.convert(SinkConverter.java:39)
    at io.confluent.connect.mqtt.MqttSinkTask.put(MqttSinkTask.java:95)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 

Версия Kafka-MQTT 1.4.1 Есть какие-либо указания на то, где она ошибается?

Комментарии:

1. В последний раз, когда я проверял, соединитель MQTT поддерживал только данные в байтах/строках. Пожалуйста, покажите, какие данные вы отправляете

2. У меня есть JsonConverter в моей конфигурации Kafka connect, и я использую данные json

3. Как для ключа, так и для ценности? Или только ценность? Вы добавили конфигурацию value.converter.schemas.enable=false ? Если нет, то что произойдет, когда вы это сделаете?

4. @OneCricketeer это работает после замены JsonConverter на StringConverter

Ответ №1:

Проверьте конвертеры ключей/значений по умолчанию , как вы запускаете свой kafka connect? Поделитесь своим файлом конфигурации подключения , какие значения у вас есть для преобразователей значений/ключей

Попробуйте добавить это определение конфигурации

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.storage.StringConverter"
 

Комментарии:

1. У меня есть JsonConverter как для ключа, так и для значения в конфигурации Kafka connect.

2. та же ошибка изменила JsonConverter на StingConverter

3. Мой плохой, это работает, у меня был JsonConverter в 2 местах. Так что работает только преобразователь строк.