
Filter based on another RDD in Spark

I would like to keep only the employees whose department ID is referenced in the second table.

Employee table
LastName    DepartmentID
Rafferty    31
Jones   33
Heisenberg  33
Robinson    34
Smith   34

Department table
DepartmentID
31  
33  

I have tried the following code which does not work:

employee = [['Rafferty',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Any ideas? I am using Spark 1.1.0 with Python. However, I would accept a Scala or Python answer.

asked Oct 06 '14 by poiuytrez


2 Answers

In this case, what you want to achieve is to filter at each partition using the data contained in the department table. This would be the basic solution:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}
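
For reference, a minimal PySpark sketch of the same approach, using the employee and department RDDs from the question (the variable names below are my own, not from the original answer):

# collect the small department table to the driver as a set,
# then filter the employees locally on each partition
dept_ids = set(department.collect())
employees_with_valid_dept = employee.filter(lambda e: e[1] in dept_ids)
# expected: [['Rafferty', 31], ['Jones', 33], ['Heisenberg', 33]]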

If your department data is large, a broadcast variable will improve performance by delivering the data once to all the nodes, instead of serializing it with each task:

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}
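
The PySpark equivalent of the broadcast version would be along these lines (again a sketch against the question's RDDs, with my own variable names):

# ship the department set once per node via a broadcast variable
dept_bc = sc.broadcast(set(department.collect()))
employees_with_valid_dept = employee.filter(lambda e: e[1] in dept_bc.value)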

Although using join would work, it's a very expensive solution, as it requires a distributed shuffle of the data (byKey) to achieve the join. Given that the requirement is a simple filter, sending the department data to each partition (as shown above) will provide much better performance.

answered Sep 20 '22 by maasg


I finally implemented a solution using a join. I had to add a 0 value to the department to avoid an exception from Spark:

employee = [['Rafferty',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))

employee.join(department).map(lambda e: (e[1][0], e[0])).collect()

output: [('Jones', 33), ('Heisenberg', 33), ('Rafferty', 31)]
answered Sep 17 '22 by poiuytrez