Merge two spark sql columns of type Array[string] into a new Array[string] column

I have two columns in a Spark SQL DataFrame, where each entry in either column is an array of strings.

val ngramDataFrame = Seq(
  (Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")

I want to merge the arrays in each row to make a single array in a new column. My code is as follows:

def concat_array(firstarray: Array[String],
                 secondarray: Array[String]): Array[String] =
  (firstarray ++ secondarray).toArray
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))

I can successfully use the concat_array function on two arrays. However, when I run the above code, I get the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 12, localhost): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array, array) => array)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
  at $line80.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:76)
  ... 13 more
Driver stacktrace:

Asked Mar 07 '18 by Arjun Mishra


People also ask

How do I combine two columns in spark?

Spark SQL provides the concat() function to concatenate two or more DataFrame columns into a single column. It can also take columns of different data types and concatenate them into one column; for example, it supports String, Int, Boolean, and array columns.
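
As a minimal Scala sketch of concat() on two string columns (the people DataFrame and its column names are invented here purely for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, lit}

val spark = SparkSession.builder().master("local[*]").appName("concat-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val people = Seq(("John", "Doe"), ("Jane", "Roe")).toDF("first_name", "last_name")

// concat() joins the column values row by row into one string column.
people.withColumn("full_name", concat($"first_name", lit(" "), $"last_name")).show()
// full_name: "John Doe", "Jane Roe"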

How do you change a string to an array in PySpark?

PySpark SQL provides the split() function to convert a delimiter-separated string into an array column (StringType to ArrayType). It splits a string column on a delimiter such as a space, comma, or pipe, and returns the result as an ArrayType column.
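
The same split() function exists in the Scala API under org.apache.spark.sql.functions; a small sketch (the sample data is invented, and the SparkSession and implicits from the previous sketch are assumed):

import org.apache.spark.sql.functions.split

val csvDf = Seq("a,b,c").toDF("raw")

// split() on a comma turns the string column into an array<string> column.
csvDf.withColumn("parts", split($"raw", ",")).show(false)
// raw = "a,b,c"  ->  parts = [a, b, c]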

How do you merge elements in an array?

To merge elements from one array into another, first iterate (loop) over the source array. In each iteration, retrieve the element and insert it into the target array (for example, using the array's push() method in JavaScript). You can then wrap this loop in a merge() function and pass the two arrays as arguments.
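
In plain Scala (outside Spark) the same loop-and-append idea looks roughly like this sketch; note that first ++ second gives the same result in one step:

import scala.collection.mutable.ArrayBuffer

def merge(first: Array[String], second: Array[String]): Array[String] = {
  val out = ArrayBuffer.empty[String]
  first.foreach(out += _)   // copy every element of the first array
  second.foreach(out += _)  // then append every element of the second
  out.toArray
}

merge(Array("a", "b"), Array("c", "d"))  // Array(a, b, c, d)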

How do I concatenate strings in PySpark?

The concat() function of PySpark SQL is used to concatenate multiple DataFrame columns into a single column. It works on string and binary columns as well as compatible array columns.


2 Answers

In Spark 2.4 or later you can use concat (if you want to keep duplicates):

ngramDataFrame.withColumn(
  "full_array", concat($"filtered_words", $"ngrams_array")
).show
+--------------------+---------------+--------------------+
|      filtered_words|   ngrams_array|          full_array|
+--------------------+---------------+--------------------+
|[curious, bought,...|[iwa, was, asj]|[curious, bought,...|
+--------------------+---------------+--------------------+

or array_union (if you want to drop duplicates):

ngramDataFrame.withColumn(
  "full_array",
   array_union($"filtered_words", $"ngrams_array")
)

These can also be composed from other higher-order functions, for example

ngramDataFrame.withColumn(
   "full_array",
   flatten(array($"filtered_words", $"ngrams_array"))
)

with duplicates, and

ngramDataFrame.withColumn(
   "full_array",
   array_distinct(flatten(array($"filtered_words", $"ngrams_array")))
)

without.

On a side note, you shouldn't use WrappedArray when working with ArrayType columns. Instead you should expect the guaranteed interface, which is Seq. So the udf should use a function with the following signature:

(Seq[String], Seq[String]) => Seq[String]
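
A minimal sketch of the question's UDF rewritten against that signature (it reuses ngramDataFrame from the question; the names concatArray and concatSeqUDF are just illustrative):

import org.apache.spark.sql.functions.udf

// Accept Seq[String], the interface Spark guarantees for ArrayType columns.
def concatArray(first: Seq[String], second: Seq[String]): Seq[String] =
  first ++ second

val concatSeqUDF = udf(concatArray _)

ngramDataFrame.withColumn(
  "full_array", concatSeqUDF($"filtered_words", $"ngrams_array")
).show()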

Please refer to SQL Programming Guide for details.

Answered Sep 18 '22 by zero323


Arjun, there is an error in the udf you created. When you pass array-type columns, the data type is not Array[String] but WrappedArray[String]. Below I am pasting the modified udf along with the output.

import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// A local SparkConf is defined here so the snippet runs standalone.
val sparkConf = new SparkConf().setAppName("concat-arrays").setMaster("local[*]")
val SparkCtxt = new SparkContext(sparkConf)
val sqlContext = new SQLContext(SparkCtxt)
import sqlContext.implicits._

val temp = SparkCtxt.parallelize(
  Seq(Row(Array("String1", "String2"), Array("String3", "String4")))
)
val df = sqlContext.createDataFrame(
  temp,
  StructType(List(
    StructField("Col1", ArrayType(StringType), true),
    StructField("Col2", ArrayType(StringType), true)
  ))
)

// Spark hands ArrayType columns to the UDF as WrappedArray, not Array.
def concat_array(firstarray: mutable.WrappedArray[String],
                 secondarray: mutable.WrappedArray[String]): mutable.WrappedArray[String] =
  firstarray ++ secondarray

val concatUDF = udf(concat_array _)
val df2 = df.withColumn("udftest", concatUDF(df.col("Col1"), df.col("Col2")))

df2.select("udftest").foreach { each =>
  println("***********")
  println(each(0))
}
df2.show(true)

OUTPUT:

+------------------+------------------+--------------------+
|              Col1|              Col2|             udftest|
+------------------+------------------+--------------------+
|[String1, String2]|[String3, String4]|[String1, String2...|
+------------------+------------------+--------------------+

WrappedArray(String1, String2, String3, String4)

Answered Sep 18 '22 by sai pradeep kumar kotha