sampleDF is a sample DataFrame that holds a list of records used for lookups. sampleDS is an RDD with a list of elements in it. mappingFunction is supposed to look up the elements of sampleDS in sampleDF and map them to 1 if they exist in sampleDF and to 0 if they don't. I have a mapping function as follows (a minimal setup sketch follows it):
def mappingFunction(element):
    # The dataframe lookup!
    lookupResult = sampleDF.filter(sampleDF[0] == element).collect()
    if len(lookupResult) > 0:
        print lookupResult
        return 1
    return 0
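For reference, the setup is along these lines (a minimal sketch; the column name value and the sample data are purely illustrative):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# sampleDF: a single-column lookup DataFrame
sampleDF = sqlContext.createDataFrame([("a",), ("b",), ("c",)], ["value"])

# sampleDS: the RDD whose elements should be looked up in sampleDF
sampleDS = sc.parallelize(["a", "x", "c"])

# This is the call that triggers the error below
sampleDS.map(mappingFunction).collect()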
Accessing sampleDF outside of the mapping function works perfectly fine, but as soon as I use it inside the function I get the following error:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:744)
I did try registering a temporary table and running an sqlContext query inside the map function, and still couldn't get this to work (a sketch of that attempt follows the traceback below). This is the error I get:
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems
save(v)
File "/usr/lib64/python2.6/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 542, in save_reduce
save(state)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib64/python2.6/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable
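For completeness, the temporary-table attempt looked roughly like this (the table and column names are assumptions; it fails because sqlContext only exists on the driver and cannot be shipped to the executors):

# Register the lookup DataFrame as a temporary table (Spark 1.x API)
sampleDF.registerTempTable("sample_table")

def mappingFunctionSQL(element):
    # Still fails: sqlContext cannot be pickled into the closure
    # that runs on the executors
    matches = sqlContext.sql(
        "SELECT * FROM sample_table WHERE value = '{0}'".format(element)
    ).collect()
    return 1 if len(matches) > 0 else 0

sampleDS.map(mappingFunctionSQL).collect()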
I have tried to simplify my problem into a small example. Any help on how to use a DataFrame inside a map function is highly appreciated.
It is not possible. Spark doesn't support nested operations on distributed data structures (RDDs, DataFrames, Datasets). Even if it did, executing a large number of jobs wouldn't be a good idea. Given the code you've shown, you probably want to convert your RDD to a DataFrame and perform a join with it:
from pyspark.sql.functions import col, count

(sampleDS.map(lambda x: (x, )).toDF(["element"])
    .join(sampleDF, sampleDF[0] == col("element"))
    .groupBy("element")
    .agg(count("element") > 0))
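If you need an explicit 0/1 flag for every element, including the ones that have no match in sampleDF, a left outer join gets you there. A rough sketch (it counts sampleDF's column rather than "element" so that unmatched rows yield 0; the exists column name is just an assumption):

from pyspark.sql.functions import count

# Turn the RDD of elements into a single-column DataFrame
elements = sampleDS.map(lambda x: (x, )).toDF(["element"])

# The left outer join keeps elements with no match in sampleDF;
# count() ignores the resulting nulls, so those elements end up with 0
flagged = (elements
    .join(sampleDF, sampleDF[0] == elements["element"], "left_outer")
    .groupBy(elements["element"])
    .agg((count(sampleDF[0]) > 0).cast("int").alias("exists")))

flagged.show()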
On a side note, printing inside map is completely useless, not to mention that it adds extra IO overhead.