Возможно ли иметь один индекс Elasticsearch для одной базы данных с таблицами, использующими debezium и кафку?

#elasticsearch #apache-kafka #apache-kafka-connect #debezium

Вопрос:

У меня есть этот разъем и приемник, который в основном создает тему с «Test.dbo.TEST_A» и запишите в индекс ES «Тест». Я установил «ключ.игнорировать»: «ложь», чтобы обновления строк также обновлялись в ES, и «преобразования.развернуть.добавить.поля»:»таблица», чтобы отслеживать, к какой таблице принадлежит документ.

 {
    "name": "Test-connector", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "tasks.max": "1",
        "database.hostname": "192.168.1.234", 
        "database.port": "1433", 
        "database.user": "user", 
        "database.password": "pass", 
        "database.dbname": "Test", 
        "database.server.name": "MyServer",
        "table.include.list": "dbo.TEST_A",
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.testA",

        "transforms": "unwrap",

        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields":"table"
    }
}
 
 {
    "name": "elastic-sink-test",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "TEST_A",
        "connection.url": "http://localhost:9200/",
        "string.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.enable": "false",
        "schema.ignore": "true",

        "transforms": "topicRoute,unwrap,key",

        "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.topicRoute.regex": "(.*).dbo.TEST_A",                          /* Use the database name */
        "transforms.topicRoute.replacement": "$1",

        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",    
        "transforms.unwrap.drop.tombstones": "false",    

        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "Id",       

        "key.ignore": "false",                                                        
        "type.name": "TEST_A",
        "behavior.on.null.values": "delete"                                                     
    }
}
 

Но когда я добавляю другой соединитель/приемник, чтобы включить другую таблицу «TEST_B» из базы данных.
Похоже, что всякий раз, когда идентификаторы из TEST_A и TEST_B совпадают, одна из строк удаляется из ES?

Возможно ли при такой настройке иметь один индекс = один dabase или единственное решение-иметь один индекс на таблицу? Причина, по которой я хочу иметь один индекс = один dabase, заключается в уменьшении количества индексов при добавлении в ES большего количества баз данных.

Комментарии:

1. хм, в моем «Тестовом соединителе» я, возможно, смогу создать поле «UniqueID» с именем таблицы id и использовать его в качестве идентификатора документа ES. Я думаю о docs.confluent.io/platform/current/connect/transforms/… или Ksql, или есть какие-либо другие лучшие варианты?

Ответ №1:

Вы считываете изменения данных из разных баз данных/таблиц и записываете их в один и тот же индекс ElasticSearch с идентификатором документа ES, равным идентификатору записи БД. И, как вы можете видеть, если идентификаторы записей базы данных столкнутся, идентификаторы индексных документов также столкнутся, что приведет к удалению старых документов.

У вас есть несколько вариантов здесь:

  • Индекс ElasticSearch на имя БД/таблицы: вы можете реализовать это с помощью различных соединителей или с помощью пользовательского преобразования одного сообщения (SMT).
  • Глобально уникальные записи БД: Если вы управляете схемой исходных таблиц, вы можете установить первичный ключ в UUID. Это предотвратит конфликты идентификаторов.
  • Как вы упомянули в комментариях, установите идентификатор документа ES в DB/Table/ID. Вы можете реализовать это изменение с помощью SMT