#mysql #jdbc #apache-kafka #apache-kafka-connect
#mysql #jdbc #apache-kafka #apache-kafka-connect
Вопрос:
Я использую JDBC connector для перемещения данных из MySQL в Kafka. Интересующие меня данные поступают из выбранных объединяющих 3 таблиц, поэтому я настроил свой соединитель с помощью mode:incrementing
и query
:
{
"name": "stats",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USERamp;password=PASSWORDamp;zeroDateTimeBehavior=CONVERT_TO_NULLamp;useSSL=false",
"mode": "incrementing",
"validate.non.null": "false",
"topic.prefix": "t",
"incrementing.column.name": "s.id",
"transforms": "createKey,extractString",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "uuid",
"transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractString.field": "uuid",
"quote.sql.identifiers":"never",
"query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"batch.max.rows": "100",
"poll.interval.ms": "60000"
}
}
При проверке состояния соединителя я получаю, что он запущен:
curl http://conncet:8083/connectors/stats/status
{
"name": "stats",
"connector": {
"state": "RUNNING",
"worker_id": "connect-3:38083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-1:18083"
}
],
"type": "source"
}
Но через час я все еще не вижу созданной темы. Я проверил в MySQL, с какими запросами выполняются show full processlist;
, и я вижу два запроса, подобных этому:
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
Таким образом, в основном запрос такой же, как запрос, который я предоставил в query
в connector configuration plus WHERE s.id > -1 ORDER BY s.id ASC
, поскольку запрос в этом соединении выдает результирующий набор из 21 миллиона строк, MySQL отправляет данные в течение длительного времени. Когда я снова проверяю с помощью show full processlist;
, я вижу теперь 4 запроса, подобных этому, а затем 8, а затем 16, и так далее.
Вопросы следующие:
- Почему Kafka connect пытается получить ВСЕ строки сразу при добавлении:
s.id > -1 ORDER BY s.id ASC
. - Возможно ли настроить соединитель так, чтобы он этого не делал, и вместо этого извлекал меньший объем?
"batch.max.rows": "100"
Контролирует размер пакета только после первоначального опроса??
Обновить:
Для этого вопроса есть открытая тема. Я думаю, что этот вопрос можно закрыть.
Ответ №1:
query.suffix был добавлен в версии 5.5. Я использовал его для добавления оператора limit, и он отлично сработал, он просто добавляет limit в конец запроса.
смотрите проблему
Ответ №2:
Соединитель источника JDBC с incrementing
mode
и передан query
, выполните этот запрос со следующим предложением where: WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC
. (если вы используете инкрементный режим и запрос, вы не можете передать where
предложение туда).
При первом опросе lastIncrementedValue
равно -1, поэтому он пытается запросить все записи. После извлечения каждой записи lastIncrementedValue увеличивается, поэтому при следующем запросе будут опрашиваться только новые данные. batch.max.rows
указывает, сколько записей SourceTask::poll(...)
вернется в Kafka Connect framework. Это максимальный размер пакета, который будет отправлен в Kafka сразу.
Я думаю, когда вы извлекаете данные из одной таблицы, это работает быстрее, потому что запрос выполняется быстрее (менее сложный). Если вы выполняете эти запросы с использованием других инструментов SQL, он будет выполнять аналогично.
Комментарии:
1. К сожалению, это не отвечает на мой вопрос: почему Kafka connect пытается получить ВСЕ строки сразу при добавлении: s.id > -1 ПОРЯДОК ПО s.id ASC. На который, я думаю, теперь может быть ответ, поэтому я переформулирую. Хотя спасибо