Как правильно распараллелить агрегацию нескольких файлов JSON в PySpark

#apache-spark #pyspark

#apache-spark #pyspark

Вопрос:

У меня есть большой набор файлов json_list на S3 с некоторыми журналами, которые я хотел бы объединить (в основном просто подсчитайте количество запросов по пути, местоположению и т.д.) Я делал следующее, но, судя по журналам, я не уверен, что это действительно распараллеливается .. сначала загрузка отдельных файлов S3 один за другим занимает около 3 минут, а затем остальные все еще кажутся разделенными на 1000 выполнений.. Я думал, что Spark разбил бы это на сам подход с уменьшением карты, но, возможно, я совершенно неправильно понял, что он делает и чего не делает. Не мог бы кто-нибудь дать подсказку, пожалуйста.

 df = (
    spark.read
    .json(test_paths, schema=schema)
    .filter(col('method') == 'GET')
    .filter((col('status_code') == 200) | (col('status_code') == 206))
    .withColumn('date', from_unixtime('timestamp').cast(DateType()))
    .groupBy('path', 'client_country_code', 'date', 'file_size')
    .count()
)
  

Вот журнал драйверов для 1000 URL-адресов:

 20/11/15 19:15:23 INFO InMemoryFileIndex: Listing leaf files and directories in parallel under 1000 paths. The first several paths are: s3n://bucket../10004.json_lines.gz.
20/11/15 19:15:23 INFO SparkContext: Starting job: json at NativeMethodAccessorImpl.java:0
20/11/15 19:15:23 INFO DAGScheduler: Got job 49 (json at NativeMethodAccessorImpl.java:0) with 1000 output partitions
20/11/15 19:15:23 INFO DAGScheduler: Final stage: ResultStage 75 (json at NativeMethodAccessorImpl.java:0)
20/11/15 19:15:23 INFO DAGScheduler: Parents of final stage: List()
20/11/15 19:15:23 INFO DAGScheduler: Missing parents: List()
20/11/15 19:15:23 INFO DAGScheduler: Submitting ResultStage 75 (MapPartitionsRDD[206] at json at NativeMethodAccessorImpl.java:0), which has no missing parents
20/11/15 19:15:23 INFO MemoryStore: Block broadcast_77 stored as values in memory (estimated size 84.3 KiB, free 2.2 GiB)
20/11/15 19:15:23 INFO MemoryStore: Block broadcast_77_piece0 stored as bytes in memory (estimated size 29.9 KiB, free 2.2 GiB)
20/11/15 19:15:23 INFO BlockManagerInfo: Added broadcast_77_piece0 in memory on e05e979b7108:34999 (size: 29.9 KiB, free: 2.2 GiB)
20/11/15 19:15:23 INFO SparkContext: Created broadcast 77 from broadcast at DAGScheduler.scala:1223
20/11/15 19:15:23 INFO DAGScheduler: Submitting 1000 missing tasks from ResultStage 75 (MapPartitionsRDD[206] at json at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/11/15 19:15:23 INFO TaskSchedulerImpl: Adding task set 75.0 with 1000 tasks
20/11/15 19:15:23 INFO TaskSetManager: Starting task 0.0 in stage 75.0 (TID 33224, e05e979b7108, executor driver, partition 0, PROCESS_LOCAL, 7473 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 1.0 in stage 75.0 (TID 33225, e05e979b7108, executor driver, partition 1, PROCESS_LOCAL, 7473 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 2.0 in stage 75.0 (TID 33226, e05e979b7108, executor driver, partition 2, PROCESS_LOCAL, 7474 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 3.0 in stage 75.0 (TID 33227, e05e979b7108, executor driver, partition 3, PROCESS_LOCAL, 7475 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 4.0 in stage 75.0 (TID 33228, e05e979b7108, executor driver, partition 4, PROCESS_LOCAL, 7476 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 5.0 in stage 75.0 (TID 33229, e05e979b7108, executor driver, partition 5, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 6.0 in stage 75.0 (TID 33230, e05e979b7108, executor driver, partition 6, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 7.0 in stage 75.0 (TID 33231, e05e979b7108, executor driver, partition 7, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:23 INFO Executor: Running task 0.0 in stage 75.0 (TID 33224)
20/11/15 19:15:23 INFO Executor: Running task 1.0 in stage 75.0 (TID 33225)
20/11/15 19:15:23 INFO Executor: Running task 2.0 in stage 75.0 (TID 33226)
20/11/15 19:15:23 INFO Executor: Running task 5.0 in stage 75.0 (TID 33229)
20/11/15 19:15:23 INFO Executor: Running task 3.0 in stage 75.0 (TID 33227)
20/11/15 19:15:23 INFO Executor: Running task 7.0 in stage 75.0 (TID 33231)
20/11/15 19:15:23 INFO Executor: Running task 6.0 in stage 75.0 (TID 33230)
20/11/15 19:15:23 INFO Executor: Running task 4.0 in stage 75.0 (TID 33228)
20/11/15 19:15:24 INFO Executor: Finished task 1.0 in stage 75.0 (TID 33225). 2025 bytes result sent to driver
20/11/15 19:15:24 INFO Executor: Finished task 0.0 in stage 75.0 (TID 33224). 2025 bytes result sent to driver
20/11/15 19:15:24 INFO TaskSetManager: Starting task 8.0 in stage 75.0 (TID 33232, e05e979b7108, executor driver, partition 8, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:24 INFO TaskSetManager: Finished task 1.0 in stage 75.0 (TID 33225) in 567 ms on e05e979b7108 (executor driver) (1/1000)
20/11/15 19:15:24 INFO Executor: Running task 8.0 in stage 75.0 (TID 33232)
20/11/15 19:15:24 INFO Executor: Finished task 6.0 in stage 75.0 (TID 33230). 2033 bytes result sent to driver
20/11/15 19:15:24 INFO TaskSetManager: Starting task 9.0 in stage 75.0 (TID 33233, e05e979b7108, executor driver, partition 9, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:24 INFO TaskSetManager: Starting task 10.0 in stage 75.0 (TID 33234, e05e979b7108, executor driver, partition 10, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:24 INFO TaskSetManager: Finished task 0.0 in stage 75.0 (TID 33224) in 570 ms on e05e979b7108 (executor driver) (2/1000)
20/11/15 19:15:24 INFO Executor: Running task 9.0 in stage 75.0 (TID 33233)
20/11/15 19:15:24 INFO Executor: Running task 10.0 in stage 75.0 (TID 33234)
20/11/15 19:15:24 INFO TaskSetManager: Finished task 6.0 in stage 75.0 (TID 33230) in 571 ms on e05e979b7108 (executor driver) (3/1000)
....
20/11/15 19:15:43 INFO TaskSetManager: Finished task 998.0 in stage 75.0 (TID 34222) in 158 ms on e05e979b7108 (executor driver) (999/1000)
20/11/15 19:15:43 INFO Executor: Finished task 999.0 in stage 75.0 (TID 34223). 2033 bytes result sent to driver
20/11/15 19:15:43 INFO TaskSetManager: Finished task 999.0 in stage 75.0 (TID 34223) in 175 ms on e05e979b7108 (executor driver) (1000/1000)
20/11/15 19:15:43 INFO TaskSchedulerImpl: Removed TaskSet 75.0, whose tasks have all completed, from pool
20/11/15 19:15:43 INFO DAGScheduler: ResultStage 75 (json at NativeMethodAccessorImpl.java:0) finished in 19.850 s
20/11/15 19:15:43 INFO DAGScheduler: Job 49 is finished. Cancelling potential speculative or zombie tasks for this job
20/11/15 19:15:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 75: Stage finished
20/11/15 19:15:43 INFO DAGScheduler: Job 49 finished: json at NativeMethodAccessorImpl.java:0, took 19.890458 s
20/11/15 19:15:43 INFO InMemoryFileIndex: It took 19936 ms to list leaf files for 1000 paths.
  

Комментарии:

1. Можете ли вы включить консоль драйвера с этапами для этого выполнения?

2. конечно, добавлено в op

3. У вас есть 1000 задач, соответствующих разделам, которые используются для обработки фрейма данных. В веб-консоли вы должны увидеть tasks x executor, там вы видите, как Spark распараллеливает

Ответ №1:

Существует много накладных расходов на настройку, особенно с большим количеством небольших файлов. JSON также является очень неэффективным форматом хранения, поскольку каждый раз потребуется читать весь файл целиком. В идеале каждый файл должен быть размером более 64 МБ, чтобы предоставить работникам spark достаточно данных для эффективной обработки.

Рассматривали ли вы возможность сделать шаг 1 вашего рабочего процесса простым чтением файлов JSON, а затем сохранить в столбчатом формате, таком как Parquet, в меньшее количество файлов.?