#apache-kafka-connect
#apache-kafka-connect
Вопрос:
Я создаю конвейер данных, используя соединитель источника и приемника Kafka. Соединитель источника использует базу данных SQL и публикует в теме, а соединитель приемника подписывается на тему и помещает в другую базу данных SQL. В таблице 16 ГБ данных. Теперь проблема в том, что данные не передаются из одной базы данных в другую. Однако, если размер таблицы невелик, например 1000 строк, то данные успешно передаются.
Конфигурация соединителя источника:
"config": {
"connector.class":
"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "migration_",
"name": "jdbc-source",
"validate.non.null": false,
"batch.max.rows":5
}
Журналы соединителя источника:
INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)
Кто-нибудь может подсказать мне, как настроить мой соединитель источника Kafka для передачи больших объемов данных?
Комментарии:
1. Вам нужно копнуть немного глубже. «Данные не передаются» недостаточно конкретно. Передается ли что -нибудь из этого? Если да, то после какого момента это прекращается? Есть ли какие-либо ошибки в журнале? Все ли данные попадают в раздел Kafka и не передаются в базу данных приемника, или они даже не попадают в раздел?
2. @RobinMoffatt Если в таблице меньше строк (1000), то конвейер данных работает нормально. Источник помещается в тему, а приемник извлекает данные из темы и помещает в БД. Но когда имеется 1 миллион строк, соединитель источника фиксирует смещение, как показано в журналах выше. Эти журналы генерируются непрерывно., поэтому я не уверен, поместил ли он целые данные в тему или нет. В журнале нет ошибок. И я не мог видеть никаких журналов, связанных с соединителем приемника.
3. «Я не уверен, поместил ли он целые данные в тему или нет. » — используйте Consumer для проверки 🙂 kafkacat, kafka-console-consumer и т.д.
4. Возможно, это из-за того, что буфер производителя Kafka не способен обрабатывать столько сообщений в одном потоке. Попробуйте обойти параметры конфигурации kafka connect worker, такие как «смещение. flush.interval.ms » , «смещение. flush.timeout.ms «, «буфер.память», «пакет.размер»
5. У меня та же проблема. Вы когда-нибудь находили проблему?
Ответ №1:
Мне удалось преодолеть эту проблему, ограничив количество записей, возвращаемых в одном запросе к базе данных, например, 5000 за раз.
Решение будет зависеть от базы данных и диалекта SQL. Приведенные ниже примеры будут работать и корректно управлять смещениями для одной таблицы. Увеличивающийся столбец ID и временная метка должны быть установлены в соответствии с инструкциями, указанными здесь: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#incremental-query-modes
Пример таблицы myTable
содержит следующие столбцы:
id
увеличивается при каждом добавлении новой записиlastUpdatedTimestamp
— обновляется каждый раз, когда обновляется запись- некоторые другие атрибуты
id
и lastUpdatedTimestamp
должен однозначно идентифицировать запись в наборе данных.
Соединитель формирует запрос следующим образом:
config.query
Kafka Connect WHERE clause for a selected mode
config.query.suffix
PostgreSQL / MySQL
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"table.whitelist": "myTable",
"query.suffix": "LIMIT 5000"
...
}
Приведет к:
SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
LIMIT 5000
Или следующий, если вы хотите добавить дополнительное условие в предложение WHERE.
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
"query.suffix": "LIMIT 5000"
...
}
Приведет к:
SELECT *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE Age > 18
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
LIMIT 5000
SQL Server
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
...
}
Приведет к:
SELECT TOP 5000 *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE Age > 18
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
Oracle
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
...
}
Приведет к:
SELECT *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE ROWNUM <= 5000
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
Этот подход не будет работать с bulk
режимом. Он работает с timestamp incrementing
режимом и может работать с timestamp
или incrementing
режимами в зависимости от характеристик таблицы.
Объединение многих таблиц — идея, которую я не тестировал!
Это усложняется, если запрос выполняет объединения во многих таблицах. Для этого потребуется следующее:
- укажите идентификатор, который однозначно идентифицировал бы строку, например, объединенную tableA.id tableB.id
- укажите временную метку последней обновленной записи таблицы, какой бы ни была последняя
- упорядочивайте записи запроса соответствующим образом
Существует ограничение на то, какой длины может быть идентификатор из-за длинного типа данных Java, используемого Kafka Connect, т.Е. 9,223,372,036,854,775,807
Комментарии:
1. Привет @kaminzo . Я думаю, что это не сработает. Для oracle, если внутренний запрос, похоже, не учитывает смещение, управляемое задачей соединителя источника? Кстати, я предполагаю, что правильное управление смещениями является первостепенной задачей, подобной той, что используется в реализации по умолчанию.
2. Привет @simpleusr — Я отредактировал ответ, чтобы добавить примеры запросов, сгенерированных Kafka Connect. Окончательный запрос включает в себя
ORDER BY
— это должно корректно управлять смещениями, поскольку оно применяется во внешнем запросе