Как заставить kafka реплицировать структуру исходной таблицы в таблице назначения

#sql-server #apache-kafka #apache-kafka-connect

#sql-сервер #apache-kafka #apache-kafka-connect

Вопрос:

Мне нужен совет о том, как заставить kafka реплицировать структуру исходной таблицы в таблице назначения. Позвольте мне объяснить…

Исходная база данных: исходная таблица SQL Server :

 CREATE TABLE dbo.PEOPLE(
    ID NUMERIC(10) NOT NULL PRIMARY KEY,
    FIRST_NAME VARCHAR(10),
    LAST_NAME varchar(10),
    AGE NUMERIC(3)
)
 

Целевая БД: PostgreSQL

Соединитель приемника Kafka:

 name=pg-sink-connector_people

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

tasks.max=4

topics=myserver.dbo.PEOPLE

connection.url=jdbc:postgresql://localhost:5432/kafkadb

connection.user=postgres

connection.password=mypassword

insert.mode=upsert

pk.mode=record_key

pk.fields=ID

table.name.format=PEOPLE

auto.create=true

offset.storage.file.filename=C:/kafka_2.13-2.7.0/tmp/connect.offsets

bootstrap.servers=localhost:9092

plugin.path=C:/kafka_2.13-2.7.0/plugins

transforms=flatten

transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value

transforms.flatten.delimiter=_

auto.evolve=true
 

Когда я запускаю описанный выше соединитель, Kafka создает в PostgreSQL целевую таблицу, подобную этой:

 CREATE TABLE public."PEOPLE" (

    "before_ID" int8 NULL,

    “before_FIRST_NAME" text NULL,

    “before_LAST_NAME" text NULL,

    "before_AGE" int8 NULL,

    "after_ID" int8 NULL,

    "after_FIRST_NAME" text NULL,

    "after_LAST_NAME" text NULL,

    "after_AGE" int8 NULL,

    source_version text NOT NULL,

    source_connector text NOT NULL,

    source_name text NOT NULL,

    source_ts_ms int8 NOT NULL,

    source_snapshot text NULL DEFAULT 'false'::text,

    source_db text NOT NULL,

    source_schema text NOT NULL,

    source_table text NOT NULL,

    source_change_lsn text NULL,

    source_commit_lsn text NULL,

    source_event_serial_no int8 NULL,

    op text NOT NULL,

    ts_ms int8 NULL,

    transaction_id text NULL,

    transaction_total_order int8 NULL,

    transaction_data_collection_order int8 NULL,

    "ID" text NOT NULL,

    CONSTRAINT "PEOPLE_pkey" PRIMARY KEY ("ID")
);
 

Дело в том, что мне не нужны эти поля до / после или другие, которые были созданы. Каков наилучший способ репликации точно такой же структуры моей исходной таблицы?

Спасибо!

Комментарии:

1. Одним из шагов было бы использование преобразования, отличного от flatten . Например, я считаю, что существует белый список, если вам нужны только определенные поля… В качестве альтернативы, если вы используете Debezium, у них есть собственное преобразование ExtractNewRecordState debezium.io/documentation/reference/configuration /…

Ответ №1:

Когда вы используете Debezium, он включает метаданные о записи изменения, которую он захватил, а также о состоянии до и после. Все эти данные передаются как вложенный объект.

На данный момент вы просто выравниваете все эти поля с Flatten помощью преобразования одного сообщения. Если вам не нужны дополнительные поля, вы можете использовать ExtractNewRecordState SMT, предоставляемый Debezium, именно для этой цели. Используйте его вместо Flatten :

 transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
 

Комментарии:

1. Спасибо за ответ. Согласно моим тестам, ваше предложение работает очень хорошо. Единственный оставшийся вопрос: какие параметры я должен использовать для удаления строки в моей целевой таблице, которая была ранее удалена в исходной?

2. Это другой вопрос — отметьте этот вопрос как ответ (если вы считаете, что это так) и опубликуйте новый для вашего нового вопроса 🙂

3. добавьте transformes.unwrap.drop.tombstones=false для удаления