Соединитель источника Кафки десериализует значение JSON в вывод с неопределенным форматом

#apache-kafka #deserialization #kafka-consumer-api #apache-kafka-connect #apache-kafka-mirrormaker

Вопрос:

Я собираюсь использовать исходный разъем Kafka MirrorMaker 2, и по этой причине я попытался развернуть его в своем кластере Kafka в песочнице (созданном из Confluent docker compose).

  1. Я создал тему my_test_topic , в которой будут храниться значения в формате JSON. И настроен MirrorSourceConnector для репликации в тот же кластер Кафки. Целевая тема source_cluster.my_test_topic создается автоматически. Вот конфигурация соединителя:
 {
  "name": "source-to-target-mm2-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "enabled": true,
    "topics": "my_test_topic",
    "source.cluster.alias": "source_cluster",
    "target.cluster.alias": "target_cluster",
    "source.cluster.bootstrap.servers": "broker:29092",
    "target.cluster.bootstrap.servers": "broker:29092"
  }
}
 
  1. Я помещаю данные в исходную тему через kafkacat:
 kafkacat -P -b broker:29092 -t my_test_topic

> {"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}
 
  1. Когда я пытаюсь получить данные из реплицированной темы (она же целевая тема), я получаю следующий результат, который, я думаю, имеет формат base64:
 kafkacat -C -b broker:29092 -t source_cluster.my_test_topic -o beginning -e

> "eyJpdGVtIjpbeyJwcmljZSI6IjEwMCJ9LHsicHJpY2UiOiIxMjAifV0sImFub3RoZXJfaXRlbSI6W3siaGVpZ2h0IjoiNTAiLCJ3aWR0aCI6IjYwIn1dfQ=="
 

Это явно не то, чего я ожидал. Поэтому я также попробовал некоторые другие варианты:

  • изменил класс преобразователя значений на org.apache.kafka.connect.storage.StringConverter и потребляемое значение [B@3d5e46ef из целевой темы
  • изменил класс преобразователя значений на org.apache.kafka.connect.converters.ByteArrayConverter и, наконец, использовал ожидаемое значение {"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]} из целевой темы

Какова причина, по которой JsonConverter и StringConverter не работают, в то время как ByteArrayConverter работает?

Кроме того, я попробовал некоторые другие параметры конфигурации соединителя:

  • "source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.StringDeserializer" с преобразователем строк
  • "source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer" с преобразователем строк

но они оба не сработали, и результат был похож на [B@5d6b605c .

Оказывает ли "source.cluster.consumer.value-deserializer" конфигурация какое-либо влияние в этом случае?


В то же время, когда я использую FileStreamSourceConnector для чтения JSON из файла, как StringConverter, так и JsonConverter работают нормально, в то время как ByteArrayConverter выдает ошибку Invalid schema type for ByteArrayConverter: STRING .

Комментарии:

1. MirrorMaker ожидает BYTES тип схемы подключения с обеих сторон… Неясно насчет base64, но для меня это имеет смысл (исходный код находится на github, если вы хотели его проверить). Файловый поток может работать с JSON, но только в том случае, если вы используете HoistField transformer