Spark RAPIDS — Операция, не замененная версией GPU

#apache-spark #apache-spark-sql #rapids

Вопрос:

Я новичок в Rapids, и у меня проблемы с пониманием поддерживаемых операций.

У меня есть данные в следующем формате:

  ------------ ---------- 
|        kmer|source_seq|
 ------------ ---------- 
|TGTCGGTTTAA$|         4|
|ACCACCACCAC$|         8|
|GCATAATTTCC$|         1|
|CCGTCAAAGCG$|         7|
|CCGTCCCGTGG$|         6|
|GCGCTGTTATG$|         2|
|GAGCATAGGTG$|         5|
|CGGCGGATTCT$|         0|
|GGCGCGAGGGT$|         3|
|CCACCACCAC$A|         8|
|CACCACCAC$AA|         8|
|CCCAAAAAAAAA|         0|
|AAGAAAAAAAAA|         5|
|AAGAAAAAAAAA|         0|
|TGTAAAAAAAAA|         0|
|CCACAAAAAAAA|         8|
|AGACAAAAAAAA|         7|
|CCCCAAAAAAAA|         0|
|CAAGAAAAAAAA|         5|
|TAAGAAAAAAAA|         0|
 ------------ ---------- 
 

И я пытаюсь выяснить, какие «kmer»имеют какие «source_seq», используя следующий код:

 val w = Window.partitionBy("kmer")
x.withColumn("source_seqs", collect_list("source_seq").over(w))

// Result is something like this:
 ------------ ---------- -----------                                            
|        kmer|source_seq|source_seqs|
 ------------ ---------- ----------- 
|AAAACAAGACCA|         2|        [2]|
|AAAACAAGCAGC|         4|        [4]|
|AAAACCACGAGC|         3|        [3]|
|AAAACCGCCAAA|         7|        [7]|
|AAAACCGGTGTG|         1|        [1]|
|AAAACCTATATC|         5|        [5]|
|AAAACGACTTCT|         6|        [6]|
|AAAACGCGCAAG|         3|        [3]|
|AAAAGGCCTATT|         7|        [7]|
|AAAAGGCGTTCG|         3|        [3]|
|AAAAGGCTGTGA|         1|        [1]|
|AAAAGGTCTACC|         2|        [2]|
|AAAAGTCGAGCA|         7|     [7, 0]|
|AAAAGTCGAGCA|         0|     [7, 0]|
|AAAATCCGATCA|         0|        [0]|
|AAAATCGAGCGG|         0|        [0]|
|AAAATCGTTGAA|         7|        [7]|
|AAAATGGACAAG|         1|        [1]|
|AAAATTGCACCA|         3|        [3]|
|AAACACCGCCGT|         3|        [3]|
 ------------ ---------- ----------- 

 

В документации по операторам, поддерживаемым Spark Rapids, упоминается collect_list , что они поддерживаются только окном, что я и делаю в своем коде, насколько мне известно.

Однако, взглянув на план запроса, легко увидеть, что collect_list он не выполняется графическим процессором:

 scala> x.withColumn("source_seqs", collect_list("source_seq").over(w)).explain
== Physical Plan ==
Window [collect_list(source_seq#302L, 0, 0) windowspecdefinition(kmer#301, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_source#658], [kmer#301]
 - GpuColumnarToRow false
    - GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
       - GpuCoalesceBatches RequireSingleBatch
          - GpuShuffleCoalesce 2147483647
             - GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1496]
                - GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>

 

В отличие от аналогичного запроса с другой функцией, где мы можем видеть окно, выполняемое с помощью графического процессора:

 scala> x.withColumn("min_source", min("source_seq").over(w)).explain
== Physical Plan ==
GpuColumnarToRow false
 - GpuWindow [gpumin(source_seq#302L) gpuwindowspecdefinition(kmer#301, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS max_source#648L], [kmer#301], false
    - GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
       - GpuCoalesceBatches RequireSingleBatch
          - GpuShuffleCoalesce 2147483647
             - GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1431]
                - GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
 

Я как — то неправильно понимаю документацию по поддерживаемым операциям или я неправильно написал код? Любая помощь в этом будет признательна.

Ответ №1:

Да, как упоминал Митхун, выражение spark.rapids.sql.Список коллекций начал быть верным, начиная с версии 0.5. Однако в версии 0.4 это неверно: https://github.com/NVIDIA/spark-rapids/blob/branch-0.4/docs/configs.md

Вот план, который я протестировал на версии 0.5 :

 val w = Window.partitionBy("name")
val resultdf=dfread.withColumn("values", collect_list("value").over(w))
resultdf.explain

== Physical Plan ==
GpuColumnarToRow false
 - GpuWindow [collect_list(value#134L, 0, 0) gpuwindowspecdefinition(name#133, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS values#138], [name#133], false
    - GpuCoalesceBatches RequireSingleBatch
       - GpuSort [name#133 ASC NULLS FIRST], false, com.nvidia.spark.rapids.OutOfCoreSort$@28e73bd1
          - GpuShuffleCoalesce 2147483647
             - GpuColumnarExchange gpuhashpartitioning(name#133, 200), ENSURE_REQUIREMENTS, [id=#563]
                - GpuFileGpuScan csv [name#133,value#134L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,value:bigint>
 

Комментарии:

1. Эй, спасибо за подсказку, я действительно использовал версию 0.4, я не знал, что 0.5 вышел. Спасибо!

Ответ №2:

Кенни. Могу я, пожалуйста, узнать, какую версию rapids-4-spark плагина вы используете, и версию Spark?

Начальная реализация GPU COLLECT_LIST() была отключена по умолчанию, поскольку ее поведение не соответствовало значениям Spark, т. е. нулевым значениям. (Версия GPU сохранила значения null в строках агрегированного массива, в то время как Spark удалил их.) Правка: Поведение было исправлено в версии 0.5.

Если у вас нет нулей в столбце агрегации (и вы используете rapids-4-spark 0.4), вы можете попробовать включить оператор, установив spark.rapids.sql.expression.CollectList=true .

В общем, можно изучить причину, по которой оператор не работал на графическом процессоре, установив spark.rapids.sql.explain=NOT_ON_GPU . Это должно вывести причину на консоль.

Если вы все еще испытываете трудности или неправильное поведение с rapids-4-spark плагином, пожалуйста, не стесняйтесь сообщать об ошибке на GitHub проекта. Мы были бы рады продолжить расследование.

Комментарии:

1. Эй, спасибо за помощь, это действительно была проблема с версией, обновление с 0.4 до 0.5 исправило проблему. Насколько я могу видеть, collect_set все еще не поддерживается, однако?

Ответ №3:

collect_set для агрегирования и создания окон будет поддерживаться в предстоящем выпуске 21.08 (RAPIDS Spark переходит на управление версиями календаря).