#json #pyspark
#json #pyspark
Вопрос:
У меня есть фрейм данных из 10 000 записей, которые я хочу преобразовать в формат JSON и отправить обратно в веб-сервис. Но df.toJSON().collect() занимает много времени [~ 10 секунд]. Кто-нибудь может подсказать, есть ли способ сократить это время
df.toJSON().collect()
Комментарии:
1. Можете ли вы привести пример JSON и вашего кода?
2. @Kafels Это может быть так: [{‘item_code: 1234’, ‘кол-во: 2’}, {‘item_code: 2345, ‘кол-во: 3’}, …..]
3. Как вы перенаправляете json в службу? Можете ли вы сделать это распределенным способом вместо сбора в драйвер?
4. Распределенным способом означает использование каких-либо API-интерфейсов Spark?
Ответ №1:
это может быть несколько разных вещей …
- сериализация json может занять некоторое время, особенно если вы используете API R или Python, потому что это отдельный процесс, который должен перемещаться между собственными исполнителями JVM на рабочих узлах для сериализации / десериализации объектов
- если вы выполнили какие-либо «широкие преобразования», такие как агрегирование или объединение, до df.collect(), вы, скорее всего, вызвали перетасовку, которая приведет к записи на диск 200 разделов по умолчанию, поэтому при вызове collect он должен извлекать эти данные с диска, что медленнее, чем извлечение из ОЗУ
- хотя ваш набор данных невелик, вам может потребоваться увеличить ОЗУ исполнителя по умолчанию, ядра исполнителя (слоты), количество исполнителей и перенастроить количество разделов, чтобы получить больше параллелизма
проверьте количество разделов
df.rdd.getNumPartitions()
проверка разделов в случайном порядке
spark.conf.get("spark.sql.shuffle.partitions")
проверьте другие конфигурации, такие как ram, ядра и экземпляры исполнителя
spark.sparkContext.getConf().getAll()
искра — трудный зверь , с которым трудно справиться … лучше всего посетить их официальную документацию, чтобы узнать больше! https://spark.apache.org/
Комментарии:
1. Спасибо. Я проверю и обновлю, если найду какие-либо улучшения.
2. Еще один вопрос, который у меня возник: если та же логика реализована в Java или Scala, улучшит ли это производительность?
3. я думаю, это зависит от логики… обычно scala всегда быстрее, потому что это родной язык spark … кстати, если я не неправильно истолковал ваш вопрос, примерно 10 секунд не замедляются при сохранении или возврате к драйверу небольшого набора данных, такого как 10 тыс… выполняете ли вы какие-либо агрегации / объединения ранее? кроме того, collect() не рекомендуется использовать для больших наборов данных, поскольку он возвращает весь объект обратно в драйвер и может привести к его сбою
4. Да, я выполняю агрегацию и объединение перед сбором данных JSON. Хорошо, итак, для больших наборов данных, если я хочу иметь текст в формате JSON из 10 тыс. строк, вы предлагаете какой-либо альтернативный подход, кроме его сбора? Пожалуйста, сообщите.
5. зачем вы собираете данные? вы можете записать df в файлы json… ex => df.write.json(‘путь’)