Соединитель приемника Kafka JDBC — инструкция USE не поддерживается для переключения между базами данных

#sql-server #jdbc #apache-kafka

#sql-server #jdbc #apache-kafka

Вопрос:

Я использую соединитель приемника Kafka JDBC для передачи данных на Azure SQL server. Я протестировал соединитель с одной базой данных, и он работал нормально, но когда я добавил больше баз данных, я начал видеть следующую ошибку:

Инструкция USE не поддерживается для переключения между базами данных. Используйте новое соединение для подключения к другой базе данных.

Конфигурация:

     tasks.max: 1
    topics: topic_name
    connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
    connection.user: dbuser
    connection.password: dbpass
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    auto.create: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
    insert.mode: upsert
    delete.enabled: true
    pk.mode: record_key
 

Стек:

 2020-12-10 11:56:36,990 ERROR WorkerSinkTask{id=NAME-sqlserver-jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-NAME-sqlserver-jdbc-sink-0]
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.

 

Комментарии:

1. Непонятно, что вы подразумеваете под «добавлением дополнительных баз данных». Вы сделали это в конфигурации соединителя?

2. Спасибо, @OneCricketeer Вначале я подумал, что проблема связана с наличием нескольких баз данных на сервере БД, но оказалось, что в названии темы есть префикс.dbo.имя_таблицы. вместо просто имя_таблицы. Я опубликовал подробности в своем ответе.

3. Я собирался сказать — это стандартный вариант использования — иметь несколько схем на сервере БД, так что это не будет проблемой

Ответ №1:

Я определил проблему, вначале я думал, что проблема связана с наличием нескольких баз данных внутри сервера БД, но оказалось, что prefix.dbo.table_name в нем есть название темы. вместо того, чтобы просто table_name . Следовательно, соединитель определяется prefix.dbo как другая база данных.

Решение состоит в том, чтобы использовать transform dropPrefix.

Например, для сохранения данных из раздела hello.dbo.table1 hello.dbo.table2 в базу данных table1 и table2 в базу данных используйте следующую конфигурацию:

     tasks.max: 1
    topics: hello.dbo.table1, hello.dbo.table2
    connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
    connection.user: dbuser
    connection.password: dbpass
    transforms: dropPrefix,unwrap
    transforms.dropPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
    transforms.dropPrefix.regex: hello.dbo.(.*)
    transforms.dropPrefix.replacement: $1
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    auto.create: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
    insert.mode: upsert
    delete.enabled: true
    pk.mode: record_key
 

Ответ №2:

Если он не работает как единый соединитель, вам нужно будет создать один соединитель для каждой базы данных.

Комментарии:

1. Это один соединитель для каждой базы данных. В качестве примера можно просто перенести данные из раздела в одну базу данных.

2. это противоречит тому, что вы сказали в своем вопросе: «когда я добавил больше баз данных». Пожалуйста, уточните и, возможно, включите в свой вопрос фактическую конфигурацию Sink connector, которая выдает ошибку, которую вы видите.

3. Извините, что не совсем ясно выразился. Я имел в виду добавление дополнительных баз данных на сервер БД, а не в соединитель. Я определил проблему, она связана с именем темы prefix.dbo.tablename , которое рассматривается соединителем как другая база данных. table.name.format работает нормально, но когда у меня есть только одна тема. Здесь возникает проблема: github.com/confluentinc/kafka-connect-jdbc/issues/731 для поддержки префикса таблицы, который был бы идеальным, но, похоже, он все еще открыт.