#java #python #google-cloud-dataflow #apache-beam #spotify-scio
#java #python #google-облако-поток данных #apache-beam #spotify-scio
Вопрос:
Я хочу реализовать следующий сценарий, используя потоковый конвейер в Apache Beam (и запустив его в потоке данных Google):
- Чтение сообщений из Pub / Sub (строки JSON)
- Десериализация JSON
- Используйте пользовательское поле (скажем
timeStamp
) в качестве значения временной метки для обрабатывающего элемента - Применить фиксированное отображение
60 seconds
- Извлеките ключ из элементов и сгруппируйте по ключу
- << выполнить дальнейшую обработку >>
Я пытался решить эту проблему, используя как Java (Scala), так и Python, но ни одно из решений не сработало.
- Решение на Python
# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
| beam.Map(add_timestamping)
| beam.WindowInto(window.FixedWindows(60))
| beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
| beam.GroupByKey()
# (...)
| beam.io.WriteToPubSub("output_topic")
)
p.run()
add_timestamping
функционирует согласно документации:
def add_timestamping(elem):
import json
import apache_beam as beam
msg = json.loads(elem)
unix_timestamp = msg['timeStamp'] / 1000
return beam.window.TimestampedValue(msg, unix_timestamp)
Вывод решения на Python:
- При использовании
DirectRunner
генерируются окна, и само управление окнами более или менее подходит, в зависимости от задержки. - При использовании
DataFlowRunner
, ВСЕ сообщения пропускаются со счетчиком, появляющимся в пользовательском интерфейсе потока данных: droppedDueToLateness.
- Решение Java / Scala (я использовал Scio, но это происходит и с clean Beam SDK в Java)
sc.pubsubSubscription[String]("my_sub")
.applyTransform(ParDo.of(new CustomTs()))
.withFixedWindows(Duration.standardSeconds(60))
.map(x => x) // exracting the key somehow, not relevant here
.groupByKey
// (...)
.saveAsPubsub("output_topic")
Добавление пользовательской временной метки в соответствии с документацией:
import io.circe.parser._
class CustomTs extends DoFn[String, String] {
@ProcessElement
def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
val json = parse(element).right.get
val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
out.outputWithTimestamp(element, new Instant(timestampMillis))
}
}
Вывод решения Java / Scala:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException:
Cannot output with timestamp 2019-03-02T00:51:39.124Z.
Output timestamps must be no earlier than the timestamp of the current input
(2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
Я не могу использовать DoFn.getAllowedTimestampSkew
здесь, поскольку он уже устарел, и я не знаю, какие диапазоны исторических данных будут отправлены.
Для моего проекта крайне важно иметь возможность обрабатывать исторические данные (эти данные будут отправлены в Pub / Sub из какого-либо хранилища). Конвейер должен работать как с текущими данными, так и с историческими.
Мой вопрос таков: как обрабатывать данные с использованием пользовательских временных меток с возможностью работы в Windows, определенных с использованием Beam API?
Ответ №1:
Если у вас есть возможность извлечь временную метку в точке вставки в PubSub, вы сможете использовать указанные пользователем временные метки в качестве метаданных. Информация о том, как это сделать, задокументирована в SDK 1.9.
https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids
«Вы можете использовать указанные пользователем временные метки для точного контроля над тем, как элементы, считываемые из Cloud Pub / Sub, назначаются Windows в конвейере потока данных «.
Поскольку 1.9 устарел, в версии 2.11 вам понадобится https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.Строка-
Комментарии:
1. Это устарело
2. beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io / … я думаю, должна быть версия 2.11 той же версии, что была доступна для 1.9.
3. Используя этот метод, вы также установили допустимую длительность для своего набора данных?
4. Я хочу извлечь временную метку из полезной нагрузки сообщения Pub / Sub, а не из атрибута.
5. Я пытался использовать «timestampAttribute» вместе с «withAllowedLateness», но это не сработало (по крайней мере, для прямого запуска)