
Geoip2's python library doesn't work in pySpark's map function

I'm using geoip2's Python library and PySpark to look up the geographic location of some IPs. My code looks like this:

import os
import geoip2.database
from pyspark import SparkFiles

geoDBpath = 'somePath/geoDB/GeoLite2-City.mmdb'
geoPath = os.path.join(geoDBpath)
sc.addFile(geoPath)
reader = geoip2.database.Reader(SparkFiles.get(geoPath))

def ip2city(ip):
    try:
        city = reader.city(ip).city.name
    except:
        city = 'not found'
    return city

I tried

print ip2city("128.101.101.101")

It works. But when I tried to do this in rdd.map:

rdd = sc.parallelize([ip1, ip2, ip3, ip3, ...])
print rdd.map(lambda x: ip2city(x))

It reported:

Traceback (most recent call last):
  File "/home/worker/software/spark/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/worker/software/spark/python/pyspark/context.py", line 916, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
TypeError: Required argument 'fileno' (pos 1) not found

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Could anyone tell me how to make the ip2city function work in rdd.map()? Thanks!

asked Nov 16 '15 by Dong


People also ask

How do you map columns in PySpark?

Solution: The PySpark SQL function create_map() converts selected DataFrame columns to MapType. It takes the columns you want to convert as arguments (alternating keys and values) and returns a single MapType column.
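
For example, a minimal sketch, assuming an existing SparkSession named spark and an illustrative two-column DataFrame:

from pyspark.sql import functions as F

# Toy DataFrame; the 'ip' and 'city' columns are only for illustration.
df = spark.createDataFrame([('128.101.101.101', 'Minneapolis')], ['ip', 'city'])

# create_map() takes alternating key and value columns and returns a single MapType column.
df.withColumn('ip_to_city', F.create_map(F.col('ip'), F.col('city'))).show(truncate=False)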

What does map function do in PySpark?

In PySpark, map() is an RDD transformation that applies a function (typically a lambda) to every element of a Resilient Distributed Dataset (RDD) and returns a new RDD containing the results.
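
A minimal sketch, assuming the usual SparkContext sc from the question is available:

rdd = sc.parallelize([1, 2, 3])

# map() applies the lambda to every element and returns a new RDD;
# collect() brings the transformed elements back to the driver.
print(rdd.map(lambda x: x * x).collect())
## [1, 4, 9]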


1 Answer

It looks like the problem with your code comes from the reader object. It cannot be correctly serialized as part of a closure and sent to the workers. To deal with this you have to instantiate it on the workers. One way you can handle this is to use mapPartitions:

from pyspark import SparkFiles

geoDBpath = 'GeoLite2-City.mmdb'
sc.addFile(geoDBpath)

def partitionIp2city(iter):
    from geoip2 import database

    def ip2city(ip):
        try:
            city = reader.city(ip).city.name
        except:
            city = 'not found'
        return city

    # The reader is created once per partition, on the worker itself,
    # so it never has to be pickled and shipped from the driver.
    reader = database.Reader(SparkFiles.get(geoDBpath))
    return [ip2city(ip) for ip in iter]

rdd = sc.parallelize(['128.101.101.101', '85.25.43.84'])
rdd.mapPartitions(partitionIp2city).collect()

## ['Minneapolis', None]
answered Oct 13 '22 by zero323
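
If you would rather keep rdd.map() itself, a hedged variant of the same idea (not part of the accepted answer) is to build the reader lazily inside the mapped function, so each worker creates its own copy on first use:

from pyspark import SparkFiles

_reader = None  # per-worker cache; the name is only illustrative

def ip2city(ip):
    global _reader
    # Create the reader on the worker, on first use, instead of on the driver.
    if _reader is None:
        from geoip2 import database
        _reader = database.Reader(SparkFiles.get(geoDBpath))
    try:
        return _reader.city(ip).city.name
    except Exception:
        return 'not found'

rdd.map(ip2city).collect()

This assumes geoDBpath and sc.addFile(geoDBpath) are set up exactly as in the answer above.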