Приемник Kafka Connect S3 — как использовать временную метку из самого сообщения [извлечение метки времени]

#amazon-s3 #apache-kafka #apache-kafka-connect #confluent-platform

#amazon-s3 #apache-kafka #apache-kafka-подключиться #confluent-платформа #apache-kafka-connect

Вопрос:

Я боролся с проблемой, используя kafka connect и приемник S3.

Сначала структура:

 {
   Partition: number
   Offset: number
   Key: string
   Message: json string
   Timestamp: timestamp
}
  

Обычно при публикации в Kafka временная метка должна быть установлена производителем. К сожалению, похоже, что были случаи, когда этого не происходило. Это означает, что временная метка иногда может быть null

Для извлечения этой временной метки соединителю было присвоено следующее значение: "timestamp.extractor":"Record" .

Теперь всегда можно быть уверенным, что Message само поле также всегда содержит временную метку.

Message :

 {
   timestamp: "2019-04-02T06:27:02.667Z"
   metadata: {
     creationTimestamp: "1554186422667"
   }
}
  

Вопрос, однако, в том, что теперь я хотел бы использовать это поле для timestamp.extractor

Я думал, что этого будет достаточно, но, похоже, это не работает:

 "timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
  

Это также приводит к указателю NULL.

Есть идеи относительно того, как использовать временную метку из самой полезной нагрузки сообщения kafka вместо поля временной метки по умолчанию, которое установлено для kafka версии 0.10

РЕДАКТИРОВАТЬ: Полная конфигурация:

 { "name": "<name>",
  "config": {
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"4",
    "topics":"<topic>",
    "flush.size":"100",
    "s3.bucket.name":"<bucket name>",
    "s3.region": "<region>",
    "s3.part.size":"<partition size>",
    "rotate.schedule.interval.ms":"86400000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
    "locale":"ENGLISH",
    "timezone":"UTC",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "timestamp.extractor":"RecordField",
    "timestamp.field":"message.timestamp",
    "max.poll.interval.ms": "600000",
    "request.timeout.ms": "610000",
    "heartbeat.interval.ms": "6000",
    "session.timeout.ms": "20000",
    "s3.acl.canned":"bucket-owner-full-control"
  }
}
  

ПРАВКА 2:
Структура полезной нагрузки сообщения Kafka:

 {
  "reference": "",
  "clientId": "",
  "gid": "",
  "timestamp": "2019-03-19T15:27:55.526Z",
}
  

ПРАВКА 3:

 {
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}
  

Итак, я попытался выполнить преобразование объекта, но, похоже, я снова застрял на этом. Шаблон кажется недопустимым. Просматривая Интернет, кажется, что это допустимый SimpleDatePattern. Кажется, он жалуется на 'T' . Также обновлена схема сообщения.

Комментарии:

1. Для уточнения: вы используете Kafka Connect в качестве приемника? И использовать преобразование одного сообщения для извлечения временной метки сообщения Kafka в поле сообщения, которое вы хотите записать в приемник? Можете ли вы поделиться своей полной конфигурацией Kafka Connect?

2. Обновлено сообщение!

3. Хорошо, теперь это приобретает больше смысла 🙂 Не могли бы вы поделиться схемой вашего сообщения, пожалуйста?

4. Вы имеете в виду это свойство? Message: json string ?

5. Не могли бы вы показать полную трассировку стека?

Ответ №1:

На основе схемы, которой вы поделились, вы должны настроить:

     "timestamp.extractor":"RecordField",
    "timestamp.field":"timestamp",
  

т.е. нет message префикса к имени поля временной метки.

Комментарии:

1. Похоже, что это работает не полностью. В нем все еще отображается указатель null. Несмотря на то, что в сообщении действительно доступна временная метка (согласно kafka tools)

2. Может ли это быть связано с тем, что это UTC? Нужно ли это как-то преобразовывать?

Ответ №2:

Если данные представляют собой строку, то Connect попытается проанализировать их за миллисекундыисходный код здесь.

В любом случае, message.timestamp предполагается, что данные выглядят как { "message" : { "timestamp": ... } } , так что просто timestamp было бы правильно. И в любом случае наличие вложенных полей раньше было невозможно, поэтому вы можете уточнить, какая версия Connect у вас есть.

Я не совсем уверен, как бы вы получили instanceof Date значение true при использовании JSON Converter, и даже если бы вы установили schema.enable = true , то также в коде вы можете видеть, что есть только условия для типов схем чисел и строк, но все равно предполагается, что это миллисекунды.

Вы можете попробовать использовать преобразование TimestampConverter для преобразования вашей строки даты.

Комментарии:

1. Кажется, что я натыкаюсь на стену здесь. Я обновил вопрос дополнительной информацией. Я заметил, что у меня также был доступ к обычной временной метке, которая, afaik, идеально подошла бы для времени в формате «строка». Поможет ли это с использованием ExtractField или чего-то связанного для этого?

2. Не уверен, зачем вам нужно что-либо извлекать. Я сам не использовал TimestampConverter, потому что данные, с которыми я работаю, почти всегда соответствуют времени эпохи unix в миллисекундах. Модульные тесты для этого находятся здесь, хотя, если вы хотите взглянуть github.com/apache/kafka/blob/trunk/connect/transforms/src/test / …