#sql-server #apache-kafka #apache-kafka-connect
#sql-сервер #apache-kafka #apache-kafka-connect
Вопрос:
Мне нужен совет о том, как заставить kafka реплицировать структуру исходной таблицы в таблице назначения. Позвольте мне объяснить…
Исходная база данных: исходная таблица SQL Server :
CREATE TABLE dbo.PEOPLE(
ID NUMERIC(10) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(10),
LAST_NAME varchar(10),
AGE NUMERIC(3)
)
Целевая БД: PostgreSQL
Соединитель приемника Kafka:
name=pg-sink-connector_people
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=4
topics=myserver.dbo.PEOPLE
connection.url=jdbc:postgresql://localhost:5432/kafkadb
connection.user=postgres
connection.password=mypassword
insert.mode=upsert
pk.mode=record_key
pk.fields=ID
table.name.format=PEOPLE
auto.create=true
offset.storage.file.filename=C:/kafka_2.13-2.7.0/tmp/connect.offsets
bootstrap.servers=localhost:9092
plugin.path=C:/kafka_2.13-2.7.0/plugins
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_
auto.evolve=true
Когда я запускаю описанный выше соединитель, Kafka создает в PostgreSQL целевую таблицу, подобную этой:
CREATE TABLE public."PEOPLE" (
"before_ID" int8 NULL,
“before_FIRST_NAME" text NULL,
“before_LAST_NAME" text NULL,
"before_AGE" int8 NULL,
"after_ID" int8 NULL,
"after_FIRST_NAME" text NULL,
"after_LAST_NAME" text NULL,
"after_AGE" int8 NULL,
source_version text NOT NULL,
source_connector text NOT NULL,
source_name text NOT NULL,
source_ts_ms int8 NOT NULL,
source_snapshot text NULL DEFAULT 'false'::text,
source_db text NOT NULL,
source_schema text NOT NULL,
source_table text NOT NULL,
source_change_lsn text NULL,
source_commit_lsn text NULL,
source_event_serial_no int8 NULL,
op text NOT NULL,
ts_ms int8 NULL,
transaction_id text NULL,
transaction_total_order int8 NULL,
transaction_data_collection_order int8 NULL,
"ID" text NOT NULL,
CONSTRAINT "PEOPLE_pkey" PRIMARY KEY ("ID")
);
Дело в том, что мне не нужны эти поля до / после или другие, которые были созданы. Каков наилучший способ репликации точно такой же структуры моей исходной таблицы?
Спасибо!
Комментарии:
1. Одним из шагов было бы использование преобразования, отличного от flatten . Например, я считаю, что существует белый список, если вам нужны только определенные поля… В качестве альтернативы, если вы используете Debezium, у них есть собственное преобразование ExtractNewRecordState debezium.io/documentation/reference/configuration /…
Ответ №1:
Когда вы используете Debezium, он включает метаданные о записи изменения, которую он захватил, а также о состоянии до и после. Все эти данные передаются как вложенный объект.
На данный момент вы просто выравниваете все эти поля с Flatten
помощью преобразования одного сообщения. Если вам не нужны дополнительные поля, вы можете использовать ExtractNewRecordState
SMT, предоставляемый Debezium, именно для этой цели. Используйте его вместо Flatten
:
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
Комментарии:
1. Спасибо за ответ. Согласно моим тестам, ваше предложение работает очень хорошо. Единственный оставшийся вопрос: какие параметры я должен использовать для удаления строки в моей целевой таблице, которая была ранее удалена в исходной?
2. Это другой вопрос — отметьте этот вопрос как ответ (если вы считаете, что это так) и опубликуйте новый для вашего нового вопроса 🙂
3. добавьте transformes.unwrap.drop.tombstones=false для удаления