# #google-cloud-dataflow #apache-beam
Вопрос:
Версия Apache Beam: 2.28.0 Python SDK
У нас есть конвейер потока данных, в котором мы пытаемся загрузить файл в качестве бокового ввода (размер файла меньше 1 МБ), используя стратегию, описанную в документе Шаблоны бокового ввода, о медленном обновлении входных данных на стороне глобального окна (https://beam.apache.org/documentation/patterns/side-inputs/).
Этот код выглядит примерно так:
side_input = (
pipeline
| "Generate impulse" >> PeriodicImpulse(start_timestamp=Timestamp.now().micros/1000000, stop_timestamp=MAX_TIMESTAMP.micros/1000000, fire_interval=60*10)
| "window side input" >> beam.WindowInto(window.GlobalWindows(),
trigger=Repeatedly(AfterCount(1)),
accumulation_mode=AccumulationMode.DISCARDING)
| "Create side input" >> beam.ParDo(CreateSideInput())
)
Боковой ввод-это просто словарь, который мы хотим использовать ниже по потоку с нашими основными данными о событиях.
Наши основные данные о событиях мы читаем события pubsub, которые фактически являются событиями, указывающими на файлы, которые должны быть прочитаны из GCS. Мы просматриваем эти события с помощью 5-минутных окон "Window into data" >> beam.WindowInto(window.FixedWindows(5*60))
, а затем читаем файлы, в которых каждая строка файла становится объектом словаря (мы выполняем json.loads для каждой строки). Мы открываем окно, потому что по разным причинам нам нужно группировать их позже в процессе разработки.
После этого мы делаем следующее,
events |
"enrich with side input data" >> beam.Map(lambda event, side_input_data: EnrichEvents(element,side_input_data), side_input_data=beam.pvalue.AsSingleton(side_input))
Чего бы это ни стоило,мы также попытались использовать beam.pvalue.AsDict
и изменить генерацию побочных входов для выделения кортежей ключей и значений, но возникает та же проблема, о которой я расскажу ниже.
Проблема в том, что мы получаем эту действительно неясную ошибку из потока данных, которая кажется не рабочей ошибкой, а ошибкой задания. Ошибки обычно выглядят примерно так: Error message from worker: generic::data_loss: SDK claims to have processed 34289 but should have processed 33593 elements
Эти ошибки продолжают происходить иногда с одними и теми же значениями, а иногда с разными значениями. Мы понятия не имеем, как с этим бороться, и все это, похоже, вызывает проблемы со свежестью данных, которые постоянно растут. Мы хотели бы использовать побочные входы, так как это кажется подходящим шаблоном, учитывая размер имеющихся у нас данных и то, что мы хотим с ними делать. Не похоже, что данные не обрабатываются, но что-то кажется неправильным, особенно учитывая эти ошибки, которые мы наблюдаем.
Комментарии:
1. Есть замечание, связанное с самим конвейером луча, которое выделяется как причина любой потери данных. Я подозреваю, что это ошибка на стороне потока данных, и вам, вероятно, потребуется помощь в облачной поддержке. Но как бы то ни было, работники должны прерывать и повторять рабочие элементы, которые демонстрируют потерю данных, поэтому, если ваш конвейер все еще выдает правильные результаты, это, вероятно, безвредно.