beam.Create() со списком dicts работает чрезвычайно медленно по сравнению со списком строк

#python #apache-beam #gdal #google-dataflow #apache-beam-internals

# #python #apache-beam #gdal #google-поток данных #apache-beam-internals

Вопрос:

Я использую поток данных для обработки шейп-файла с примерно 4 миллионами объектов (всего около 2 ГБ) и загрузки геометрии в BigQuery, поэтому перед запуском моего конвейера я извлекаю объекты шейп-файла в список и инициализирую конвейер с помощью beam.Create(features) . Я могу создать начальный список функций двумя способами:

  1. Экспортируйте каждую функцию в виде строки json, которую последующие DoFn s должны будут разобрать в dict:
 features = [f.ExportToJson() for f in layer]
 
  1. Экспортируйте python dict, предварительно проанализированный из строки JSON
 features = [json.loads(f.ExportToJson()) for f in layer]
 

При использовании опции 1 beam.Create(features) требуется минута или около того, и конвейер продолжается. Использование варианта 2 beam.Create(features) занимает более 3 часов на 6-ядерном i7 и, похоже, тратит здесь много времени:

   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in <listcomp>
    typehints.Union[[instance_to_type(v) for k, v in o.items()]],
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in instance_to_type
    typehints.Union[[instance_to_type(v) for k, v in o.items()]],
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in <listcomp>
 

Это trivial_inference то, что замедляется beam.Create при передаче списка dicts? Могу ли я настроить beam.Create , чтобы не делать то, что он пытается там сделать, или иным образом ускорить его, чтобы список dicts не был в 100 раз медленнее по сравнению со списком строк?

Комментарии:

1. Возможно, вы сможете попробовать запустить конвейер с --no_pipeline_type_check опцией, чтобы посмотреть, отличается ли он.

Ответ №1:

очень интересный результат!

Я предполагаю, что это происходит потому, что необходимо обработать все данные, которые он получает. Create Ограниченный размер словарей может быть большим, потому что они обрабатываются как объекты Python, в то время как строки обрабатываются как строки Python.

Вы могли бы сделать:

 p
| beam.Create([f.ExportToJson() for f in layer])
| beam.Map(json.loads)
 

Чтобы избежать дополнительного травления. Это помогает?