Пользовательская временная метка и управление окнами для Pub / Sub в потоке данных (Apache Beam)

#java #python #google-cloud-dataflow #apache-beam #spotify-scio

#java #python #google-облако-поток данных #apache-beam #spotify-scio

Вопрос:

Я хочу реализовать следующий сценарий, используя потоковый конвейер в Apache Beam (и запустив его в потоке данных Google):

  1. Чтение сообщений из Pub / Sub (строки JSON)
  2. Десериализация JSON
  3. Используйте пользовательское поле (скажем timeStamp ) в качестве значения временной метки для обрабатывающего элемента
  4. Применить фиксированное отображение 60 seconds
  5. Извлеките ключ из элементов и сгруппируйте по ключу
  6. << выполнить дальнейшую обработку >>

Я пытался решить эту проблему, используя как Java (Scala), так и Python, но ни одно из решений не сработало.

  1. Решение на Python
 # p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
        | beam.Map(add_timestamping)
        | beam.WindowInto(window.FixedWindows(60))
        | beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
        | beam.GroupByKey()
        # (...)
        | beam.io.WriteToPubSub("output_topic")
        )
p.run()
  

add_timestamping функционирует согласно документации:

 def add_timestamping(elem):
    import json
    import apache_beam as beam
    msg = json.loads(elem)
    unix_timestamp = msg['timeStamp'] / 1000
    return beam.window.TimestampedValue(msg, unix_timestamp)
  

Вывод решения на Python:

  1. При использовании DirectRunner генерируются окна, и само управление окнами более или менее подходит, в зависимости от задержки.
  2. При использовании DataFlowRunner , ВСЕ сообщения пропускаются со счетчиком, появляющимся в пользовательском интерфейсе потока данных: droppedDueToLateness.

  1. Решение Java / Scala (я использовал Scio, но это происходит и с clean Beam SDK в Java)
 sc.pubsubSubscription[String]("my_sub")
    .applyTransform(ParDo.of(new CustomTs()))
    .withFixedWindows(Duration.standardSeconds(60))
    .map(x => x) // exracting the key somehow, not relevant here
    .groupByKey
    // (...)
    .saveAsPubsub("output_topic")
  

Добавление пользовательской временной метки в соответствии с документацией:

 import io.circe.parser._
class CustomTs extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
    val json = parse(element).right.get
    val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
    out.outputWithTimestamp(element, new Instant(timestampMillis))
  }
}
  

Вывод решения Java / Scala:

 Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.IllegalArgumentException:
 Cannot output with timestamp 2019-03-02T00:51:39.124Z. 
 Output timestamps must be no earlier than the timestamp of the current input
 (2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
  

Я не могу использовать DoFn.getAllowedTimestampSkew здесь, поскольку он уже устарел, и я не знаю, какие диапазоны исторических данных будут отправлены.


Для моего проекта крайне важно иметь возможность обрабатывать исторические данные (эти данные будут отправлены в Pub / Sub из какого-либо хранилища). Конвейер должен работать как с текущими данными, так и с историческими.

Мой вопрос таков: как обрабатывать данные с использованием пользовательских временных меток с возможностью работы в Windows, определенных с использованием Beam API?

Ответ №1:

Если у вас есть возможность извлечь временную метку в точке вставки в PubSub, вы сможете использовать указанные пользователем временные метки в качестве метаданных. Информация о том, как это сделать, задокументирована в SDK 1.9.

https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids

«Вы можете использовать указанные пользователем временные метки для точного контроля над тем, как элементы, считываемые из Cloud Pub / Sub, назначаются Windows в конвейере потока данных «.

Поскольку 1.9 устарел, в версии 2.11 вам понадобится https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.Строка-

Комментарии:

1. Это устарело

2. beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io / … я думаю, должна быть версия 2.11 той же версии, что была доступна для 1.9.

3. Используя этот метод, вы также установили допустимую длительность для своего набора данных?

4. Я хочу извлечь временную метку из полезной нагрузки сообщения Pub / Sub, а не из атрибута.

5. Я пытался использовать «timestampAttribute» вместе с «withAllowedLateness», но это не сработало (по крайней мере, для прямого запуска)