Merge two Spark SQL columns of type Array[String] into a new Array[String] column

I have two columns in a Spark SQL DataFrame, and every entry in both columns is an array of strings.

val ngramDataFrame = Seq(
  (Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")

I want to merge the arrays in each row to make a single array in a new column. My code is as follows:

def concat_array(firstarray: Array[String],
                 secondarray: Array[String]): Array[String] = {
  (firstarray ++ secondarray).toArray
}
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))

I can successfully call the concat_array function on two plain arrays. However, when I run the code above, I get the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 12, localhost): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array, array) => array)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
    at $line80.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:76)
    ... 13 more
Driver stacktrace:

2 Answers

In Spark 2.4 or later you can use concat (if you want to keep duplicates):

ngramDataFrame.withColumn(
  "full_array", concat($"filtered_words", $"ngrams_array")
).show

|      filtered_words|   ngrams_array|          full_array|
|[curious, bought,...|[iwa, was, asj]|[curious, bought,...|

or array_union (if you want to drop duplicates):

   array_union($"filtered_words", $"ngrams_array")
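To make the difference between the two concrete, here is a minimal sketch that applies both to one row. The sample row (with "was" in both arrays), the output column names with_dups and no_dups, and the local SparkSession are assumptions made for illustration; they are not part of the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_union, concat}

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("array-merge-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical row in which "was" appears in both arrays.
val df = Seq(
  (Seq("curious", "was"), Seq("was", "asj"))
).toDF("filtered_words", "ngrams_array")

val merged = df
  // concat appends the second array to the first and keeps every element.
  .withColumn("with_dups", concat($"filtered_words", $"ngrams_array"))
  // array_union keeps each distinct value only once.
  .withColumn("no_dups", array_union($"filtered_words", $"ngrams_array"))

merged.select("with_dups", "no_dups").show(false)
```

Here with_dups should hold four elements, including "was" twice, while no_dups should hold three. Both functions require Spark 2.4 or later for array inputs, and both return null when either input column is null.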

These can also be composed from other built-in collection functions, for example

   flatten(array($"filtered_words", $"ngrams_array"))

which keeps duplicates, and

   array_distinct(flatten(array($"filtered_words", $"ngrams_array")))

which removes them.

On a side note, you shouldn't use WrappedArray when working with ArrayType columns. Instead, you should rely on the guaranteed interface, which is Seq. So the UDF should wrap a function with the following signature:

(Seq[String], Seq[String]) => Seq[String]

Please refer to the Spark SQL Programming Guide for details.
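Applied to the UDF from the question, that signature could look like the sketch below. It reuses the question's DataFrame and column names; the local SparkSession setup and the lambda parameter names first and second are assumptions for illustration, and the lambda assumes neither column contains a null array.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("seq-udf-sketch")
  .getOrCreate()
import spark.implicits._

val ngramDataFrame = Seq(
  (Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")

// Spark passes ArrayType values to a Scala UDF as Seq, so the parameters
// are declared as Seq[String] rather than Array[String].
// Assumption: both inputs are non-null; a null array would throw here.
val concatUDF = udf((first: Seq[String], second: Seq[String]) => first ++ second)

val concatFrame = ngramDataFrame.withColumn(
  "full_array", concatUDF($"filtered_words", $"ngrams_array"))

concatFrame.show(false)
```

Declaring Seq avoids the ClassCastException from the question, because the function no longer tries to treat the incoming WrappedArray as a Java String array. On Spark 2.4 or later the built-in concat is usually preferable to a UDF.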

Arjun, there is an error in the UDF you created. When you pass array-type columns to a UDF, the data type is not Array[String] but WrappedArray[String]. Below is the modified UDF along with its output.

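import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.collection.mutable

val SparkCtxt = new SparkContext(sparkConf)
val sqlContext = new SQLContext(SparkCtxt)
import sqlContext.implicits._

val temp = SparkCtxt.parallelize(Seq(Row(Array("String1", "String2"), Array("String3", "String4"))))
// Schema reconstructed from the column names and array-of-string types shown in the output below.
val df = sqlContext.createDataFrame(temp,
  StructType(List(
    StructField("Col1", ArrayType(StringType), true),
    StructField("Col2", ArrayType(StringType), true))))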

def concat_array(firstarray: mutable.WrappedArray[String],
                 secondarray: mutable.WrappedArray[String]): mutable.WrappedArray[String] =
  firstarray ++ secondarray
val concatUDF = udf(concat_array _)
val df2 = df.withColumn("udftest", concatUDF(df.col("Col1"), df.col("Col2")))


|              Col1|              Col2|             udftest|
|[String1, String2]|[String3, String4]|[String1, String2...|

WrappedArray(String1, String2, String3, String4)

