#apache-kafka #deserialization #kafka-consumer-api #apache-kafka-connect #apache-kafka-mirrormaker
Вопрос:
Я собираюсь использовать исходный разъем Kafka MirrorMaker 2, и по этой причине я попытался развернуть его в своем кластере Kafka в песочнице (созданном из Confluent docker compose).
- Я создал тему
my_test_topic
, в которой будут храниться значения в формате JSON. И настроенMirrorSourceConnector
для репликации в тот же кластер Кафки. Целевая темаsource_cluster.my_test_topic
создается автоматически. Вот конфигурация соединителя:
{
"name": "source-to-target-mm2-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"enabled": true,
"topics": "my_test_topic",
"source.cluster.alias": "source_cluster",
"target.cluster.alias": "target_cluster",
"source.cluster.bootstrap.servers": "broker:29092",
"target.cluster.bootstrap.servers": "broker:29092"
}
}
- Я помещаю данные в исходную тему через kafkacat:
kafkacat -P -b broker:29092 -t my_test_topic
> {"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}
- Когда я пытаюсь получить данные из реплицированной темы (она же целевая тема), я получаю следующий результат, который, я думаю, имеет формат base64:
kafkacat -C -b broker:29092 -t source_cluster.my_test_topic -o beginning -e
> "eyJpdGVtIjpbeyJwcmljZSI6IjEwMCJ9LHsicHJpY2UiOiIxMjAifV0sImFub3RoZXJfaXRlbSI6W3siaGVpZ2h0IjoiNTAiLCJ3aWR0aCI6IjYwIn1dfQ=="
Это явно не то, чего я ожидал. Поэтому я также попробовал некоторые другие варианты:
- изменил класс преобразователя значений на
org.apache.kafka.connect.storage.StringConverter
и потребляемое значение[B@3d5e46ef
из целевой темы - изменил класс преобразователя значений на
org.apache.kafka.connect.converters.ByteArrayConverter
и, наконец, использовал ожидаемое значение{"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}
из целевой темы
Какова причина, по которой JsonConverter и StringConverter не работают, в то время как ByteArrayConverter работает?
Кроме того, я попробовал некоторые другие параметры конфигурации соединителя:
"source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
с преобразователем строк"source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
с преобразователем строк
но они оба не сработали, и результат был похож на [B@5d6b605c
.
Оказывает ли "source.cluster.consumer.value-deserializer"
конфигурация какое-либо влияние в этом случае?
В то же время, когда я использую FileStreamSourceConnector для чтения JSON из файла, как StringConverter, так и JsonConverter работают нормально, в то время как ByteArrayConverter выдает ошибку Invalid schema type for ByteArrayConverter: STRING
.
Комментарии:
1. MirrorMaker ожидает
BYTES
тип схемы подключения с обеих сторон… Неясно насчет base64, но для меня это имеет смысл (исходный код находится на github, если вы хотели его проверить). Файловый поток может работать с JSON, но только в том случае, если вы используетеHoistField
transformer