I am using Spark 1.5.1 with Scala in a Zeppelin notebook.
What is the best way to delete all the rows that belong to the users to exclude?
Another way to ask the same question is: what is the best way to keep the rows that belong to the users to retain?
I saw this post and applied its solution (see the code below), but the execution is slow, even though I am running Spark 1.5.1 on my local machine, I have a decent 16 GB of RAM, and the initial DataFrame fits in memory.
Here is the code that I am applying:
import org.apache.spark.sql.functions.lit
val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))
In the code above, initialDataFrame is the original DataFrame and listOfUsersToKeep is the collection of user IDs to retain.
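For the exclusion phrasing of the question, I assume the symmetric form would look like this (a sketch, using a hypothetical listOfUsersToExclude collection):
// drop the rows whose userID appears in listOfUsersToExclude
val prunedDataFrame = initialDataFrame.where(!($"userID".in(listOfUsersToExclude.map(lit(_)): _*)))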
I wonder if there is a more efficient solution than the one I am using.
Thanks
Spark's filter() or where() function filters the rows of a DataFrame or Dataset based on one or more conditions or an SQL expression. where() is an alias for filter(), so if you are coming from an SQL background you can use whichever reads more naturally; both behave exactly the same.
To filter with a Column condition, refer to columns with col("name"), $"colname" or dfObject("colname") and use === for equality comparison; this is the approach most commonly used when working with DataFrames. Alternatively, you can pass an SQL expression string, and multiple conditions can be combined in either style.
To filter rows based on a value present in an array collection column, use the array_contains() Spark SQL function, which returns true if the array contains the value and false otherwise.
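A minimal sketch of these styles, assuming a hypothetical DataFrame df with an age column, a country column and an array column tags:
import org.apache.spark.sql.functions.{array_contains, col}
// Column condition: use === for equality
df.filter(col("country") === "US" && col("age") > 18)
// equivalent SQL expression string
df.filter("country = 'US' AND age > 18")
// array column: keep rows whose tags array contains "spark"
df.filter(array_contains($"tags", "spark"))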
You can either use a join:
val usersToKeep = sc.parallelize(listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")
val finalDataFrame = usersToKeep
.join(initialDataFrame, $"userID" === $"userID_")
.drop("userID_")
or a broadcast variable and a UDF:
import org.apache.spark.sql.functions.udf
val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
val finalDataFrame = initialDataFrame.where(checkUser($"userID"))
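The same broadcast idea covers the exclusion phrasing of the question by negating the check (a sketch, assuming a hypothetical listOfUsersToExclude collection):
val usersToExcludeBD = sc.broadcast(listOfUsersToExclude.toSet)
val keepUser = udf((id: Long) => !usersToExcludeBD.value.contains(id))
val finalDataFrame = initialDataFrame.where(keepUser($"userID"))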
It should also be possible to broadcast a DataFrame:
import org.apache.spark.sql.functions.broadcast
initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
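As with the plain join, you can drop the helper column from the result afterwards, for example:
initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_").drop("userID_")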