 

Creating a custom Spark RDD in Python

Is it possible to extend Spark's RDDs in Python to add custom operators? If it's not possible, how can one wrap Scala code for a class that extends an RDD, such as the one here: http://blog.madhukaraphatak.com/extending-spark-api/

Edit: I am trying to create a new RDD, say PersonRDD, and add a set of new operators to it, e.g. PersonRDD.computeMedianIncome(). According to the thread linked below, this is not trivial to do in Python. However, since it is an old thread, I was wondering whether there have been any updates since. If not, I would like to use Scala to do it, but I am not sure how to call the class from Python using Py4J (mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/…)
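
For concreteness, the kind of API I am after would look roughly like this (PersonRDD, computeMedianIncome and load_people are just hypothetical names, not working code):

people = sc.parallelize(load_people())     # load_people is a placeholder
person_rdd = PersonRDD(people)             # hypothetical RDD subclass
median = person_rdd.computeMedianIncome()  # hypothetical custom operator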

Any advice or help would be greatly appreciated.

Mandy



1 Answer

Computing an exact median in a distributed environment takes some effort, so let's say instead that you want to square all values in an RDD. Let's call this method squares and assume it should work as follows:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()

1. Modify the pyspark.RDD definition:

from pyspark import RDD

def squares(self):
    """Square every element of the RDD."""
    return self.map(lambda x: x * x)

# Monkey-patch: attach squares to the RDD class itself
RDD.squares = squares

# sc is an existing SparkContext, e.g. the one created by the pyspark shell
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]

Note: if you modify the class definition, every instance gets access to squares.
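
This also means instances created before the patch pick the method up, because attribute lookup happens on the class at call time, not at creation time. A minimal sketch, assuming the same sc as above:

pre_existing = sc.parallelize([4, 5])  # created before (re)applying the patch
RDD.squares = squares                  # class-level patch, as above
assert pre_existing.squares().collect() == [16, 25]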

2. Create an RDD subclass:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
# Force the existing instance into the subclass after the fact
rdd.__class__ = RDDWithSquares  # WARNING: see the comment below

Reassigning __class__ like this is a dirty hack, so in practice you should construct the RDD properly instead (see, for example, the SparkContext.parallelize implementation). One sketch of what that could look like follows.
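
For illustration, a minimal sketch of such a wrapping constructor; it leans on PySpark internals (_jrdd, ctx, _jrdd_deserializer) that are private and may change between versions:

class RDDWithSquares(RDD):
    def __init__(self, rdd):
        # Reuse the wrapped RDD's JVM handle, context and deserializer
        super(RDDWithSquares, self).__init__(
            rdd._jrdd, rdd.ctx, rdd._jrdd_deserializer)

    def squares(self):
        return self.map(lambda x: x * x)

rdd = RDDWithSquares(sc.parallelize([1, 2, 3]))
assert rdd.squares().collect() == [1, 4, 9]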

3. Add a method to an instance:

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing the squares function defined above; this binds it to this
# one instance only
rdd.squares = types.MethodType(squares, rdd)
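
A quick check, under the same assumptions; unlike the class-level patch in the first approach, only this particular instance gains the method:

assert rdd.squares().collect() == [1, 4, 9]

# A fresh instance does not pick the method up (unless the class itself
# was patched earlier, as in the first approach)
other = sc.parallelize([4, 5])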

Disclaimer

First of all, I haven't tested any of these long enough to be sure there are no hidden problems.

Moreover, I don't think it is really worth all the fuss. Without static type checking it is hard to find any benefits, and you can obtain a similar result using functions, currying, and pipes in a much cleaner way.

from toolz import pipe

# squares is the plain function defined earlier; pipe simply threads
# the RDD through each step in turn
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())
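
The same composes with currying; a small sketch using toolz.curry, assuming the same session (transform is a made-up name):

from toolz import curry, pipe

# A curried transformer: partially apply the function, get back a
# callable that still expects the RDD
transform = curry(lambda f, rdd: rdd.map(f))

result = pipe(
    sc.parallelize([1, 2, 3]),
    transform(lambda x: x * x),
    lambda rdd: rdd.collect())

assert result == [1, 4, 9]
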
zero323