#apache-spark #apache-spark-sql #rapids
Вопрос:
Я новичок в Rapids, и у меня проблемы с пониманием поддерживаемых операций.
У меня есть данные в следующем формате:
------------ ----------
| kmer|source_seq|
------------ ----------
|TGTCGGTTTAA$| 4|
|ACCACCACCAC$| 8|
|GCATAATTTCC$| 1|
|CCGTCAAAGCG$| 7|
|CCGTCCCGTGG$| 6|
|GCGCTGTTATG$| 2|
|GAGCATAGGTG$| 5|
|CGGCGGATTCT$| 0|
|GGCGCGAGGGT$| 3|
|CCACCACCAC$A| 8|
|CACCACCAC$AA| 8|
|CCCAAAAAAAAA| 0|
|AAGAAAAAAAAA| 5|
|AAGAAAAAAAAA| 0|
|TGTAAAAAAAAA| 0|
|CCACAAAAAAAA| 8|
|AGACAAAAAAAA| 7|
|CCCCAAAAAAAA| 0|
|CAAGAAAAAAAA| 5|
|TAAGAAAAAAAA| 0|
------------ ----------
И я пытаюсь выяснить, какие «kmer»имеют какие «source_seq», используя следующий код:
val w = Window.partitionBy("kmer")
x.withColumn("source_seqs", collect_list("source_seq").over(w))
// Result is something like this:
------------ ---------- -----------
| kmer|source_seq|source_seqs|
------------ ---------- -----------
|AAAACAAGACCA| 2| [2]|
|AAAACAAGCAGC| 4| [4]|
|AAAACCACGAGC| 3| [3]|
|AAAACCGCCAAA| 7| [7]|
|AAAACCGGTGTG| 1| [1]|
|AAAACCTATATC| 5| [5]|
|AAAACGACTTCT| 6| [6]|
|AAAACGCGCAAG| 3| [3]|
|AAAAGGCCTATT| 7| [7]|
|AAAAGGCGTTCG| 3| [3]|
|AAAAGGCTGTGA| 1| [1]|
|AAAAGGTCTACC| 2| [2]|
|AAAAGTCGAGCA| 7| [7, 0]|
|AAAAGTCGAGCA| 0| [7, 0]|
|AAAATCCGATCA| 0| [0]|
|AAAATCGAGCGG| 0| [0]|
|AAAATCGTTGAA| 7| [7]|
|AAAATGGACAAG| 1| [1]|
|AAAATTGCACCA| 3| [3]|
|AAACACCGCCGT| 3| [3]|
------------ ---------- -----------
В документации по операторам, поддерживаемым Spark Rapids, упоминается collect_list
, что они поддерживаются только окном, что я и делаю в своем коде, насколько мне известно.
Однако, взглянув на план запроса, легко увидеть, что collect_list
он не выполняется графическим процессором:
scala> x.withColumn("source_seqs", collect_list("source_seq").over(w)).explain
== Physical Plan ==
Window [collect_list(source_seq#302L, 0, 0) windowspecdefinition(kmer#301, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_source#658], [kmer#301]
- GpuColumnarToRow false
- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
- GpuCoalesceBatches RequireSingleBatch
- GpuShuffleCoalesce 2147483647
- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1496]
- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
В отличие от аналогичного запроса с другой функцией, где мы можем видеть окно, выполняемое с помощью графического процессора:
scala> x.withColumn("min_source", min("source_seq").over(w)).explain
== Physical Plan ==
GpuColumnarToRow false
- GpuWindow [gpumin(source_seq#302L) gpuwindowspecdefinition(kmer#301, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS max_source#648L], [kmer#301], false
- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
- GpuCoalesceBatches RequireSingleBatch
- GpuShuffleCoalesce 2147483647
- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1431]
- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
Я как — то неправильно понимаю документацию по поддерживаемым операциям или я неправильно написал код? Любая помощь в этом будет признательна.
Ответ №1:
Да, как упоминал Митхун, выражение spark.rapids.sql.Список коллекций начал быть верным, начиная с версии 0.5. Однако в версии 0.4 это неверно: https://github.com/NVIDIA/spark-rapids/blob/branch-0.4/docs/configs.md
Вот план, который я протестировал на версии 0.5 :
val w = Window.partitionBy("name")
val resultdf=dfread.withColumn("values", collect_list("value").over(w))
resultdf.explain
== Physical Plan ==
GpuColumnarToRow false
- GpuWindow [collect_list(value#134L, 0, 0) gpuwindowspecdefinition(name#133, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS values#138], [name#133], false
- GpuCoalesceBatches RequireSingleBatch
- GpuSort [name#133 ASC NULLS FIRST], false, com.nvidia.spark.rapids.OutOfCoreSort$@28e73bd1
- GpuShuffleCoalesce 2147483647
- GpuColumnarExchange gpuhashpartitioning(name#133, 200), ENSURE_REQUIREMENTS, [id=#563]
- GpuFileGpuScan csv [name#133,value#134L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,value:bigint>
Комментарии:
1. Эй, спасибо за подсказку, я действительно использовал версию 0.4, я не знал, что 0.5 вышел. Спасибо!
Ответ №2:
Кенни. Могу я, пожалуйста, узнать, какую версию rapids-4-spark
плагина вы используете, и версию Spark?
Начальная реализация GPU COLLECT_LIST()
была отключена по умолчанию, поскольку ее поведение не соответствовало значениям Spark, т. е. нулевым значениям. (Версия GPU сохранила значения null в строках агрегированного массива, в то время как Spark удалил их.) Правка: Поведение было исправлено в версии 0.5.
Если у вас нет нулей в столбце агрегации (и вы используете rapids-4-spark
0.4), вы можете попробовать включить оператор, установив spark.rapids.sql.expression.CollectList=true
.
В общем, можно изучить причину, по которой оператор не работал на графическом процессоре, установив spark.rapids.sql.explain=NOT_ON_GPU
. Это должно вывести причину на консоль.
Если вы все еще испытываете трудности или неправильное поведение с rapids-4-spark
плагином, пожалуйста, не стесняйтесь сообщать об ошибке на GitHub проекта. Мы были бы рады продолжить расследование.
Комментарии:
1. Эй, спасибо за помощь, это действительно была проблема с версией, обновление с 0.4 до 0.5 исправило проблему. Насколько я могу видеть,
collect_set
все еще не поддерживается, однако?
Ответ №3:
collect_set для агрегирования и создания окон будет поддерживаться в предстоящем выпуске 21.08 (RAPIDS Spark переходит на управление версиями календаря).