We are trying to filter rows that contain empty arrays in a field using PySpark. Here is the schema of the DF:
root
|-- created_at: timestamp (nullable = true)
|-- screen_name: string (nullable = true)
|-- text: string (nullable = true)
|-- retweet_count: long (nullable = true)
|-- favorite_count: long (nullable = true)
|-- in_reply_to_status_id: long (nullable = true)
|-- in_reply_to_user_id: long (nullable = true)
|-- in_reply_to_screen_name: string (nullable = true)
|-- user_mentions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- id_str: string (nullable = true)
| | |-- indices: array (nullable = true)
| | | |-- element: long (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- screen_name: string (nullable = true)
|-- hashtags: array (nullable = true)
| |-- element: string (containsNull = true)
We are trying two approaches.
First, defining UDF that can modify the rows like this
empty_array_to_null = udf(lambda arr: None if len(arr) == 0 else arr, ArrayType(StructType()))
and use it to exclude the rows in df.select(empty_array_to_null(df.user_mentions))
.
The other approach is to have the following UDF:
is_empty = udf(lambda x: len(x) == 0, BooleanType())
and use it in df.filter(is_empty(df.user_mentions))
Both approaches throw errors. First approach yields the following:
An error occurred while calling o3061.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1603.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1603.0 (TID 41390, 10.0.0.11): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 0 fields are required while 5 values are provided.
at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:136)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$fromJava$1.apply(EvaluatePython.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
Second approach throws the following:
Some of types cannot be determined by the first 100 rows, please try again with sampling
Traceback (most recent call last):
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 57, in toDF
return sparkSession.createDataFrame(self, schema, sampleRatio)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 522, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 360, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 347, in _inferSchema
raise ValueError("Some of types cannot be determined by the "
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
...
Update: Added sample data...
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
| created_at| screen_name| text|retweet_count|favorite_count|in_reply_to_status_id|in_reply_to_user_id|in_reply_to_screen_name|user_mentions| hashtags|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|2017-03-13 23:00:...| danielmellen|#DevOps understan...| 0| 0| null| null| null| []| [devops]|
|2017-03-13 23:00:...| RebacaInc|Automation of ent...| 0| 0| null| null| null| []|[googlecloud, orc...|
|2017-03-13 23:00:...| CMMIAppraiser|Get your Professi...| 0| 0| null| null| null| []| [broadsword]|
|2017-03-13 23:00:...| usxtron|and when the syst...| 0| 0| null| null| null| []| [cloud]|
|2017-03-13 23:00:...| SearchCRM|.#Automation and ...| 0| 0| null| null| null| []|[automation, chat...|
|2017-03-13 23:00:...| careers_tech|SummitSync - Juni...| 0| 0| null| null| null| []|[junior, cloud, e...|
|2017-03-13 23:00:...| roy_lauzon|Both the #DevOps ...| 0| 0| null| null| null| []|[devops, cybersec...|
|2017-03-13 23:00:...| nosqlgal|Introducing #Couc...| 0| 0| null| null| null| []| [couchbase, nosql]|
|2017-03-13 23:00:...| jordanfarrer|Ran into a weird ...| 0| 0| null| null| null| []| [docker]|
|2017-03-13 23:00:...| BGrieveSTL|#purestorage + #a...| 0| 0| null| null| null| []|[purestorage, azure]|
|2017-03-13 23:00:...| Hotelbeds_API|"How to Quickly O...| 0| 0| null| null| null| []| [api, feedly]|
|2017-03-13 23:00:...| ScalaWilliam|Principles behind...| 0| 0| null| null| null| []| [agile]|
|2017-03-13 23:00:...| PRFT_Oracle|[On-Demand Webina...| 0| 0| null| null| null| []| [cloud]|
|2017-03-13 23:00:...| PDF_filler|Now you can #secu...| 0| 0| null| null| null| []|[secure, data, ap...|
|2017-03-13 23:00:...|lgoncalves1979|10 Mistakes We Ma...| 0| 0| null| null| null| []|[coaching, scrumm...|
|2017-03-13 23:00:...| Jelecos|Vanguard CIO: Why...| 0| 0| null| null| null| []|[microservices, cio]|
|2017-03-13 23:00:...| DJGaryBaldy|Why bother with W...| 0| 0| null| null| null| []| [automation]|
|2017-03-13 23:00:...| 1codeblog|Apigee Edge Produ...| 0| 0| null| null| null| []|[cloud, next17, g...|
|2017-03-13 23:00:...| CloudRank|Why and when shou...| 0| 0| null| null| null| []|[machinelearning,...|
|2017-03-13 23:00:...| forgeaheadio|5 essentials for ...| 0| 0| null| null| null| []|[hybrid, cloud, h...|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
only showing top 20 rows
In PySpark, using filter() or where() functions of DataFrame we can filter rows with NULL values by checking isNULL() of PySpark Column class. The above statements return all rows that have null values on the state column and the result is returned as the new DataFrame.
In order to remove Rows with NULL values on selected columns of PySpark DataFrame, use drop(columns:Seq[String]) or drop(columns:Array[String]). To these functions pass the names of the columns you wanted to check for NULL values to delete rows.
Spark filter() or where() function is used to filter the rows from DataFrame or Dataset based on the given one or multiple conditions or SQL expression. You can use where() operator instead of the filter if you are coming from SQL background. Both these functions operate exactly the same.
In PySpark DataFrame you can calculate the count of Null, None, NaN or Empty/Blank values in a column by using isNull() of Column class & SQL functions isnan() count() and when().
One of the way is to first get the size of your array, and then filter on the rows which array size is 0. I have found the solution here How to convert empty arrays to nulls?.
import pyspark.sql.functions as F
df = df.withColumn("size", F.size(F.col(user_mentions)))
df_filtered = df.filter(F.col("size") >= 1)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With