Is it possible to extend Spark's RDDs in Python to add custom operators? If it's not possible, how can one wrap Scala code for a class that extends an RDD, such as the one here: http://blog.madhukaraphatak.com/extending-spark-api/
Edit: I am trying to create a new RDD, say PersonRDD, and add a set of new operators to it, e.g. PersonRDD.computeMedianIncome(). According to the thread below, it is not trivial to do that in Python. However, since it is an old thread, I was wondering whether there have been any updates since. If not, I would like to use Scala to do it, but I am not sure how to call the class from Python using Py4J ( mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/…)
Any advice or help would be greatly appreciated.
Mandy
Computing an exact median in a distributed environment takes some effort, so let's say you want something simpler, like squaring all values in an RDD. Let's call this method squares and assume it should work as follows:
assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()
The simplest option is to monkey-patch the pyspark.RDD definition:

from pyspark import RDD
def squares(self):
return self.map(lambda x: x * x)
RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]
Note: if you modify the class definition, every existing and future instance will get access to squares.
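To see why patching the class affects every instance, here is the same mechanism on a plain stand-in class (`FakeRDD` is a hypothetical toy, not part of pyspark — it just avoids needing a SparkContext):

```python
# FakeRDD is a minimal stand-in mimicking the map/collect part of the RDD API.
class FakeRDD:
    def __init__(self, data):
        self.data = data

    def map(self, f):
        return FakeRDD([f(x) for x in self.data])

    def collect(self):
        return list(self.data)

def squares(self):
    return self.map(lambda x: x * x)

a = FakeRDD([1, 2, 3])       # created BEFORE the patch
FakeRDD.squares = squares    # patch the class, not an instance
b = FakeRDD([4, 5])          # created AFTER the patch

print(a.squares().collect())  # [1, 4, 9]  -- even the pre-patch instance sees it
print(b.squares().collect())  # [16, 25]
```

Both the instance created before the patch and the one created after resolve `squares` through the class, which is exactly what happens when you patch pyspark.RDD.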
Another option is to create an RDD subclass:

class RDDWithSquares(RDD):
def squares(self):
return self.map(lambda x: x * x)
rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below
Assigning a class like this is a dirty hack, so in practice you should create an RDD the proper way (see for example the context.parallelize implementation).
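To make the `__class__` reassignment concrete, here is the same trick on plain classes (these toy classes are illustrative only). It works here because the subclass adds behavior but no new state, which is also why it happens to work for RDDs:

```python
class Base:
    def __init__(self, data):
        self.data = data

# Subclass adds a method but no new attributes / no changed __init__.
class WithSquares(Base):
    def squares(self):
        return [x * x for x in self.data]

obj = Base([1, 2, 3])
obj.__class__ = WithSquares   # swap the type of an existing object in place
print(obj.squares())          # [1, 4, 9]
print(isinstance(obj, WithSquares))  # True -- isinstance checks now change meaning
```

The caveat is the same as in the RDD case: any code holding `obj` suddenly sees a different type, so isinstance-based assumptions elsewhere can silently break.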
Finally, you can attach a method to a single RDD instance:

import types
rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)
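Unlike the class patch, this binds the method to one object only; other instances are unaffected. A quick pure-Python illustration (`Box` is a hypothetical stand-in):

```python
import types

class Box:
    def __init__(self, data):
        self.data = data

def squares(self):
    return [x * x for x in self.data]

a = Box([1, 2, 3])
b = Box([4, 5])
a.squares = types.MethodType(squares, a)  # bound to `a` only

print(a.squares())            # [1, 4, 9]
print(hasattr(b, "squares"))  # False -- `b` never got the method
```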
First of all, I haven't tested any of these long enough to be sure there are no hidden problems.
Moreover, I don't think it is really worth all the fuss. Without static type checking it is hard to see the benefits, and you can obtain a similar result in a much cleaner way using plain functions, currying, and pipes:
from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())
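If you prefer not to depend on toolz, the pipe pattern itself is only a few lines of standard-library Python. A minimal sketch (the tiny `pipe` helper and the list-based `squares` below are stand-ins for the Spark versions, so the example runs without a cluster):

```python
from functools import reduce

def pipe(value, *funcs):
    """Thread `value` through each function, left to right (like toolz.pipe)."""
    return reduce(lambda acc, f: f(acc), funcs, value)

def squares(xs):
    return [x * x for x in xs]

result = pipe([1, 2, 3], squares, sum)
print(result)  # 14
```

The same shape composes over RDD-transforming functions just as well, which is the author's point: plain functions piped together give you custom "operators" without touching the RDD class at all.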