#amazon-s3 #apache-kafka #apache-kafka-connect #confluent-platform
#amazon-s3 #apache-kafka #apache-kafka-подключиться #confluent-платформа #apache-kafka-connect
Вопрос:
Я боролся с проблемой, используя kafka connect и приемник S3.
Сначала структура:
{
Partition: number
Offset: number
Key: string
Message: json string
Timestamp: timestamp
}
Обычно при публикации в Kafka временная метка должна быть установлена производителем. К сожалению, похоже, что были случаи, когда этого не происходило. Это означает, что временная метка иногда может быть null
Для извлечения этой временной метки соединителю было присвоено следующее значение: "timestamp.extractor":"Record"
.
Теперь всегда можно быть уверенным, что Message
само поле также всегда содержит временную метку.
Message
:
{
timestamp: "2019-04-02T06:27:02.667Z"
metadata: {
creationTimestamp: "1554186422667"
}
}
Вопрос, однако, в том, что теперь я хотел бы использовать это поле для timestamp.extractor
Я думал, что этого будет достаточно, но, похоже, это не работает:
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
Это также приводит к указателю NULL.
Есть идеи относительно того, как использовать временную метку из самой полезной нагрузки сообщения kafka вместо поля временной метки по умолчанию, которое установлено для kafka версии 0.10
РЕДАКТИРОВАТЬ: Полная конфигурация:
{ "name": "<name>",
"config": {
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"4",
"topics":"<topic>",
"flush.size":"100",
"s3.bucket.name":"<bucket name>",
"s3.region": "<region>",
"s3.part.size":"<partition size>",
"rotate.schedule.interval.ms":"86400000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"locale":"ENGLISH",
"timezone":"UTC",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
"max.poll.interval.ms": "600000",
"request.timeout.ms": "610000",
"heartbeat.interval.ms": "6000",
"session.timeout.ms": "20000",
"s3.acl.canned":"bucket-owner-full-control"
}
}
ПРАВКА 2:
Структура полезной нагрузки сообщения Kafka:
{
"reference": "",
"clientId": "",
"gid": "",
"timestamp": "2019-03-19T15:27:55.526Z",
}
ПРАВКА 3:
{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}
Итак, я попытался выполнить преобразование объекта, но, похоже, я снова застрял на этом. Шаблон кажется недопустимым. Просматривая Интернет, кажется, что это допустимый SimpleDatePattern. Кажется, он жалуется на 'T'
. Также обновлена схема сообщения.
Комментарии:
1. Для уточнения: вы используете Kafka Connect в качестве приемника? И использовать преобразование одного сообщения для извлечения временной метки сообщения Kafka в поле сообщения, которое вы хотите записать в приемник? Можете ли вы поделиться своей полной конфигурацией Kafka Connect?
2. Обновлено сообщение!
3. Хорошо, теперь это приобретает больше смысла 🙂 Не могли бы вы поделиться схемой вашего сообщения, пожалуйста?
4. Вы имеете в виду это свойство?
Message: json string
?5. Не могли бы вы показать полную трассировку стека?
Ответ №1:
На основе схемы, которой вы поделились, вы должны настроить:
"timestamp.extractor":"RecordField",
"timestamp.field":"timestamp",
т.е. нет message
префикса к имени поля временной метки.
Комментарии:
1. Похоже, что это работает не полностью. В нем все еще отображается указатель null. Несмотря на то, что в сообщении действительно доступна временная метка (согласно kafka tools)
2. Может ли это быть связано с тем, что это UTC? Нужно ли это как-то преобразовывать?
Ответ №2:
Если данные представляют собой строку, то Connect попытается проанализировать их за миллисекунды — исходный код здесь.
В любом случае, message.timestamp
предполагается, что данные выглядят как { "message" : { "timestamp": ... } }
, так что просто timestamp
было бы правильно. И в любом случае наличие вложенных полей раньше было невозможно, поэтому вы можете уточнить, какая версия Connect у вас есть.
Я не совсем уверен, как бы вы получили instanceof Date
значение true при использовании JSON Converter, и даже если бы вы установили schema.enable = true
, то также в коде вы можете видеть, что есть только условия для типов схем чисел и строк, но все равно предполагается, что это миллисекунды.
Вы можете попробовать использовать преобразование TimestampConverter для преобразования вашей строки даты.
Комментарии:
1. Кажется, что я натыкаюсь на стену здесь. Я обновил вопрос дополнительной информацией. Я заметил, что у меня также был доступ к обычной временной метке, которая, afaik, идеально подошла бы для времени в формате «строка». Поможет ли это с использованием ExtractField или чего-то связанного для этого?
2. Не уверен, зачем вам нужно что-либо извлекать. Я сам не использовал TimestampConverter, потому что данные, с которыми я работаю, почти всегда соответствуют времени эпохи unix в миллисекундах. Модульные тесты для этого находятся здесь, хотя, если вы хотите взглянуть github.com/apache/kafka/blob/trunk/connect/transforms/src/test / …