Соединитель источника и приемника Kafka не работает для больших данных

#apache-kafka-connect

#apache-kafka-connect

Вопрос:

Я создаю конвейер данных, используя соединитель источника и приемника Kafka. Соединитель источника использует базу данных SQL и публикует в теме, а соединитель приемника подписывается на тему и помещает в другую базу данных SQL. В таблице 16 ГБ данных. Теперь проблема в том, что данные не передаются из одной базы данных в другую. Однако, если размер таблицы невелик, например 1000 строк, то данные успешно передаются.

Конфигурация соединителя источника:

 "config": {
       "connector.class": 
"io.confluent.connect.jdbc.JdbcSourceConnector",
       "tasks.max": "1",
       "connection.url": "",
       "mode": "incrementing",
       "incrementing.column.name": "ID",
       "topic.prefix": "migration_",
       "name": "jdbc-source",
       "validate.non.null": false,
       "batch.max.rows":5
     }
  

Журналы соединителя источника:

 INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit 
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)
  

Кто-нибудь может подсказать мне, как настроить мой соединитель источника Kafka для передачи больших объемов данных?

Комментарии:

1. Вам нужно копнуть немного глубже. «Данные не передаются» недостаточно конкретно. Передается ли что -нибудь из этого? Если да, то после какого момента это прекращается? Есть ли какие-либо ошибки в журнале? Все ли данные попадают в раздел Kafka и не передаются в базу данных приемника, или они даже не попадают в раздел?

2. @RobinMoffatt Если в таблице меньше строк (1000), то конвейер данных работает нормально. Источник помещается в тему, а приемник извлекает данные из темы и помещает в БД. Но когда имеется 1 миллион строк, соединитель источника фиксирует смещение, как показано в журналах выше. Эти журналы генерируются непрерывно., поэтому я не уверен, поместил ли он целые данные в тему или нет. В журнале нет ошибок. И я не мог видеть никаких журналов, связанных с соединителем приемника.

3. «Я не уверен, поместил ли он целые данные в тему или нет. » — используйте Consumer для проверки 🙂 kafkacat, kafka-console-consumer и т.д.

4. Возможно, это из-за того, что буфер производителя Kafka не способен обрабатывать столько сообщений в одном потоке. Попробуйте обойти параметры конфигурации kafka connect worker, такие как «смещение. flush.interval.ms » , «смещение. flush.timeout.ms «, «буфер.память», «пакет.размер»

5. У меня та же проблема. Вы когда-нибудь находили проблему?

Ответ №1:

Мне удалось преодолеть эту проблему, ограничив количество записей, возвращаемых в одном запросе к базе данных, например, 5000 за раз.

Решение будет зависеть от базы данных и диалекта SQL. Приведенные ниже примеры будут работать и корректно управлять смещениями для одной таблицы. Увеличивающийся столбец ID и временная метка должны быть установлены в соответствии с инструкциями, указанными здесь: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#incremental-query-modes

Пример таблицы myTable содержит следующие столбцы:

  • id увеличивается при каждом добавлении новой записи
  • lastUpdatedTimestamp — обновляется каждый раз, когда обновляется запись
  • некоторые другие атрибуты

id и lastUpdatedTimestamp должен однозначно идентифицировать запись в наборе данных.

Соединитель формирует запрос следующим образом:

config.query Kafka Connect WHERE clause for a selected mode config.query.suffix

PostgreSQL / MySQL

 "config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "table.whitelist": "myTable",
    "query.suffix": "LIMIT 5000"
    ...
    }
  

Приведет к:

 SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000
  

Или следующий, если вы хотите добавить дополнительное условие в предложение WHERE.

 "config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
    "query.suffix": "LIMIT 5000"
    ...
    }
  

Приведет к:

 SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE Age > 18
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000
  

SQL Server

 "config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
    ...
    }
  

Приведет к:

 SELECT TOP 5000 *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE Age > 18
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
  

Oracle

 "config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
    ...
    }
  

Приведет к:

 SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE ROWNUM <= 5000
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
  

Этот подход не будет работать с bulk режимом. Он работает с timestamp incrementing режимом и может работать с timestamp или incrementing режимами в зависимости от характеристик таблицы.

Объединение многих таблиц — идея, которую я не тестировал!

Это усложняется, если запрос выполняет объединения во многих таблицах. Для этого потребуется следующее:

  • укажите идентификатор, который однозначно идентифицировал бы строку, например, объединенную tableA.id tableB.id
  • укажите временную метку последней обновленной записи таблицы, какой бы ни была последняя
  • упорядочивайте записи запроса соответствующим образом

Существует ограничение на то, какой длины может быть идентификатор из-за длинного типа данных Java, используемого Kafka Connect, т.Е. 9,223,372,036,854,775,807

Комментарии:

1. Привет @kaminzo . Я думаю, что это не сработает. Для oracle, если внутренний запрос, похоже, не учитывает смещение, управляемое задачей соединителя источника? Кстати, я предполагаю, что правильное управление смещениями является первостепенной задачей, подобной той, что используется в реализации по умолчанию.

2. Привет @simpleusr — Я отредактировал ответ, чтобы добавить примеры запросов, сгенерированных Kafka Connect. Окончательный запрос включает в себя ORDER BY — это должно корректно управлять смещениями, поскольку оно применяется во внешнем запросе