I'm having trouble removing rows from a DataFrame based on a two-column list of items to filter on. For example, for this DataFrame:
df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
+------+------+---+
|number|letter| id|
+------+------+---+
| 100| A|304|
| 200| B|305|
| 300| C|306|
+------+------+---+
I can easily remove rows using isin on one column:
from pyspark.sql.functions import array, col

df.where(~col('number').isin([100, 200])).show()
+------+------+---+
|number|letter| id|
+------+------+---+
| 300| C|306|
+------+------+---+
But when I try to remove them by two columns, I get an exception:
df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions.lit(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
After some investigation I realized that the root cause of the problem is creating literals from non-primitive types. I tried the following code in PySpark:
lit((100, 'A'))
lit([100, 'A'])
and the following in Scala Spark:
lit((100, "A"))
lit(List(100, "A"))
lit(Seq(100, "A"))
lit(Array(100, "A"))
but with no luck... Does anyone know a way to create an array literal in Spark/PySpark? Or is there another method to filter a DataFrame by two columns?
To create an array literal in Spark you need to build an array from a series of columns, where each column is created with the lit function:
scala> array(lit(100), lit("A"))
res1: org.apache.spark.sql.Column = array(100, A)
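For reference, a PySpark equivalent would be (a minimal sketch, assuming the same spark session and df as in the question):
from pyspark.sql.functions import array, lit

# Build an array column out of individual literal columns.
df.select(array(lit(100), lit('A')).alias("arr")).show()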
First of all, you probably want struct, not array. Remember that Spark SQL doesn't support heterogeneous arrays, so array(1, 'a') is cast to array<string>.
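To see the difference (a quick check, assuming the same df and standard pyspark.sql.functions imports):
from pyspark.sql.functions import array, lit, struct

# array() coerces the mixed types to a common type (string here) ...
df.select(array(lit(1), lit('a')).alias("arr")).printSchema()
# ... while struct() keeps each field's own type.
df.select(struct(lit(1).alias("n"), lit('a').alias("l")).alias("s")).printSchema()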
So the query could look like this:
from pyspark.sql.functions import lit, struct

choices = [(100, 'A'), (200, 'B')]
target = [
    struct(
        lit(number).alias("number").cast("long"),
        lit(letter).alias("letter").cast("string"))
    for number, letter in choices]

query = struct("number", "letter").isin(target)
This seems to generate a valid expression:
query
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'>
But for some reason it fails in the analyzer:
df.where(~query)
AnalysisException Traceback (most recent call last)
...
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n"
Strangely enough, the following fails with SQL as well:
df.createOrReplaceTempView("df")
spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))")
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n +- SubqueryAlias df\n +- LogicalRDD [number#0L, letter#1, id#2L]\n"
but when replaced with literals on both sides:
spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))")
DataFrame[number: bigint, letter: string, id: bigint]
works fine, so it looks like a bug.
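As a workaround (my own sketch, not part of the original approach): collapse both columns into one synthetic string key on each side and reuse the single-column isin that already works:
from pyspark.sql.functions import col, concat_ws

# Build "number|letter" keys for the excluded pairs and for each row.
keys = ["{}|{}".format(n, l) for n, l in choices]
df.where(~concat_ws("|", col("number").cast("string"), col("letter")).isin(keys)).show()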
That being said, a left anti join should work just fine here:
from pyspark.sql.functions import broadcast

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))),
    ["number", "letter"],
    "leftanti"
).show()
+------+------+---+
|number|letter| id|
+------+------+---+
| 300| C|306|
+------+------+---+
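If you'd rather avoid a join entirely, the same filter can also be expressed as one equality condition per excluded pair (a sketch, again assuming the choices list from above):
from functools import reduce
from pyspark.sql.functions import col

# OR together (number == n AND letter == l) for every excluded pair,
# then negate the result to keep everything else.
cond = reduce(
    lambda a, b: a | b,
    [(col("number") == n) & (col("letter") == l) for n, l in choices])

df.where(~cond).show()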