How to pass an array column as an argument to a VectorUdf in .NET for Apache Spark?

I'm trying to implement a vector UDF in C# for Apache Spark.

I set up a .NET for Apache Spark environment by following the Spark .NET getting-started guide. Vector UDFs (both the Apache.Arrow and Microsoft.Data.Analysis variants) worked for me on an IntegerType column. Now I'm trying to pass an integer-array column to a vector UDF and can't find a way to do it.

usings

using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql.Functions;
using DataFrame = Microsoft.Spark.Sql.DataFrame;
using Arrow = Apache.Arrow;

program

SparkSession spark = SparkSession
    .Builder()
    .AppName("sample")
    .GetOrCreate();

DataFrame dataFrame = spark.Range(0, 100).Repartition(4);

// Build an integer-array column: for each id, the array [0 .. id-1].
Func<Column, Column> array20 = func.Udf<int, int[]>(
    (col1) => Enumerable.Range(0, col1).ToArray());

dataFrame = dataFrame.WithColumn("array", array20(dataFrame["id"]));

// Apache Arrow
var arrowVectorUdf = ArrowFunctions.VectorUdf<Arrow.UInt64Array, Arrow.Int64Array>((id) =>
{
    var builder = new Arrow.Int64Array.Builder();
    var count = id.Length;
    foreach (var item in id.Data.Children)
    {
        builder.Append(item.Length + count);
    }
    return builder.Build();
});

// Microsoft.Data.Analysis
var dataFrameVector = DataFrameFunctions.VectorUdf<Int64DataFrameColumn, Int64DataFrameColumn>(
    (id) => id + id.Length);

Working

            dataFrame = dataFrame.WithColumn("arrowVectorUdfId", arrowVectorUdf(dataFrame["id"]));

            dataFrame = dataFrame.WithColumn("dataFrameVectorId", dataFrameVector(dataFrame["id"]));

Not working

            dataFrame = dataFrame.WithColumn("arrowVectorUdf", arrowVectorUdf(dataFrame["array"]));

            dataFrame = dataFrame.WithColumn("dataFrameVector", dataFrameVector(dataFrame["array"]));

The UDFs above work if I pass the "id" column instead of the "array" column. I'm not sure what argument type the UDFs should take for the "array" column. With the "array" column, both the Apache.Arrow and Microsoft.Data.Analysis versions fail with the same error:

 [2021-03-25T07:02:05.9218517Z] [LAPTOP-0S8GNQ52] [Error] [TaskRunner] [0] Exiting with exception: System.IO.InvalidDataException: Arrow primitive 'List' is unsupported.
   at Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType(Field field)
   at Apache.Arrow.Ipc.MessageSerializer.GetSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.<ReadSchema>b__12_0(Memory`1 buff)
   at Apache.Arrow.ArrayPoolExtensions.RentReturn(ArrayPool`1 pool, Int32 length, Action`1 action)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.ReadRecordBatch()
   at Microsoft.Spark.Worker.Command.ArrowBasedCommandExecutor.<GetInputIterator>d__2.MoveNext()
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameSqlCommandExecutor.ExecuteDataFrameSqlCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands)
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete)
   at Microsoft.Spark.Worker.TaskRunner.Run()
[2021-03-25T07:02:05.9245061Z] [LAPTOP-0S8GNQ52] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2021-03-25T07:02:05.9249567Z] [LAPTOP-0S8GNQ52] [Info] [SimpleWorker] RunSimpleWorker() finished successfully
21/03/25 12:32:05 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:163)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:169)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:160)
        at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/03/25 12:32:06 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
21/03/25 12:32:06 ERROR DotnetBackendHandler: Failed to execute 'showString' on 'org.apache.spark.sql.Dataset' with args=([Type=java.lang.Integer, Value: 20], [Type=java.lang.Integer, Value: 20], [Type=java.lang.Boolean, Value: false])
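
Note that the stack trace fails in Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType while the worker is still reading the schema, i.e. before the UDF body ever runs, so no choice of parameter type on the .NET side gets the array column past the Arrow IPC layer. For comparison, a plain row-based UDF can consume the array column, since it bypasses Arrow serialization entirely. A minimal sketch (my own addition, not from the original post; it assumes the same session and DataFrame as above, and a Microsoft.Spark version that supports ArrayType UDF arguments):

// Hedged workaround sketch: a row-based UDF that receives the array
// column as int[]. It avoids the Arrow IPC path, at the cost of
// per-row serialization overhead.
Func<Column, Column> sumArray = func.Udf<int[], int>((arr) => arr.Sum());

dataFrame = dataFrame.WithColumn("arraySum", sumArray(dataFrame["array"]));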
asked Mar 25 '21 by WPFUser


1 Answer

Both of your code samples work for me. I created the Spark environment the same way you did, except that it didn't work for me with Hadoop 2.7, so I installed Hadoop 2.7.4 separately.
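
For anyone reproducing the setup, it may help to confirm the worker-related environment variables before running the job. A minimal sketch (my own addition, not from the original answer; HADOOP_HOME and DOTNET_WORKER_DIR are the variables the .NET for Apache Spark setup relies on, and any paths printed come from your own machine):

// Hedged sanity check: print the environment variables the .NET for
// Apache Spark worker depends on, so a missing or mismatched Hadoop
// install is caught before the Arrow UDFs run.
Console.WriteLine($"HADOOP_HOME = {Environment.GetEnvironmentVariable("HADOOP_HOME")}");
Console.WriteLine($"DOTNET_WORKER_DIR = {Environment.GetEnvironmentVariable("DOTNET_WORKER_DIR")}");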

answered Sep 20 '22 by Hamed