Filtering rows with empty arrays in PySpark

We are trying to filter out rows that contain empty arrays in a field using PySpark. Here is the schema of the DF:

root
 |-- created_at: timestamp (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- text: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
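
In case it helps to reproduce this, here is the same schema expressed in PySpark types (reconstructed from the printSchema output above, so treat it as a sketch; all fields are left nullable, which is the default):

from pyspark.sql.types import (StructType, StructField, TimestampType,
                               StringType, LongType, ArrayType)

schema = StructType([
    StructField("created_at", TimestampType()),
    StructField("screen_name", StringType()),
    StructField("text", StringType()),
    StructField("retweet_count", LongType()),
    StructField("favorite_count", LongType()),
    StructField("in_reply_to_status_id", LongType()),
    StructField("in_reply_to_user_id", LongType()),
    StructField("in_reply_to_screen_name", StringType()),
    # user_mentions is an array of structs
    StructField("user_mentions", ArrayType(StructType([
        StructField("id", LongType()),
        StructField("id_str", StringType()),
        StructField("indices", ArrayType(LongType())),
        StructField("name", StringType()),
        StructField("screen_name", StringType()),
    ]))),
    # hashtags is a plain array of strings
    StructField("hashtags", ArrayType(StringType())),
])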

We are trying two approaches.

First, we define a UDF that replaces empty arrays with null, like this:

empty_array_to_null = udf(lambda arr: None if len(arr) == 0 else arr, ArrayType(StructType()))

and use it to exclude those rows via df.select(empty_array_to_null(df.user_mentions)).

The other approach is to have the following UDF:

is_empty = udf(lambda x: len(x) == 0, BooleanType())

and use it in df.filter(is_empty(df.user_mentions)).
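
For completeness, here are the same two snippets with the imports they need; the .show() on the first attempt is just how we display the result:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, BooleanType

# first attempt: map empty arrays to null, with the return type declared as above
empty_array_to_null = udf(lambda arr: None if len(arr) == 0 else arr, ArrayType(StructType()))
df.select(empty_array_to_null(df.user_mentions)).show()

# second attempt: flag empty arrays and filter on that flag
is_empty = udf(lambda x: len(x) == 0, BooleanType())
df.filter(is_empty(df.user_mentions))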

Both approaches throw errors. The first approach yields the following:

An error occurred while calling o3061.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1603.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1603.0 (TID 41390, 10.0.0.11): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 0 fields are required while 5 values are provided.
at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:136)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$fromJava$1.apply(EvaluatePython.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)

The second approach throws the following:

Some of types cannot be determined by the first 100 rows, please try again with sampling
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 57, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 522, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 347, in _inferSchema
    raise ValueError("Some of types cannot be determined by the "
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
...

Update: Added sample data...

+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|          created_at|   screen_name|                text|retweet_count|favorite_count|in_reply_to_status_id|in_reply_to_user_id|in_reply_to_screen_name|user_mentions|            hashtags|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|2017-03-13 23:00:...|  danielmellen|#DevOps understan...|            0|             0|                 null|               null|                   null|           []|            [devops]|
|2017-03-13 23:00:...|     RebacaInc|Automation of ent...|            0|             0|                 null|               null|                   null|           []|[googlecloud, orc...|
|2017-03-13 23:00:...| CMMIAppraiser|Get your Professi...|            0|             0|                 null|               null|                   null|           []|        [broadsword]|
|2017-03-13 23:00:...|       usxtron|and when the syst...|            0|             0|                 null|               null|                   null|           []|             [cloud]|
|2017-03-13 23:00:...|     SearchCRM|.#Automation and ...|            0|             0|                 null|               null|                   null|           []|[automation, chat...|
|2017-03-13 23:00:...|  careers_tech|SummitSync - Juni...|            0|             0|                 null|               null|                   null|           []|[junior, cloud, e...|
|2017-03-13 23:00:...|    roy_lauzon|Both the #DevOps ...|            0|             0|                 null|               null|                   null|           []|[devops, cybersec...|
|2017-03-13 23:00:...|      nosqlgal|Introducing #Couc...|            0|             0|                 null|               null|                   null|           []|  [couchbase, nosql]|
|2017-03-13 23:00:...|  jordanfarrer|Ran into a weird ...|            0|             0|                 null|               null|                   null|           []|            [docker]|
|2017-03-13 23:00:...|    BGrieveSTL|#purestorage + #a...|            0|             0|                 null|               null|                   null|           []|[purestorage, azure]|
|2017-03-13 23:00:...| Hotelbeds_API|"How to Quickly O...|            0|             0|                 null|               null|                   null|           []|       [api, feedly]|
|2017-03-13 23:00:...|  ScalaWilliam|Principles behind...|            0|             0|                 null|               null|                   null|           []|             [agile]|
|2017-03-13 23:00:...|   PRFT_Oracle|[On-Demand Webina...|            0|             0|                 null|               null|                   null|           []|             [cloud]|
|2017-03-13 23:00:...|    PDF_filler|Now you can #secu...|            0|             0|                 null|               null|                   null|           []|[secure, data, ap...|
|2017-03-13 23:00:...|lgoncalves1979|10 Mistakes We Ma...|            0|             0|                 null|               null|                   null|           []|[coaching, scrumm...|
|2017-03-13 23:00:...|       Jelecos|Vanguard CIO: Why...|            0|             0|                 null|               null|                   null|           []|[microservices, cio]|
|2017-03-13 23:00:...|   DJGaryBaldy|Why bother with W...|            0|             0|                 null|               null|                   null|           []|        [automation]|
|2017-03-13 23:00:...|     1codeblog|Apigee Edge Produ...|            0|             0|                 null|               null|                   null|           []|[cloud, next17, g...|
|2017-03-13 23:00:...|     CloudRank|Why and when shou...|            0|             0|                 null|               null|                   null|           []|[machinelearning,...|
|2017-03-13 23:00:...|  forgeaheadio|5 essentials for ...|            0|             0|                 null|               null|                   null|           []|[hybrid, cloud, h...|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
only showing top 20 rows
asked Mar 23 '17 by toddysm

1 Answer

One way is to first get the size of the array, and then filter out the rows whose array size is 0. I found the solution here: How to convert empty arrays to nulls?

import pyspark.sql.functions as F

# add a helper column with the array size, then keep only rows with a non-empty array
df = df.withColumn("size", F.size(F.col("user_mentions")))
df_filtered = df.filter(F.col("size") >= 1)
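
If you prefer not to add the helper column, the same idea should work inline (a sketch of the same approach; on Spark 2's default settings F.size returns -1 for a null array, so null rows would be dropped here as well):

import pyspark.sql.functions as F

# keep only rows whose user_mentions array has at least one element
df_filtered = df.filter(F.size(F.col("user_mentions")) > 0)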

answered Oct 18 '22 by Sophie D.