Проблема с потоком данных Google Cloud и ошибкой «generic::data_loss»

# #google-cloud-dataflow #apache-beam

Вопрос:

Версия Apache Beam: 2.28.0 Python SDK

У нас есть конвейер потока данных, в котором мы пытаемся загрузить файл в качестве бокового ввода (размер файла меньше 1 МБ), используя стратегию, описанную в документе Шаблоны бокового ввода, о медленном обновлении входных данных на стороне глобального окна (https://beam.apache.org/documentation/patterns/side-inputs/).

Этот код выглядит примерно так:

       side_input = (
          pipeline
          | "Generate impulse" >> PeriodicImpulse(start_timestamp=Timestamp.now().micros/1000000, stop_timestamp=MAX_TIMESTAMP.micros/1000000, fire_interval=60*10)                    
          | "window side input" >> beam.WindowInto(window.GlobalWindows(),
                                                    trigger=Repeatedly(AfterCount(1)),
                                                    accumulation_mode=AccumulationMode.DISCARDING)          
          | "Create side input" >> beam.ParDo(CreateSideInput())

      )
 

Боковой ввод-это просто словарь, который мы хотим использовать ниже по потоку с нашими основными данными о событиях.

Наши основные данные о событиях мы читаем события pubsub, которые фактически являются событиями, указывающими на файлы, которые должны быть прочитаны из GCS. Мы просматриваем эти события с помощью 5-минутных окон "Window into data" >> beam.WindowInto(window.FixedWindows(5*60)) , а затем читаем файлы, в которых каждая строка файла становится объектом словаря (мы выполняем json.loads для каждой строки). Мы открываем окно, потому что по разным причинам нам нужно группировать их позже в процессе разработки.

После этого мы делаем следующее,

 events |
"enrich with side input data" >> beam.Map(lambda event, side_input_data: EnrichEvents(element,side_input_data), side_input_data=beam.pvalue.AsSingleton(side_input))
 

Чего бы это ни стоило,мы также попытались использовать beam.pvalue.AsDict и изменить генерацию побочных входов для выделения кортежей ключей и значений, но возникает та же проблема, о которой я расскажу ниже.

Проблема в том, что мы получаем эту действительно неясную ошибку из потока данных, которая кажется не рабочей ошибкой, а ошибкой задания. Ошибки обычно выглядят примерно так: Error message from worker: generic::data_loss: SDK claims to have processed 34289 but should have processed 33593 elements

Эти ошибки продолжают происходить иногда с одними и теми же значениями, а иногда с разными значениями. Мы понятия не имеем, как с этим бороться, и все это, похоже, вызывает проблемы со свежестью данных, которые постоянно растут. Мы хотели бы использовать побочные входы, так как это кажется подходящим шаблоном, учитывая размер имеющихся у нас данных и то, что мы хотим с ними делать. Не похоже, что данные не обрабатываются, но что-то кажется неправильным, особенно учитывая эти ошибки, которые мы наблюдаем.

Комментарии:

1. Есть замечание, связанное с самим конвейером луча, которое выделяется как причина любой потери данных. Я подозреваю, что это ошибка на стороне потока данных, и вам, вероятно, потребуется помощь в облачной поддержке. Но как бы то ни было, работники должны прерывать и повторять рабочие элементы, которые демонстрируют потерю данных, поэтому, если ваш конвейер все еще выдает правильные результаты, это, вероятно, безвредно.