#apache-kafka #apache-kafka-connect
#apache-kafka #apache-kafka-connect
Вопрос:
Я хочу использовать filepulse connector для загрузки XML-файлов в kafka.
ниже приведена моя среда:
- Win10 WSL, установлен Ubuntu
- загружена confluent platform 5.5.1 (см. «https://www.confluent.io/download /»), распакованный
- загруженный zip-файл версии 1.5.2 с github (https://github.com/streamthoughts/kafka-connect-file-pulse/releases ), разархивированный
- изменен «connect-standalone.properties», расположенный в разделе confluent path (etc/kafka/connect-standalone.properties), чтобы включить путь «/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib».
- Я не создавал никакой темы.
Я запустил zookeeper; kafka и попытался запустить kafka-connect standalone, как показано ниже:
$ zookeeper-server-start etc/kafka/zookeeper.properties
$ kafka-server-start etc/kafka/server.properties
$ connect-standalone
etc/kafka/connect-standalone.properties
/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/etc/quickstart-connect-file-pulse-csv.properties
но у меня есть сбои, см. Ниже
[2020-09-08 15:57:45,522] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-expression-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,541] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-expression-1.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-09-08 15:57:45,541] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-filters-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,553] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-filters-1.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-09-08 15:57:45,554] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-plugin-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,575] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
java.lang.NoClassDefFoundError: io/streamthoughts/kafka/connect/filepulse/offset/OffsetManager
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.newInstance(Class.java:412)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:385)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:328)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:261)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:253)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:222)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:199)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:60)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more
вопрос
Не могли бы вы помочь, в чем может быть исправление?
ps. ниже приведено то, что я пробовал для jdbc connector, просто чтобы посмотреть, работают ли другие разъемы или нет. У него не было исключений.
connect-standalone
etc/kafka/connect-standalone.properties
/home/min/confluent-5.5.1/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties
pps. ниже content of file-pulse connector
(я не вносил изменений)
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=connect-file-pulse-quickstart-csv
tasks.max=1
filters=ParseDelimitedRow
# Delimited Row filter
filters.ParseDelimitedRow.extractColumnName=headers
filters.ParseDelimitedRow.trimColumn=true
filters.ParseDelimitedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
skip.headers=1
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader
# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/kafka-connect/examples/
fs.scan.interval.ms=10000
# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-quickstart-csv
internal.kafka.reporter.topic=connect-file-pulse-status
# Track file by name and hash
offset.strategy=name hash
ppps. ниже приведена ключевая информация для connect-standalone.properties
файла (я добавил /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
)
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
Комментарии:
1. изначально я пытался загрузить более низкую версию filepulse (streamthoughts-kafka-connect-file-pulse-1.5.0/) отсюда confluent.io/hub/streamthoughts/kafka-connect-file-pulse . Я думал, что загрузка более высокой версии с github помогла бы.
2. для соединителя jdbc, поставляемого с платформой confluent, после успешного запуска я также проверил ниже
$ curl -s localhost:8083/connector-plugins|jq '.[].class'
и получил результаты ниже"io.confluent.connect.jdbc.JdbcSinkConnector" "io.confluent.connect.jdbc.JdbcSourceConnector" "org.apache.kafka.connect.file.FileStreamSinkConnector" "org.apache.kafka.connect.file.FileStreamSourceConnector" "org.apache.kafka.connect.mirror.MirrorCheckpointConnector" "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector" "org.apache.kafka.connect.mirror.MirrorSourceConnector"
3. Поместите
/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2
рядом с папкой jdbc и повторите попытку4. Я проверил исходный код, класс not found действительно существует в
connect-file-pulse-api
jar. Как я могу принудительно указать путь к классу?5. Есть ли особая причина не использовать установку confluent-hub вместо загрузки с Github? Кроме того, я видел, что разработчик реагирует на проблемы Github, поэтому вы можете спросить там
Ответ №1:
Я бы не назвал это идеальным ответом, но это именно то, как я заставил его работать.
Я обнаружил, что plugin.path
свойство для соединителя ‘file-pulse’ должно быть родительской папкой файлов jar.
итак, ниже сработало.
$ cat etc/kafka/connect-standalone.properties
извлечена ключевая информация:
plugin.path=/user/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
раньше это было как показано ниже, и создавало исключение, как я упоминал.
plugin.path=/user/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
возможно, я не понял приведенные ниже инструкции в connect-standalone.properties
файле свойств, или, возможно, файловые импульсные соединители не соответствовали стандартам / инструкциям, указанным здесь. конечно, путь к соединителям JDBC — это путь, содержащий файлы JAR.
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
Комментарии:
1. Автор подтвердил, что путь должен был быть установлен определенным образом. смотрите здесь: github.com/streamthoughts/kafka-connect-file-pulse/issues /…
2. указание пути до родительской папки сработало для меня, однако копирование JAR, доступных в библиотеках file pulse connector, в библиотеки Kafka также сработало.