
Unable to create array literal in spark/pyspark

I'm having trouble removing rows from a dataframe based on a two-column list of values to filter out. For example, for this dataframe:

df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   100|     A|304|
|   200|     B|305|
|   300|     C|306|
+------+------+---+

I can easily remove rows using isin on one column:

from pyspark.sql.functions import col

df.where(~col('number').isin([100, 200])).show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   300|     C|306|
+------+------+---+

But when I try to remove them by two columns I get an exception:

from pyspark.sql.functions import array, col

df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
    at org.apache.spark.sql.functions$.lit(functions.scala:101)
    at org.apache.spark.sql.functions.lit(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

After some investigation I realized that the root cause of the problem is creating literals from non-primitive types. I tried the following code in pyspark:

lit((100, 'A'))
lit([100, 'A'])

and the following in scala-spark:

lit((100, "A"))
lit(List(100, "A"))
lit(Seq(100, "A"))
lit(Array(100, "A"))

but with no luck. Does anyone know how to create an array literal in spark/pyspark? Or is there another way to filter a dataframe by two columns?

asked Jan 06 '17 by Mariusz


2 Answers

To create an array literal in Spark you need to build an array from a series of columns, where each column is created with the lit function:

scala> array(lit(100), lit("A"))
res1: org.apache.spark.sql.Column = array(100, A)
answered Oct 08 '22 by Justin Pihony


First of all you probably want struct, not array. Remember that Spark SQL doesn't support heterogeneous arrays, so array(1, 'a') is cast to array<string>.

So the query could look like this:

from pyspark.sql.functions import lit, struct

choices = [(100, 'A'), (200, 'B')]

target = [
    struct(
        lit(number).alias("number").cast("long"),
        lit(letter).alias("letter").cast("string"))
    for number, letter in choices]

query = struct("number", "letter").isin(target)

This seems to generate a valid expression:

query
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'>

But for some reason it fails in the analyzer:

df.where(~query)
AnalysisException                         Traceback (most recent call last)
...
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n"

Strangely enough, the equivalent SQL fails as well:

df.createOrReplaceTempView("df")

spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))")
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n   +- SubqueryAlias df\n      +- LogicalRDD [number#0L, letter#1, id#2L]\n"

but when replaced with literals on both sides:

spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))")
DataFrame[number: bigint, letter: string, id: bigint]

it works fine, so this looks like a bug.

That being said, a left anti join should work just fine here:

from pyspark.sql.functions import broadcast

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))), 
    ["number", "letter"],
    "leftanti"
)
+------+------+---+
|number|letter| id|
+------+------+---+
|   300|     C|306|
+------+------+---+
answered Oct 08 '22 by zero323