Code:
views = sdf \
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()
for sess_id, rows in views:
    # do something
PRODUCTS is a set. It is large, about 10000 items.
The code fails with:
--> 9 for sess_id, rows in views:
/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142 for item in serializer.load_stream(rf):
/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139 yield self._read_with_length(stream)
/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156 length = read_int(stream)
/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543 length = stream.read(4)
/opt/conda/lib/python3.5/socket.py in readinto(self, b)
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
577 self._timeout_occurred = True
timeout: timed out
But when I make the PRODUCTS set smaller, everything works fine. I tried changing some timeout values in the Spark configuration, but it didn't help. How can I avoid these crashes?
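For reference, the question does not say which settings were changed; below is a minimal sketch of the kind of timeout-related configuration one might try. The keys spark.network.timeout and spark.executor.heartbeatInterval are assumptions about what was attempted, and as the answer below explains, they do not resolve this error, because the failing timeout sits on the driver-local socket used by toLocalIterator(), not in these cluster-level settings.
# Sketch only: cluster-level timeout settings. The exact keys the author
# changed are not stated; these are common candidates. They do not help here,
# because the timeout that fires is set on the driver-local socket that
# toLocalIterator() reads results from.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("session-views") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()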
UPDATE
PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()
views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()
for sess_id, rows in views:
    # do ...
Now PRODUCTS is a DataFrame and I use a join instead of isin. I got the same error.
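Since PRODUCTS holds only about 10000 product IDs, an explicit broadcast hint is a natural variation to try. The sketch below (using pyspark.sql.functions.broadcast, which is not in the original question) makes the join cheaper, but it does not remove the socket timeout inside toLocalIterator() itself.
# Sketch: broadcast the small PRODUCTS DataFrame so the join avoids a shuffle.
# This can make each partition faster to produce, but the timeout in
# toLocalIterator() can still fire if a partition takes too long to arrive.
from pyspark.sql.functions import broadcast

views = sdf \
    .join(broadcast(PRODUCTS), 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()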
UPDATE 2
Trying this solution:
views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID'])
views.cache()
for sess_id, rows in views.toLocalIterator():
    pass
After some time I got a very long error:
Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....
This error appeared only once! Now I get the same timeout exceptions!
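One detail about the attempt above: cache() is lazy, so nothing is actually cached until an action runs. The sketch below (the count() call is an addition, not part of the original question) materializes the grouped RDD before iterating; whether that avoids the timeout still depends on how quickly each cached partition can be streamed to the driver.
# Sketch: force the grouped RDD to be computed and cached up front, so that
# toLocalIterator() only streams partitions that are already materialized.
views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .cache()

views.count()  # action: triggers the shuffle and populates the cache

for sess_id, rows in views.toLocalIterator():
    pass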
I believe this is due to a bug in the implementation of toLocalIterator() in PySpark 2.0.2. You can read more here: [SPARK-18281][SQL][PySpark] Remove timeout for reading data through socket for local iterator.
It seems the fix will be available in the next update after 2.0.2 and in the 2.1.x release. If you want to fix it yourself temporarily, you can apply the changes from the issue above:
Replace this, around line 138 of rdd.py (on an actual Spark cluster, it seems you need to update the rdd.py inside pyspark.zip):
try:
    rf = sock.makefile("rb", 65536)
    for item in serializer.load_stream(rf):
        yield item
finally:
    sock.close()
with this:
sock.settimeout(None)  # << this is the key line that disables the timeout after the initial connection
return serializer.load_stream(sock.makefile("rb", 65536))
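If editing rdd.py inside pyspark.zip is awkward, the same change can in principle be applied at runtime by monkey-patching pyspark.rdd._load_from_socket from the driver script. The sketch below is my own addition, not part of the original answer: the connection-setup loop is reconstructed from the PySpark 2.0.2 source and only covers the RDD toLocalIterator() path.
# Sketch: replace the driver-side socket reader with a version that disables
# the read timeout after connecting, without touching rdd.py on disk.
import socket
import pyspark.rdd

def _load_from_socket_no_timeout(port, serializer):
    sock = None
    # Try IPv4 and IPv6 addresses for the local result port, as PySpark does.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC,
                                  socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(3)   # short timeout only for the initial connect
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    sock.settimeout(None)        # the key line: no timeout while reading data
    return serializer.load_stream(sock.makefile("rb", 65536))

pyspark.rdd._load_from_socket = _load_from_socket_no_timeout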