I'm using geoip2's Python library and PySpark to get the geographical address of some IPs. My code looks like this:
geoDBpath = 'somePath/geoDB/GeoLite2-City.mmdb'
geoPath = os.path.join(geoDBpath)
sc.addFile(geoPath)
reader = geoip2.database.Reader(SparkFiles.get(geoPath))

def ip2city(ip):
    try:
        city = reader.city(ip).city.name
    except:
        city = 'not found'
    return city
I tried
print ip2city("128.101.101.101")
It works. But when I tried to do this in rdd.map:
rdd = sc.parallelize([ip1, ip2, ip3, ip3, ...])
print rdd.map(lambda x: ip2city(x))
It reported:
Traceback (most recent call last):
File "/home/worker/software/spark/python/pyspark/rdd.py", line 1299, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/home/worker/software/spark/python/pyspark/context.py", line 916, in runJob
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
TypeError: Required argument 'fileno' (pos 1) not found
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Could anyone tell me how to make the ip2city function work in rdd.map()? Thanks!
It looks like the problem with your code comes from the reader object. It cannot be correctly serialized as part of a closure and sent to the workers. To deal with this you have to instantiate it on the workers. One way you can handle this is to use mapPartitions:
from pyspark import SparkFiles

geoDBpath = 'GeoLite2-City.mmdb'
sc.addFile(geoDBpath)

def partitionIp2city(iter):
    # import and instantiate the reader on the worker, not on the driver
    from geoip2 import database

    def ip2city(ip):
        try:
            city = reader.city(ip).city.name
        except:
            city = 'not found'
        return city

    # the database file shipped with sc.addFile is resolved via SparkFiles.get
    reader = database.Reader(SparkFiles.get(geoDBpath))
    return [ip2city(ip) for ip in iter]

rdd = sc.parallelize(['128.101.101.101', '85.25.43.84'])
rdd.mapPartitions(partitionIp2city).collect()
## ['Minneapolis', None]
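If your partitions are large, a small variation of the same idea (a sketch, not part of the original answer, reusing the geoDBpath and rdd defined above) is to have partitionIp2city yield its results instead of building a list, so a whole partition's output is never materialized at once; mapPartitions accepts any iterator, including a generator, and the reader is still opened only once per partition:

def partitionIp2city(iter):
    # Open the database on the worker, once per partition, so the
    # non-picklable Reader never has to cross the closure boundary.
    from geoip2 import database
    reader = database.Reader(SparkFiles.get(geoDBpath))
    for ip in iter:
        try:
            # geoip2 raises AddressNotFoundError for unknown IPs; catching
            # Exception keeps the behaviour of the original ip2city helper
            yield reader.city(ip).city.name
        except Exception:
            yield 'not found'

rdd.mapPartitions(partitionIp2city).collect()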