 

Spark DataFrame filtering: retain rows belonging to a list

I am using Spark 1.5.1 with Scala in a Zeppelin notebook.

  • I have a DataFrame with a column called userID of type Long.
  • In total I have about 4 million rows and 200,000 unique userIDs.
  • I also have a list of 50,000 userIDs to exclude.
  • I can easily build the list of userIDs to retain.

What is the best way to delete all the rows that belong to the users to exclude?

Another way to ask the same question is: what is the best way to keep the rows that belong to the users to retain?

I saw this post and applied its solution (see the code below), but the execution is slow, even though I am running Spark 1.5.1 on my local machine with 16 GB of RAM and the initial DataFrame fits in memory.

Here is the code that I am applying:

import org.apache.spark.sql.functions.lit

// Turn each ID into a literal Column and test membership with in()
val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))

In the code above:

  • the initialDataFrame has 3,885,068 rows; each row has 5 columns, one of which is called userID and contains Long values.
  • The listOfUsersToKeep is an Array[Long] containing 150,000 userIDs.

I wonder if there is a more efficient solution than the one I am using.

Thanks

asked Nov 20 '15 by Rami

People also ask

How to filter a DataFrame based on multiple conditions in Spark?

If you are coming from a SQL background, you can apply that knowledge in Spark and filter DataFrame rows with SQL expressions. To filter rows on multiple conditions, you can use either a Column with a condition or a SQL expression.
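
For instance, here is a minimal sketch; the DataFrame df and its age and state columns are hypothetical:

import org.apache.spark.sql.functions.col

// Column-based conditions, combined with && (and) / || (or)
val byColumns = df.filter(col("age") > 21 && col("state") === "OH")

// The same filter expressed as a SQL string
val bySqlExpr = df.filter("age > 21 AND state = 'OH'")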

How do I filter on an array column in Spark?

Filter on an Array Column: when you want to filter rows from a DataFrame based on a value present in an array collection column, you can use the array_contains() Spark SQL function, which checks whether an array contains a value and returns true if present, false otherwise.
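
A sketch of this, assuming a hypothetical DataFrame df with an array column called languages (array_contains is available from Spark 1.5):

import org.apache.spark.sql.functions.array_contains

// Keep rows whose languages array contains the value "Scala"
val scalaRows = df.filter(array_contains($"languages", "Scala"))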

How do I filter a column in a DataFrame?

DataFrame filter() with a Column condition: use a Column with a condition to filter rows from a DataFrame. This lets you express complex conditions by referring to column names with col(name), $"colname", or dfObject("colname"), and it is the approach most commonly used when working with DataFrames. Use === for equality comparison.
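
The three column-reference styles side by side, on a hypothetical df with a state column:

import org.apache.spark.sql.functions.col

// All three lines are equivalent; note === (not ==) for Column equality
df.filter(col("state") === "OH")
df.filter($"state" === "OH")
df.filter(df("state") === "OH")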

What is the difference between where() and filter() in Spark?

Spark's filter() and where() functions filter rows from a DataFrame or Dataset based on one or more conditions or a SQL expression. You can use the where() operator instead of filter() if you are coming from a SQL background; the two functions behave exactly the same.
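
For example, assuming a DataFrame df with a Long userID column, the two calls below are interchangeable:

// where() is simply an alias for filter(); both produce the same result
val a = df.filter($"userID" > 100L)
val b = df.where($"userID" > 100L)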


1 Answer

You can either use a join:

// Build a single-column DataFrame from the IDs to keep
// (toDF needs import sqlContext.implicits._ in Spark 1.5)
val usersToKeep = sc.parallelize(
  listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")

// Inner join keeps only rows whose userID appears in usersToKeep,
// then the helper userID_ column is dropped
val finalDataFrame = usersToKeep
  .join(initialDataFrame, $"userID" === $"userID_")
  .drop("userID_")
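
Renaming the key column to userID_ avoids an ambiguous column reference after the join. Note that a plain join will shuffle both DataFrames by the join key unless Spark decides to broadcast the smaller side.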

or a broadcast variable and a UDF:

import org.apache.spark.sql.functions.udf

// Ship the IDs to every executor once, as a Set for fast membership tests
val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
val finalDataFrame = initialDataFrame.where(checkUser($"userID"))
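
Because the set of IDs is broadcast once to each executor, this variant filters initialDataFrame with no shuffle at all; a Set of 150,000 Long values is small enough to broadcast comfortably.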

It should also be possible to broadcast a DataFrame:

import org.apache.spark.sql.functions.broadcast

// The broadcast() hint asks the optimizer to use a broadcast join
initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
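
With the hint, usersToKeep is replicated to every executor instead of shuffling initialDataFrame; as in the first variant, the helper userID_ column can then be dropped from the result.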
answered by zero323