How do I use the Debezium SQL Server connector with ksqlDB embedded Connect?

#sql-server #apache-kafka #apache-kafka-connect #ksqldb #debezium

Question:

After a day of experimenting with various configuration options, I have not been able to get the Debezium SQL Server connector working with ksqlDB's embedded Kafka Connect. Neither the ksqlDB nor the Debezium website seems to offer clear guidance on how to set this up.

What documentation there is suggests that you simply install the appropriate Kafka Connect plugin in the specified location and everything should magically work out of the box... but unfortunately I've had no luck, and the error messages I'm getting don't give me any feedback to work with.

My docker-compose.yml looks like this; in my .env file I have DEBEZIUM_VERSION=1.3 and KSQLDB_VERSION=0.11.0:

version: '3.7'
services:
  # ***********************
  # * Kafka               *
  # ***********************
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    networks:
      - infrastructure
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888

  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    depends_on:
      - zookeeper
    networks:
      - infrastructure
    ports:
     - 9092:9092
     - 19092:19092
    environment:
     - BROKER_ID=1
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - ALLOW_PLAINTEXT_LISTENER=yes

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    networks:
      - infrastructure
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
     - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS
     - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=*

  schema-registry-ui:
    image: landoop/schema-registry-ui
    depends_on:
      - schema-registry
    networks:
      - infrastructure
    ports:
     - 8000:8000
    environment:
     - SCHEMAREGISTRY_URL=http://schema-registry:8081
     - PROXY=true

  ksqldb-server:
    image: confluentinc/ksqldb-server:${KSQLDB_VERSION}
    build:
      context: ksqldb-sqlserver
      args:
        KSQLDB_VERSION: ${KSQLDB_VERSION}
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - schema-registry
    networks:
      - infrastructure
    ports:
      - "8088:8088"
    #volumes:
    #  - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"

  # *-----------------------------*
  # To connect to the DB:
  #   docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
  # *-----------------------------*
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:${KSQLDB_VERSION}
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    networks:
      - infrastructure
    entrypoint: /bin/sh
    tty: true

networks:
  infrastructure:
    name: infrastructure
  

My Dockerfile for the ksqldb-sqlserver build above, which basically just takes the default image and then copies the contents of the Debezium SQL Server connector plugin into /usr/share/confluent-hub-components, looks like this:

ARG KSQLDB_VERSION

FROM confluentinc/ksqldb-server:${KSQLDB_VERSION}

ARG DEBEZIUM_VERSION=1.3.0.Beta2

ENV PLUGINS_DIR=/usr/share/confluent-hub-components/
ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
USER root
RUN mkdir -p $PLUGINS_DIR && chown -R appuser: $PLUGINS_DIR
USER appuser
RUN cd $PLUGINS_DIR \
    && curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/$DEBEZIUM_VERSION/debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
    && tar -xzf debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
    && rm debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz
  

And finally, the statement I use to create the connector in ksqldb-cli is:

CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);
  

The ksqlDB CLI returns "HTTP ERROR 500 Request failed", and when I check the Docker logs for ksqldb-server I see the following cryptic output (which I had to trim a little because of Stack Overflow's character limits):

[2020-09-17 21:34:15,983] INFO Received: KsqlRequest{ksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]} (io.confluent.ksql.rest.server.resources.KsqlResource:223)
[2020-09-17 21:34:16,156] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 30 more
...
...
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,180] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,191] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
...
...
[2020-09-17 21:34:16,192] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,194] WARN Failed to query connect cluster after 3 attempts. (io.confluent.ksql.services.DefaultConnectClient:278)
[2020-09-17 21:34:16,195] WARN Did not CREATE connector ACCOUNTS_READER: <html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>Error 500 Request failed.</title>
</head>
<body><h2>HTTP ERROR 500 Request failed.</h2>
<table>
<tr><th>URI:</th><td>/connectors</td></tr>
<tr><th>STATUS:</th><td>500</td></tr>
<tr><th>MESSAGE:</th><td>Request failed.</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-7c956dda</td></tr>
</table>
<hr><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.4.30.v20200611</a><hr/>

</body>
</html>
 (io.confluent.ksql.services.DefaultConnectClient:115)
[2020-09-17 21:34:16,196] INFO Processed successfully: KsqlRequest{ksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]} (io.confluent.ksql.rest.server.resources.KsqlResource:261)
  

I'm not 100% sure, but I don't think this means that the SqlServerConnectorConfig class is missing; rather, something failed while it was being initialized (probably something more fundamental than a mistake in the connector configuration).

Can anyone point out where I went wrong or, alternatively, suggest a simple way to set up Debezium source connectors and JDBC sink connectors in ksqlDB embedded Connect?
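
One way to check that reading of the stack trace: the JVM reports "Could not initialize class X" when an earlier attempt to initialize X has already failed, so the first NoClassDefFoundError in the log usually names the class that could not be loaded (in the log above, io/debezium/DebeziumException). The sketch below is a hypothetical helper, not part of ksqlDB or Debezium. The function name root_missing_classes is made up for illustration, and it only filters log text piped into it.

```shell
# Hypothetical helper: read a log on stdin and print the classes named in
# NoClassDefFoundError lines, skipping the follow-on
# "Could not initialize class ..." errors that only repeat an earlier failure.
root_missing_classes() {
  grep -o 'NoClassDefFoundError: [^ ]*' \
    | grep -v 'NoClassDefFoundError: Could' \
    | sed 's/^NoClassDefFoundError: //' \
    | sort -u
}

# Usage, assuming the container name from the compose file in the question:
#   docker logs ksqldb-server 2>&1 | root_missing_classes
```

If this prints a class from the connector's own dependencies, that points at the contents of the plugin directory rather than at the connector configuration.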

Answer #1:

Here is a working Docker Compose for you. From it you can work out how to build your own Docker image if you want to.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use broker:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.1
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.11.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
      - "8083:8083"
    user: root
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_REST_PORT: 8083
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'

    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # ------ hack to workaround absence of confluent-hub client
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-sqlserver:1.2.2
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &

        sleep infinity

  

You can find the full Docker Compose, including a sample MS SQL container configured for CDC, here.

For a complete, working step-by-step example, see this blog post: https://rmoff.net/2020/09/18/using-the-debezium-ms-sql-connector-with-ksqldb-embedded-kafka-connect/
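
Before running CREATE SOURCE CONNECTOR, it can help to confirm that the embedded Connect worker actually loaded the plugin. Kafka Connect's REST API lists installed plugins at /connector-plugins, and the Compose file above publishes the worker's REST port as 8083. The sketch below assumes that setup. plugin_listed is a hypothetical helper name, and it does nothing more than search the JSON it is given for the quoted class name.

```shell
# Hypothetical helper: succeed if the plugin listing read from stdin
# contains the given connector class as a quoted JSON string.
plugin_listed() {
  grep -qF "\"$1\""
}

# Usage, assuming the embedded Connect REST port is published on localhost:8083:
#   curl -s http://localhost:8083/connector-plugins \
#     | plugin_listed io.debezium.connector.sqlserver.SqlServerConnector \
#     && echo "plugin loaded"
```

If the class is not listed, the worker did not pick the connector up from its plugin path, and creating the connector from ksqlDB will fail regardless of the connector properties.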

Comments:

1. Thanks for this, much appreciated, and a very helpful article too! Ultimately I was hoping to use a cluster of ksqlDB servers with embedded Connect to replace standalone Connect instances entirely, mainly to reduce the number of moving parts. Is there much overhead in doing so (given that the amount of computation we will be doing initially is fairly low)?

2. Just to clarify, by "overhead" I mean significant additional RAM/resource usage (even when ksqlDB's advanced features such as joins and aggregations are not used often), or any drawbacks that would make maintaining a separate Connect cluster preferable.

3. Embedded Connect makes sense until (or unless) you find that you need to scale Connect or ksqlDB and cannot do so independently. For a more detailed discussion, head over to cnfl.io/slack and join the #ksqldb channel.

4. Thanks, will do!