#python #apache-beam #gdal #google-dataflow #apache-beam-internals
# #python #apache-beam #gdal #google-поток данных #apache-beam-internals
Вопрос:
Я использую поток данных для обработки шейп-файла с примерно 4 миллионами объектов (всего около 2 ГБ) и загрузки геометрии в BigQuery, поэтому перед запуском моего конвейера я извлекаю объекты шейп-файла в список и инициализирую конвейер с помощью beam.Create(features)
. Я могу создать начальный список функций двумя способами:
- Экспортируйте каждую функцию в виде строки json, которую последующие
DoFn
s должны будут разобрать в dict:
features = [f.ExportToJson() for f in layer]
- Экспортируйте python dict, предварительно проанализированный из строки JSON
features = [json.loads(f.ExportToJson()) for f in layer]
При использовании опции 1 beam.Create(features)
требуется минута или около того, и конвейер продолжается. Использование варианта 2 beam.Create(features)
занимает более 3 часов на 6-ядерном i7 и, похоже, тратит здесь много времени:
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in <listcomp>
typehints.Union[[instance_to_type(v) for k, v in o.items()]],
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in instance_to_type
typehints.Union[[instance_to_type(v) for k, v in o.items()]],
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in <listcomp>
Это trivial_inference
то, что замедляется beam.Create
при передаче списка dicts? Могу ли я настроить beam.Create
, чтобы не делать то, что он пытается там сделать, или иным образом ускорить его, чтобы список dicts не был в 100 раз медленнее по сравнению со списком строк?
Комментарии:
1. Возможно, вы сможете попробовать запустить конвейер с
--no_pipeline_type_check
опцией, чтобы посмотреть, отличается ли он.
Ответ №1:
очень интересный результат!
Я предполагаю, что это происходит потому, что необходимо обработать все данные, которые он получает. Create
Ограниченный размер словарей может быть большим, потому что они обрабатываются как объекты Python, в то время как строки обрабатываются как строки Python.
Вы могли бы сделать:
p
| beam.Create([f.ExportToJson() for f in layer])
| beam.Map(json.loads)
Чтобы избежать дополнительного травления. Это помогает?