Как отслеживать строки (по идентификатору) с определенным значением столбца с помощью Kafka JDBC Connector?

#apache-kafka #apache-kafka-connect

#apache-kafka #apache-kafka-connect

Вопрос:

У меня есть таблица, содержащая большое количество записей. Есть столбец, определяющий тип записи. Я хотел бы собирать записи с определенным значением в этом столбце. Вроде:

 Select * FROM myVeryOwnTable WHERE type = "VERY_IMPORTANT_TYPE" 
  

Что я заметил, я не могу использовать WHERE предложение в пользовательском запросе, когда я выбираю инкрементный режим ( временная метка), иначе мне нужно было бы позаботиться о фильтрации самостоятельно.
Предпосылкой того, чего я хотел бы достичь, является то, что я использую Logstash для передачи некоторого типа данных из MySQL в ES. Это легко достижимо с помощью запроса, который может содержать предложение where . Однако с Kafka я могу передавать свои данные намного быстрее (почти мгновенно) после вставки новых строк в DB.

Спасибо за любые подсказки или советы.


Благодаря @wardziniak я смог это настроить.

 query=select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p
topic.prefix=test-mysql-jdbc-
incrementing.column.name=id
  

однако я ожидал увидеть тему test-mysql-jdbc-myVeryOwnTable , поэтому я зарегистрировал своего потребителя в ней. Однако при использовании запроса, показанного выше, имя таблицы пропущено, поэтому моя тема была названа точно так, как префикс, определенный выше. Итак, я только что обновил свои свойства topic.prefix=test-mysql-jdbc-myVeryOwnTable , и, похоже, все работает просто отлично.

Ответ №1:

Вы можете использовать подзапрос в вашем свойстве Jdbc Source Connector query .

Пример конфигурации исходного соединителя JDBC:

 {
    ...
    "query": "select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p",
    "incrementing.column.name": "id",
    ...
}
  

Комментарии:

1. Это умно. Я попробую это! Спасибо

2. извините, это не работает. Как я вижу, он выполняется WHERE id`>? УПОРЯДОЧИВАТЬ ПО id ASC`, но следует: WHERE p.id > ? ORDER BY p.id ASC Очевидно, что это просто предположение, но я думаю, что здесь проблема.

3. @user007, ты уверен, что это не работает (записи не фильтруются при опросе из базы данных)? Есть ли какие-либо исключения или ошибки? . Я правильно выполнил тест и его фильтрацию. Я вижу что-то подобное в журналах prepared SQL query: select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p WHERE "id" > ? ORDER BY "id"

4. да, я вижу это: [2019-04-07 21:03:31,648] INFO Cluster ID: w15JXkZYRu68ZPyJ4JFplw (org.apache.kafka.clients.Metadata:365) [2019-04-07 21:03:31,660] INFO Begin using SQL query: select * FROM (select * from mydb.mytable ev WHERE ev.type = 10) ev WHERE "id" > ? ORDER BY "id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:140) [2019-04-07 21:04:31,549] INFO WorkerSourceTask{id=test-source-mysql-jdbc-autoincrement-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398) но это не возвращает никаких записей моему потребителю.

5. @user007, убедитесь, что в таблице есть новые записи с большим приращением значения. JDBC Connector хранит информацию о последнем увеличенном значении, поэтому для сброса вам необходимо удалить эту информацию (для автономного режима удалите/tmp /connect.offsets) или создайте соединитель с новым именем.