Запрос соединителя Kafka Connect JDBC в режиме увеличения загружает большие наборы данных при первоначальном опросе

#mysql #jdbc #apache-kafka #apache-kafka-connect

#mysql #jdbc #apache-kafka #apache-kafka-connect

Вопрос:

Я использую JDBC connector для перемещения данных из MySQL в Kafka. Интересующие меня данные поступают из выбранных объединяющих 3 таблиц, поэтому я настроил свой соединитель с помощью mode:incrementing и query :

 {
    "name": "stats",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USERamp;password=PASSWORDamp;zeroDateTimeBehavior=CONVERT_TO_NULLamp;useSSL=false",
        "mode": "incrementing",
        "validate.non.null": "false",
        "topic.prefix": "t",
        "incrementing.column.name": "s.id",
        "transforms": "createKey,extractString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "uuid",
        "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractString.field": "uuid",
        "quote.sql.identifiers":"never",
        "query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "batch.max.rows": "100",
        "poll.interval.ms": "60000"
    }
}
  

При проверке состояния соединителя я получаю, что он запущен:

 curl http://conncet:8083/connectors/stats/status

{
    "name": "stats",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect-3:38083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect-1:18083"
        }
    ],
    "type": "source"
}
  

Но через час я все еще не вижу созданной темы. Я проверил в MySQL, с какими запросами выполняются show full processlist; , и я вижу два запроса, подобных этому:

 select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
  

Таким образом, в основном запрос такой же, как запрос, который я предоставил в query в connector configuration plus WHERE s.id > -1 ORDER BY s.id ASC , поскольку запрос в этом соединении выдает результирующий набор из 21 миллиона строк, MySQL отправляет данные в течение длительного времени. Когда я снова проверяю с помощью show full processlist; , я вижу теперь 4 запроса, подобных этому, а затем 8, а затем 16, и так далее.

Вопросы следующие:

  1. Почему Kafka connect пытается получить ВСЕ строки сразу при добавлении: s.id > -1 ORDER BY s.id ASC .
  2. Возможно ли настроить соединитель так, чтобы он этого не делал, и вместо этого извлекал меньший объем?
  3. "batch.max.rows": "100" Контролирует размер пакета только после первоначального опроса??

Обновить:

Для этого вопроса есть открытая тема. Я думаю, что этот вопрос можно закрыть.

Ответ №1:

query.suffix был добавлен в версии 5.5. Я использовал его для добавления оператора limit, и он отлично сработал, он просто добавляет limit в конец запроса.

смотрите проблему

Ответ №2:

Соединитель источника JDBC с incrementing mode и передан query , выполните этот запрос со следующим предложением where: WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC . (если вы используете инкрементный режим и запрос, вы не можете передать where предложение туда).

При первом опросе lastIncrementedValue равно -1, поэтому он пытается запросить все записи. После извлечения каждой записи lastIncrementedValue увеличивается, поэтому при следующем запросе будут опрашиваться только новые данные. batch.max.rows указывает, сколько записей SourceTask::poll(...) вернется в Kafka Connect framework. Это максимальный размер пакета, который будет отправлен в Kafka сразу.

Я думаю, когда вы извлекаете данные из одной таблицы, это работает быстрее, потому что запрос выполняется быстрее (менее сложный). Если вы выполняете эти запросы с использованием других инструментов SQL, он будет выполнять аналогично.

Комментарии:

1. К сожалению, это не отвечает на мой вопрос: почему Kafka connect пытается получить ВСЕ строки сразу при добавлении: s.id > -1 ПОРЯДОК ПО s.id ASC. На который, я думаю, теперь может быть ответ, поэтому я переформулирую. Хотя спасибо