Преобразование фрейма данных в JSON занимает много времени

#json #pyspark

#json #pyspark

Вопрос:

У меня есть фрейм данных из 10 000 записей, которые я хочу преобразовать в формат JSON и отправить обратно в веб-сервис. Но df.toJSON().collect() занимает много времени [~ 10 секунд]. Кто-нибудь может подсказать, есть ли способ сократить это время

df.toJSON().collect()

Комментарии:

1. Можете ли вы привести пример JSON и вашего кода?

2. @Kafels Это может быть так: [{‘item_code: 1234’, ‘кол-во: 2’}, {‘item_code: 2345, ‘кол-во: 3’}, …..]

3. Как вы перенаправляете json в службу? Можете ли вы сделать это распределенным способом вместо сбора в драйвер?

4. Распределенным способом означает использование каких-либо API-интерфейсов Spark?

Ответ №1:

это может быть несколько разных вещей …

  1. сериализация json может занять некоторое время, особенно если вы используете API R или Python, потому что это отдельный процесс, который должен перемещаться между собственными исполнителями JVM на рабочих узлах для сериализации / десериализации объектов
  2. если вы выполнили какие-либо «широкие преобразования», такие как агрегирование или объединение, до df.collect(), вы, скорее всего, вызвали перетасовку, которая приведет к записи на диск 200 разделов по умолчанию, поэтому при вызове collect он должен извлекать эти данные с диска, что медленнее, чем извлечение из ОЗУ
  3. хотя ваш набор данных невелик, вам может потребоваться увеличить ОЗУ исполнителя по умолчанию, ядра исполнителя (слоты), количество исполнителей и перенастроить количество разделов, чтобы получить больше параллелизма

проверьте количество разделов

 df.rdd.getNumPartitions()
 

проверка разделов в случайном порядке

 spark.conf.get("spark.sql.shuffle.partitions")
 

проверьте другие конфигурации, такие как ram, ядра и экземпляры исполнителя

 spark.sparkContext.getConf().getAll()
 

искра — трудный зверь , с которым трудно справиться … лучше всего посетить их официальную документацию, чтобы узнать больше! https://spark.apache.org/

Комментарии:

1. Спасибо. Я проверю и обновлю, если найду какие-либо улучшения.

2. Еще один вопрос, который у меня возник: если та же логика реализована в Java или Scala, улучшит ли это производительность?

3. я думаю, это зависит от логики… обычно scala всегда быстрее, потому что это родной язык spark … кстати, если я не неправильно истолковал ваш вопрос, примерно 10 секунд не замедляются при сохранении или возврате к драйверу небольшого набора данных, такого как 10 тыс… выполняете ли вы какие-либо агрегации / объединения ранее? кроме того, collect() не рекомендуется использовать для больших наборов данных, поскольку он возвращает весь объект обратно в драйвер и может привести к его сбою

4. Да, я выполняю агрегацию и объединение перед сбором данных JSON. Хорошо, итак, для больших наборов данных, если я хочу иметь текст в формате JSON из 10 тыс. строк, вы предлагаете какой-либо альтернативный подход, кроме его сбора? Пожалуйста, сообщите.

5. зачем вы собираете данные? вы можете записать df в файлы json… ex => df.write.json(‘путь’)