... by checking whether a columns' value is in a seq
.
Perhaps I'm not explaining it very well, I basically want this (to express it using regular SQL): DF_Column IN seq
?
First I did it using a broadcast var
(where I placed the seq), UDF
(that did the checking) and registerTempTable
.
The problem is that I didn't get to test it since I ran into a known bug that apparently only appears when using registerTempTable
with ScalaIDE.
I ended up creating a new DataFrame
out of seq
and doing inner join with it (intersection), but I doubt that's the most performant way of accomplishing the task.
Thanks
EDIT: (in response to @YijieShen):
How to do filter
based on whether elements of one DataFrame
's column are in another DF's column (like SQL select * from A where login in (select username from B)
)?
E.g: First DF:
login count login1 192 login2 146 login3 72
Second DF:
username login2 login3 login4
The result:
login count login2 146 login3 72
Attempts:
EDIT-2: I think, now that the bug is fixed, these should work. END EDIT-2
ordered.select("login").filter($"login".contains(empLogins("username")))
and
ordered.select("login").filter($"login" in empLogins("username"))
which both throw Exception in thread "main" org.apache.spark.sql.AnalysisException
, respectively:
resolved attribute(s) username#10 missing from login#8 in operator !Filter Contains(login#8, username#10);
and
resolved attribute(s) username#10 missing from login#8 in operator !Filter login#8 IN (username#10);
Vectorization is always the first and best choice. You can convert the data frame to NumPy array or into dictionary format to speed up the iteration workflow. Iterating through the key-value pair of dictionaries comes out to be the fastest way with around 280x times speed up for 20 million records.
One way to filter by rows in Pandas is to use boolean expression. We first create a boolean variable by taking the column of interest and checking if its value equals to the specific value that we want to select/keep. For example, let us filter the dataframe or subset the dataframe based on year's value 2002.
My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT
on these two configurations:
Intellij IDEA's test
Spark Standalone cluster
with 8 nodes (1 master, 7 worker)Please check if any differences exists
val bc = sc.broadcast(Array[String]("login3", "login4")) val x = Array(("login1", 192), ("login2", 146), ("login3", 72)) val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt") val func: (String => Boolean) = (arg: String) => bc.value.contains(arg) val sqlfunc = udf(func) val filtered = xdf.filter(sqlfunc(col("name"))) xdf.show() filtered.show()
Output
name cnt
login1 192
login2 146
login3 72name cnt
login3 72
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With