#apache-kafka #protocol-buffers #apache-kafka-connect
#apache-kafka #буферы протокола #apache-kafka-connect
Вопрос:
Как говорится в названии, у меня есть определение protobuff, которое импортирует «google / protobuf / wrappers.proto».
Когда я пытаюсь подключиться к mysql через соединитель kafka jdbc, я получаю сообщение об ошибке
Неподдерживаемый тип исходных данных: STRUCT.
Но у меня нет никаких структур.
В документах confluent упоминаются параметры с нулевым значением, но я не могу понять, где мне нужно его включить.
Это мои настройки protobuff и connector
{
"transforms": "TimestampConverter,RenameField",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'''T'''HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "timestampUtc,timestampLocal",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "Id:MonitoringEventId,TimestampUtc:Date,EventType:Type,DeviceType:GeneratedBy",
"transforms.RenameField.blacklist": "Duration,TypeId,TimestampLocal",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
}
syntax = "proto3";
import "google/protobuf/wrappers.proto";
message MonitoringEventDAO {
uint64 Id = 1;
string TimestampUtc = 2;
string TimestampLocal = 3;
uint32 Duration = 4;
uint32 BaseStationId = 5;
google.protobuf.Int32Value CameraId = 6;
uint32 AccountId = 7;
google.protobuf.Int32Value SubjectId = 8;
uint32 DeviceType = 9;
uint32 TypeId = 10;
string Message = 11;
string Version = 12;
uint32 EventType = 13;
string Hash = 14;
google.protobuf.StringValue StorageUrl = 15;
bool PresentationUploaded = 16;
uint32 DataType = 17;
google.protobuf.Int32Value Frames = 18;
google.protobuf.Int32Value Fps = 19;
google.protobuf.Int32Value SampleRate = 20;
}
CREATE TABLE `Events` (
`MonitoringEventId` int NOT NULL AUTO_INCREMENT,
`BaseStationId` int DEFAULT NULL,
`CameraId` int DEFAULT NULL,
`AccountId` int DEFAULT NULL,
`SubjectId` int DEFAULT NULL,
`GeneratedBy` int NOT NULL,
`Date` datetime(6) NOT NULL,
`Message` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`Hash` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`Version` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`Type` int NOT NULL,
`ReviewMonitoringEventReviewId` int DEFAULT NULL,
`BreathingDataUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`VideoUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`DataType` int NOT NULL,
`StorageUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`Uploaded` tinyint(1) NOT NULL,
`PresentationStorageUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`PresentationUploaded` tinyint(1) NOT NULL,
`Frames` int DEFAULT NULL,
`Fps` int DEFAULT NULL,
`SampleRate` int DEFAULT NULL,
PRIMARY KEY (`MonitoringEventId`),
KEY `IX_Events_AccountId` (`AccountId`),
KEY `IX_Events_BaseStationId` (`BaseStationId`),
KEY `IX_Events_CameraId` (`CameraId`),
KEY `IX_Events_Date` (`Date`),
KEY `IX_Events_GeneratedBy` (`GeneratedBy`),
KEY `IX_Events_Hash` (`Hash`),
KEY `IX_Events_ReviewMonitoringEventReviewId` (`ReviewMonitoringEventReviewId`),
KEY `IX_Events_SubjectId` (`SubjectId`),
KEY `IX_Events_Type` (`Type`),
CONSTRAINT `FK_Events_Accounts_AccountId` FOREIGN KEY (`AccountId`) REFERENCES `Accounts` (`AccountId`) ON DELETE RESTRICT,
CONSTRAINT `FK_Events_BaseStations_BaseStationId` FOREIGN KEY (`BaseStationId`) REFERENCES `BaseStations` (`BaseStationId`) ON DELETE RESTRICT,
CONSTRAINT `FK_Events_Cameras_CameraId` FOREIGN KEY (`CameraId`) REFERENCES `Cameras` (`CameraId`) ON DELETE RESTRICT,
CONSTRAINT `FK_Events_MonitoringEventReview_ReviewMonitoringEventReviewId` FOREIGN KEY (`ReviewMonitoringEventReviewId`) REFERENCES `MonitoringEventReview` (`MonitoringEventReviewId`) ON DELETE RESTRICT,
CONSTRAINT `FK_Events_Subjects_SubjectId` FOREIGN KEY (`SubjectId`) REFERENCES `Subjects` (`SubjectId`) ON DELETE RESTRICT
) ENGINE=InnoDB AUTO_INCREMENT=162 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1578)
at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:172)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:103)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:79)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
... 10 more
Комментарии:
1. поле может быть нулевым или нет, в зависимости от схемы. можете ли вы опубликовать свою схему?
2. Моя схема базы данных? Я добавил его сейчас
3. О, я говорил о схеме в реестре схем, поскольку
"value.converter.schema.registry.url": "http://localhost:8081"
в вашей конфигурации есть строка. у вас должно быть что-то вроде localhost:8081/subjects/your_schema-value4. Я добавил его, это файл protobuf
5. Не могли бы вы показать больше трассировки стека? У вас есть структуры — целое значение равно единице. Кроме того, на что настроен ваш конвертер ключей?