не удалось запустить kafka-connect file-pulse connector standalone (исключение класса не найдено в диспетчере смещений)

#apache-kafka #apache-kafka-connect

#apache-kafka #apache-kafka-connect

Вопрос:

Я хочу использовать filepulse connector для загрузки XML-файлов в kafka.

ниже приведена моя среда:

  • Win10 WSL, установлен Ubuntu
  • загружена confluent platform 5.5.1 (см. «https://www.confluent.io/download /»), распакованный
  • загруженный zip-файл версии 1.5.2 с github (https://github.com/streamthoughts/kafka-connect-file-pulse/releases ), разархивированный
  • изменен «connect-standalone.properties», расположенный в разделе confluent path (etc/kafka/connect-standalone.properties), чтобы включить путь «/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib».
  • Я не создавал никакой темы.

Я запустил zookeeper; kafka и попытался запустить kafka-connect standalone, как показано ниже:

 $ zookeeper-server-start etc/kafka/zookeeper.properties
$ kafka-server-start etc/kafka/server.properties
$ connect-standalone 
etc/kafka/connect-standalone.properties 
/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/etc/quickstart-connect-file-pulse-csv.properties
  

но у меня есть сбои, см. Ниже

 [2020-09-08 15:57:45,522] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-expression-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,541] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-expression-1.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-09-08 15:57:45,541] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-filters-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,553] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-filters-1.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-09-08 15:57:45,554] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-plugin-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,575] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
java.lang.NoClassDefFoundError: io/streamthoughts/kafka/connect/filepulse/offset/OffsetManager
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:385)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:328)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:261)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:253)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:222)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:199)
    at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:60)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 13 more
  

вопрос
Не могли бы вы помочь, в чем может быть исправление?

ps. ниже приведено то, что я пробовал для jdbc connector, просто чтобы посмотреть, работают ли другие разъемы или нет. У него не было исключений.

 connect-standalone 
etc/kafka/connect-standalone.properties 
/home/min/confluent-5.5.1/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties
  

pps. ниже content of file-pulse connector (я не вносил изменений)

 connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=connect-file-pulse-quickstart-csv
tasks.max=1

filters=ParseDelimitedRow

# Delimited Row filter
filters.ParseDelimitedRow.extractColumnName=headers
filters.ParseDelimitedRow.trimColumn=true
filters.ParseDelimitedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
skip.headers=1
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader

# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/kafka-connect/examples/
fs.scan.interval.ms=10000

# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-quickstart-csv
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name and hash
offset.strategy=name hash
  

ppps. ниже приведена ключевая информация для connect-standalone.properties файла (я добавил /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib )

 
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

plugin.path=/usr/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
  

Комментарии:

1. изначально я пытался загрузить более низкую версию filepulse (streamthoughts-kafka-connect-file-pulse-1.5.0/) отсюда confluent.io/hub/streamthoughts/kafka-connect-file-pulse . Я думал, что загрузка более высокой версии с github помогла бы.

2. для соединителя jdbc, поставляемого с платформой confluent, после успешного запуска я также проверил ниже $ curl -s localhost:8083/connector-plugins|jq '.[].class' и получил результаты ниже "io.confluent.connect.jdbc.JdbcSinkConnector" "io.confluent.connect.jdbc.JdbcSourceConnector" "org.apache.kafka.connect.file.FileStreamSinkConnector" "org.apache.kafka.connect.file.FileStreamSourceConnector" "org.apache.kafka.connect.mirror.MirrorCheckpointConnector" "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector" "org.apache.kafka.connect.mirror.MirrorSourceConnector"

3. Поместите /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2 рядом с папкой jdbc и повторите попытку

4. Я проверил исходный код, класс not found действительно существует в connect-file-pulse-api jar. Как я могу принудительно указать путь к классу?

5. Есть ли особая причина не использовать установку confluent-hub вместо загрузки с Github? Кроме того, я видел, что разработчик реагирует на проблемы Github, поэтому вы можете спросить там

Ответ №1:

Я бы не назвал это идеальным ответом, но это именно то, как я заставил его работать.

Я обнаружил, что plugin.path свойство для соединителя ‘file-pulse’ должно быть родительской папкой файлов jar.

итак, ниже сработало.

 $ cat etc/kafka/connect-standalone.properties
  

извлечена ключевая информация:

 plugin.path=/user/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
  

раньше это было как показано ниже, и создавало исключение, как я упоминал.

 plugin.path=/user/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
  

возможно, я не понял приведенные ниже инструкции в connect-standalone.properties файле свойств, или, возможно, файловые импульсные соединители не соответствовали стандартам / инструкциям, указанным здесь. конечно, путь к соединителям JDBC — это путь, содержащий файлы JAR.

 # a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
  

Комментарии:

1. Автор подтвердил, что путь должен был быть установлен определенным образом. смотрите здесь: github.com/streamthoughts/kafka-connect-file-pulse/issues /…

2. указание пути до родительской папки сработало для меня, однако копирование JAR, доступных в библиотеках file pulse connector, в библиотеки Kafka также сработало.