I would like to keep only the employees that have a department ID referenced in the second table.
Employee table

LastName    DepartmentID
Rafferty    31
Jones       33
Heisenberg  33
Robinson    34
Smith       34

Department table

DepartmentID
31
33
I have tried the following code, which does not work:
employee = [['Rafferty', 31], ['Jones', 33], ['Heisenberg', 33], ['Robinson', 34], ['Smith', 34]]
department = [31, 33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()
Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
Any ideas? I am using Spark 1.1.0 with Python. However, I would accept a Scala or Python answer.
Spark RDD filter: the RDD.filter() method returns an RDD containing only those elements that pass a filter condition (a function) passed as an argument to the method. It works both on RDDs of simple values, such as integers, and on RDDs of tuples.
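For instance, a minimal PySpark sketch of both cases (assuming an existing SparkContext named sc, as in the question):

# keep only the even integers
nums = sc.parallelize([1, 2, 3, 4, 5, 6])
print(nums.filter(lambda n: n % 2 == 0).collect())  # [2, 4, 6]

# keep only the tuples whose second element is 33
pairs = sc.parallelize([('Jones', 33), ('Smith', 34)])
print(pairs.filter(lambda p: p[1] == 33).collect())  # [('Jones', 33)]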
Spark's filter() or where() function is used to filter rows from a DataFrame or Dataset based on one or more conditions or a SQL expression. If you are coming from a SQL background you can use where() instead of filter(); both functions behave exactly the same.
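For example, on a newer Spark version (DataFrames are not available in the 1.1.0 used in the question), a sketch assuming a SparkSession named spark:

df = spark.createDataFrame([('Rafferty', 31), ('Robinson', 34)], ['LastName', 'DepartmentID'])
df.filter(df.DepartmentID == 31).show()  # column-expression form
df.where('DepartmentID = 31').show()     # equivalent SQL-expression form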
The .count() method is a Spark action: it counts the number of elements of an RDD and returns a Long integer, so it can simply be printed out, e.g. println(rdd.count()).
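The PySpark equivalent is just as direct (a sketch, with rdd standing in for any RDD):

print(rdd.count())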
In this case, what you want to achieve is to filter each partition with the data contained in the department table. This would be the basic solution:
// collect the small department table to the driver as a Set
val dept = deptRdd.collect.toSet
// keep only the employees whose department id is in that set
val employeesWithValidDeptRdd = employeesRdd.filter { case (employee, d) => dept.contains(d) }
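In Python the same idea looks like this, and it also explains the Py4JError above: an RDD such as department cannot be referenced inside a closure shipped to the workers, so it must first be collected into a plain Python set on the driver (a sketch using the RDDs from the question):

# collect the small department table to the driver as a set
dept = set(department.collect())
# keep only employees whose department id appears in that set
employee.filter(lambda e: e[1] in dept).collect()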
If your department data is large, a broadcast variable will improve performance by delivering the data once to all the nodes, instead of having to serialize it with each task:
// broadcast the set once to every node instead of with every task
val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter { case (employee, d) => deptBC.value.contains(d) }
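The broadcast variant in Python would be (again a sketch against the question's RDDs):

dept_bc = sc.broadcast(set(department.collect()))
employee.filter(lambda e: e[1] in dept_bc.value).collect()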
Although using join would work, it is a very expensive solution, as it requires a distributed shuffle of the data (by key) to achieve the join. Given that the requirement is a simple filter, sending the data to each partition (as shown above) will provide much better performance.
I finally implemented a solution using a join. Since join operates on pair (key, value) RDDs, I had to pair each department ID with a dummy 0 value to avoid an exception from Spark:
employee = [['Rafferty', 31], ['Jones', 33], ['Heisenberg', 33], ['Robinson', 34], ['Smith', 34]]
department = [31, 33]
# invert name and id to get the department id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1], e[0]))
# pair each department id with a dummy 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d, 0))
# join on department id, then map back to (name, id) pairs
employee.join(department).map(lambda e: (e[1][0], e[0])).collect()
output: [('Jones', 33), ('Heisenberg', 33), ('Rafferty', 31)]