#sql-server #jdbc #apache-kafka
#sql-server #jdbc #apache-kafka
Вопрос:
Я использую соединитель приемника Kafka JDBC для передачи данных на Azure SQL server. Я протестировал соединитель с одной базой данных, и он работал нормально, но когда я добавил больше баз данных, я начал видеть следующую ошибку:
Инструкция USE не поддерживается для переключения между базами данных. Используйте новое соединение для подключения к другой базе данных.
Конфигурация:
tasks.max: 1
topics: topic_name
connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
connection.user: dbuser
connection.password: dbpass
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
auto.create: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
insert.mode: upsert
delete.enabled: true
pk.mode: record_key
Стек:
2020-12-10 11:56:36,990 ERROR WorkerSinkTask{id=NAME-sqlserver-jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
(org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-NAME-sqlserver-jdbc-sink-0]
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
Комментарии:
1. Непонятно, что вы подразумеваете под «добавлением дополнительных баз данных». Вы сделали это в конфигурации соединителя?
2. Спасибо, @OneCricketeer Вначале я подумал, что проблема связана с наличием нескольких баз данных на сервере БД, но оказалось, что в названии темы есть префикс.dbo.имя_таблицы. вместо просто имя_таблицы. Я опубликовал подробности в своем ответе.
3. Я собирался сказать — это стандартный вариант использования — иметь несколько схем на сервере БД, так что это не будет проблемой
Ответ №1:
Я определил проблему, вначале я думал, что проблема связана с наличием нескольких баз данных внутри сервера БД, но оказалось, что prefix.dbo.table_name
в нем есть название темы. вместо того, чтобы просто table_name
. Следовательно, соединитель определяется prefix.dbo
как другая база данных.
Решение состоит в том, чтобы использовать transform dropPrefix.
Например, для сохранения данных из раздела hello.dbo.table1
hello.dbo.table2
в базу данных table1
и table2
в базу данных используйте следующую конфигурацию:
tasks.max: 1
topics: hello.dbo.table1, hello.dbo.table2
connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
connection.user: dbuser
connection.password: dbpass
transforms: dropPrefix,unwrap
transforms.dropPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex: hello.dbo.(.*)
transforms.dropPrefix.replacement: $1
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
auto.create: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
insert.mode: upsert
delete.enabled: true
pk.mode: record_key
Ответ №2:
Если он не работает как единый соединитель, вам нужно будет создать один соединитель для каждой базы данных.
Комментарии:
1. Это один соединитель для каждой базы данных. В качестве примера можно просто перенести данные из раздела в одну базу данных.
2. это противоречит тому, что вы сказали в своем вопросе: «когда я добавил больше баз данных». Пожалуйста, уточните и, возможно, включите в свой вопрос фактическую конфигурацию Sink connector, которая выдает ошибку, которую вы видите.
3. Извините, что не совсем ясно выразился. Я имел в виду добавление дополнительных баз данных на сервер БД, а не в соединитель. Я определил проблему, она связана с именем темы
prefix.dbo.tablename
, которое рассматривается соединителем как другая база данных.table.name.format
работает нормально, но когда у меня есть только одна тема. Здесь возникает проблема: github.com/confluentinc/kafka-connect-jdbc/issues/731 для поддержки префикса таблицы, который был бы идеальным, но, похоже, он все еще открыт.