Кафка подключает приемник Jdbc с преобразователем protobuf — Как использовать нулевые значения?

#apache-kafka #protocol-buffers #apache-kafka-connect

#apache-kafka #буферы протокола #apache-kafka-connect

Вопрос:

Как говорится в названии, у меня есть определение protobuff, которое импортирует «google / protobuf / wrappers.proto».

Когда я пытаюсь подключиться к mysql через соединитель kafka jdbc, я получаю сообщение об ошибке

Неподдерживаемый тип исходных данных: STRUCT.

Но у меня нет никаких структур.

В документах confluent упоминаются параметры с нулевым значением, но я не могу понять, где мне нужно его включить.

Это мои настройки protobuff и connector

     {
      "transforms": "TimestampConverter,RenameField",
      "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.TimestampConverter.format": "yyyy-MM-dd'''T'''HH:mm:ss",
      "transforms.TimestampConverter.target.type": "Timestamp",
      "transforms.TimestampConverter.field": "timestampUtc,timestampLocal",

      "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.RenameField.renames": "Id:MonitoringEventId,TimestampUtc:Date,EventType:Type,DeviceType:GeneratedBy",
      "transforms.RenameField.blacklist": "Duration,TypeId,TimestampLocal",

      "value.converter.schema.registry.url": "http://localhost:8081",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    }
 
 syntax = "proto3";

import "google/protobuf/wrappers.proto";

message MonitoringEventDAO {
    uint64 Id = 1;
    string TimestampUtc = 2;
    string TimestampLocal = 3;
    uint32 Duration = 4;

    uint32 BaseStationId = 5;
    google.protobuf.Int32Value CameraId = 6;
    uint32 AccountId = 7;
    google.protobuf.Int32Value SubjectId = 8;

    uint32 DeviceType = 9;
    uint32 TypeId = 10;
    string Message = 11;

    string Version = 12;
    uint32 EventType = 13;
    string Hash = 14;

    google.protobuf.StringValue StorageUrl = 15;
    bool PresentationUploaded = 16;
    uint32 DataType = 17;
    google.protobuf.Int32Value Frames = 18;
    google.protobuf.Int32Value Fps = 19;
    google.protobuf.Int32Value SampleRate = 20;
}
 
 CREATE TABLE `Events` (
  `MonitoringEventId` int NOT NULL AUTO_INCREMENT,
  `BaseStationId` int DEFAULT NULL,
  `CameraId` int DEFAULT NULL,
  `AccountId` int DEFAULT NULL,
  `SubjectId` int DEFAULT NULL,
  `GeneratedBy` int NOT NULL,
  `Date` datetime(6) NOT NULL,
  `Message` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `Hash` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `Version` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `Type` int NOT NULL,
  `ReviewMonitoringEventReviewId` int DEFAULT NULL,
  `BreathingDataUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `VideoUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `DataType` int NOT NULL,
  `StorageUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `Uploaded` tinyint(1) NOT NULL,
  `PresentationStorageUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `PresentationUploaded` tinyint(1) NOT NULL,
  `Frames` int DEFAULT NULL,
  `Fps` int DEFAULT NULL,
  `SampleRate` int DEFAULT NULL,
  PRIMARY KEY (`MonitoringEventId`),
  KEY `IX_Events_AccountId` (`AccountId`),
  KEY `IX_Events_BaseStationId` (`BaseStationId`),
  KEY `IX_Events_CameraId` (`CameraId`),
  KEY `IX_Events_Date` (`Date`),
  KEY `IX_Events_GeneratedBy` (`GeneratedBy`),
  KEY `IX_Events_Hash` (`Hash`),
  KEY `IX_Events_ReviewMonitoringEventReviewId` (`ReviewMonitoringEventReviewId`),
  KEY `IX_Events_SubjectId` (`SubjectId`),
  KEY `IX_Events_Type` (`Type`),
  CONSTRAINT `FK_Events_Accounts_AccountId` FOREIGN KEY (`AccountId`) REFERENCES `Accounts` (`AccountId`) ON DELETE RESTRICT,
  CONSTRAINT `FK_Events_BaseStations_BaseStationId` FOREIGN KEY (`BaseStationId`) REFERENCES `BaseStations` (`BaseStationId`) ON DELETE RESTRICT,
  CONSTRAINT `FK_Events_Cameras_CameraId` FOREIGN KEY (`CameraId`) REFERENCES `Cameras` (`CameraId`) ON DELETE RESTRICT,
  CONSTRAINT `FK_Events_MonitoringEventReview_ReviewMonitoringEventReviewId` FOREIGN KEY (`ReviewMonitoringEventReviewId`) REFERENCES `MonitoringEventReview` (`MonitoringEventReviewId`) ON DELETE RESTRICT,
  CONSTRAINT `FK_Events_Subjects_SubjectId` FOREIGN KEY (`SubjectId`) REFERENCES `Subjects` (`SubjectId`) ON DELETE RESTRICT
) ENGINE=InnoDB AUTO_INCREMENT=162 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
 
 
    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1578)
    at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:172)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:103)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:79)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
    ... 10 more
 

Комментарии:

1. поле может быть нулевым или нет, в зависимости от схемы. можете ли вы опубликовать свою схему?

2. Моя схема базы данных? Я добавил его сейчас

3. О, я говорил о схеме в реестре схем, поскольку "value.converter.schema.registry.url": "http://localhost:8081" в вашей конфигурации есть строка. у вас должно быть что-то вроде localhost:8081/subjects/your_schema-value

4. Я добавил его, это файл protobuf

5. Не могли бы вы показать больше трассировки стека? У вас есть структуры — целое значение равно единице. Кроме того, на что настроен ваш конвертер ключей?