Как передать столбец массива в качестве аргумента в VectorUdf в .Net Spark?

#c# #apache-spark #user-defined-functions #apache-arrow #.net-spark

Вопрос:

Я пытаюсь реализовать векторный Udf в C# Spark.

Я создал среду .Net Spark, следуя Spark .Net. Вектор Udf (стрелка Apache и Microsoft.Данные.Анализ обоих) работал для меня для столбца IntegerType. Теперь я пытаюсь отправить столбец типа целочисленного массива в вектор Udf и не смог найти способ добиться этого.

использование

 using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql.Functions;
using DataFrame = Microsoft.Spark.Sql.DataFrame;
using Arrow = Apache.Arrow;
 

программа

 SparkSession spark = SparkSession
                .Builder()
                .AppName("sample")
                .GetOrCreate();

DataFrame dataFrame = spark.Range(0, 100).Repartition(4);
            
            Func<Column, Column> array20 = func.Udf<int, int[]>(
                (col1) => Enumerable.Range(0, col1).ToArray());

            dataFrame = dataFrame.WithColumn("array", array20(dataFrame["id"]));

// Apache Arrow
            var arrowVectorUdf = ArrowFunctions.VectorUdf<Arrow.UInt64Array, Arrow.Int64Array>((id) =>
            {
                var int32Array = new Arrow.Int64Array.Builder();
                var count = id.Length;
                foreach (var item in id.Data.Children)
                {
                    int32Array.Append(item.Length   count);
                }
                return int32Array.Build();
            });

// Microsoft.Data.Analysis
            var dataFrameVector = DataFrameFunctions.VectorUdf<Int64DataFrameColumn, Int64DataFrameColumn>((id) => id   id.Length);
 

Работающий

             dataFrame = dataFrame.WithColumn("arrowVectorUdfId", arrowVectorUdf(dataFrame["id"]));

            dataFrame = dataFrame.WithColumn("dataFrameVectorId", dataFrameVector(dataFrame["id"]));
 

Не работает

             dataFrame = dataFrame.WithColumn("arrowVectorUdf", arrowVectorUdf(dataFrame["array"]));

            dataFrame = dataFrame.WithColumn("dataFrameVector", dataFrameVector(dataFrame["array"]));
 

Вышеуказанные Udfs будут работать, если я отправлю столбец «идентификатор» вместо столбца «массив». Я не уверен , какой тип должен быть аргументом Udfs для столбца «массив». Приведенный выше код приводит к той же ошибке, что и ниже для Apache.Стрелка и Microsoft.Данные.Анализ,

  [2021-03-25T07:02:05.9218517Z] [LAPTOP-0S8GNQ52] [Error] [TaskRunner] [0] Exiting with exception: System.IO.InvalidDataException: Arrow primitive 'List' is unsupported.
   at Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType(Field field)
   at Apache.Arrow.Ipc.MessageSerializer.GetSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.<ReadSchema>b__12_0(Memory`1 buff)
   at Apache.Arrow.ArrayPoolExtensions.RentReturn(ArrayPool`1 pool, Int32 length, Action`1 action)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.ReadRecordBatch()
   at Microsoft.Spark.Worker.Command.ArrowBasedCommandExecutor.<GetInputIterator>d__2.MoveNext()
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameSqlCommandExecutor.ExecuteDataFrameSqlCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands)
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Booleanamp; readComplete)
   at Microsoft.Spark.Worker.TaskRunner.Run()
[2021-03-25T07:02:05.9245061Z] [LAPTOP-0S8GNQ52] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2021-03-25T07:02:05.9249567Z] [LAPTOP-0S8GNQ52] [Info] [SimpleWorker] RunSimpleWorker() finished successfully
21/03/25 12:32:05 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:163)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:169)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:160)
        at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$anon$1.read(PythonArrowOutput.scala:89)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/03/25 12:32:06 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
21/03/25 12:32:06 ERROR DotnetBackendHandler: Failed to execute 'showString' on 'org.apache.spark.sql.Dataset' with args=([Type=java.lang.Integer, Value: 20], [Type=java.lang.Integer, Value: 20], [Type=java.lang.Boolean, Value: false])
 

Комментарии:

1. вы говорите, что это работает с «идентификатором», но не со столбцом «массив»; можете ли вы отдельно вставить оба фрагмента кода — рабочие и нерабочие

2. @harshvchawla обновлено

3. я совсем не знаю spark, так что все это просто предположения: (и не знаю, как выглядит фрейм данных, когда вы вызываете перераспределение) — ваша функция. Udf требует аргумента int Array[] , поэтому, возможно, содержимое в столбце dataFram[«идентификатор»] является массивом int [], но содержимое столбца DataFrame[«массив»] имеет другой тип?

4. да, они разных типов, и массив udf требует разных аргументов.. я пытаюсь выяснить, что я должен в качестве типа аргумента.. я не смог найти документ или образец для достижения того же

5. как выглядят данные ur после вызова перераспределения; какие-либо сведения о примере, который вы можете опубликовать? также попробуйте выполнить отладку с помощью typeof или просто выполните команду toArray / toString, чтобы узнать, сообщает ли компилятор вам о какой-либо проблеме? Например, вы можете даже попытаться применить типы данных вместо var во время отладки, возможно

Ответ №1:

Это работает для меня с обоими вашими образцами кода. Я создал среду spark так же, как и вы, за исключением того, что среда не работала для меня с Hadoop 2.7, и я отдельно установил Hadoop 2.7.4