#elasticsearch #apache-kafka #apache-kafka-connect #debezium
Вопрос:
У меня есть этот разъем и приемник, который в основном создает тему с «Test.dbo.TEST_A» и запишите в индекс ES «Тест». Я установил «ключ.игнорировать»: «ложь», чтобы обновления строк также обновлялись в ES, и «преобразования.развернуть.добавить.поля»:»таблица», чтобы отслеживать, к какой таблице принадлежит документ.
{
"name": "Test-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "192.168.1.234",
"database.port": "1433",
"database.user": "user",
"database.password": "pass",
"database.dbname": "Test",
"database.server.name": "MyServer",
"table.include.list": "dbo.TEST_A",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testA",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields":"table"
}
}
{
"name": "elastic-sink-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "TEST_A",
"connection.url": "http://localhost:9200/",
"string.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.enable": "false",
"schema.ignore": "true",
"transforms": "topicRoute,unwrap,key",
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "(.*).dbo.TEST_A", /* Use the database name */
"transforms.topicRoute.replacement": "$1",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "Id",
"key.ignore": "false",
"type.name": "TEST_A",
"behavior.on.null.values": "delete"
}
}
Но когда я добавляю другой соединитель/приемник, чтобы включить другую таблицу «TEST_B» из базы данных.
Похоже, что всякий раз, когда идентификаторы из TEST_A и TEST_B совпадают, одна из строк удаляется из ES?
Возможно ли при такой настройке иметь один индекс = один dabase или единственное решение-иметь один индекс на таблицу? Причина, по которой я хочу иметь один индекс = один dabase, заключается в уменьшении количества индексов при добавлении в ES большего количества баз данных.
Комментарии:
1. хм, в моем «Тестовом соединителе» я, возможно, смогу создать поле «UniqueID» с именем таблицы id и использовать его в качестве идентификатора документа ES. Я думаю о docs.confluent.io/platform/current/connect/transforms/… или Ksql, или есть какие-либо другие лучшие варианты?
Ответ №1:
Вы считываете изменения данных из разных баз данных/таблиц и записываете их в один и тот же индекс ElasticSearch с идентификатором документа ES, равным идентификатору записи БД. И, как вы можете видеть, если идентификаторы записей базы данных столкнутся, идентификаторы индексных документов также столкнутся, что приведет к удалению старых документов.
У вас есть несколько вариантов здесь:
- Индекс ElasticSearch на имя БД/таблицы: вы можете реализовать это с помощью различных соединителей или с помощью пользовательского преобразования одного сообщения (SMT).
- Глобально уникальные записи БД: Если вы управляете схемой исходных таблиц, вы можете установить первичный ключ в UUID. Это предотвратит конфликты идентификаторов.
- Как вы упомянули в комментариях, установите идентификатор документа ES в DB/Table/ID. Вы можете реализовать это изменение с помощью SMT