How to use dataframes within a map function in Spark?

Definitions:

  • sampleDF is a sample DataFrame that holds a list of records used for lookups.
  • sampleDS is an RDD containing a list of elements.
  • mappingFunction is meant to look up each element of sampleDS in sampleDF and map it to 1 if it exists in sampleDF and to 0 if it doesn't.
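
For concreteness, a minimal setup matching these definitions could look like this (the column name and values are illustrative, not my real data):

sampleDF = sqlContext.createDataFrame([("a",), ("b",), ("c",)], ["value"])
sampleDS = sc.parallelize(["a", "x", "c"])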

I have a mapping function as follows:

def mappingFunction(element):
    # The dataframe lookup!
    lookupResult = sampleDF.filter(sampleDF[0] == element).collect()
    if len(lookupResult) > 0:
        print lookupResult
        return 1
    return 0
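
The function is applied to the RDD through a transformation, which is where the failure shows up (a minimal invocation):

mapped = sampleDS.map(mappingFunction).collect()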

The problem:

Accessing sampleDF outside of the mapping function works perfectly fine, but as soon as I use it inside the function I get the following error:

py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:744)

What else I tried:

I did try saving a temporary table and running a sqlContext select inside the map function, and still couldn't get this to work.
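
That attempt looked roughly like this (the table and column names here are reconstructed for illustration):

sampleDF.registerTempTable("sample_table")

def mappingFunction(element):
    # Same lookup via SQL; this fails with the pickling error below.
    rows = sqlContext.sql(
        "SELECT * FROM sample_table WHERE value = '%s'" % element).collect()
    return 1 if len(rows) > 0 else 0

This is the error I get: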

  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 542, in save_reduce
    save(state)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable

What I am asking for:

I have tried to simplify my problem to this small example. Any help on how to use a DataFrame inside a map function is highly appreciated.

asked Oct 18 '22 by AmirHd


1 Answer

It is not possible. Spark doesn't support nested operations on distributed data structures (RDDs, DataFrames, Datasets), and even if it did, executing a large number of jobs wouldn't be a good idea. Given the code you've shown, you probably want to convert your RDD to a DataFrame and perform a join with sampleDF instead:

from pyspark.sql.functions import count

df = sampleDS.map(lambda x: (x, )).toDF(["element"])
# A left outer join keeps the elements that are missing from sampleDF.
result = (df
    .join(sampleDF, sampleDF[0] == df["element"], "leftouter")
    .groupBy("element")
    .agg((count(sampleDF[0]) > 0).alias("exists")))
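
If the 1/0 encoding from the question is needed, the boolean column can be cast afterwards, using the result from above:

from pyspark.sql.functions import col

result.withColumn("flag", col("exists").cast("int")).drop("exists").show()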

On a side note, printing inside map is completely useless, not to mention that it adds extra IO overhead.

answered Oct 21 '22 by zero323