 

Pyspark Column.isin() for a large set

Code:

views = sdf \
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()

for sess_id, rows in views:
    # do something

PRODUCTS is a Python set, and it is large: about 10,000 items.

The code fails with:

--> 9 for sess_id, rows in views:

/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142         for item in serializer.load_stream(rf):

/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139                 yield self._read_with_length(stream)

/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156         length = read_int(stream)

/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543     length = stream.read(4)

/opt/conda/lib/python3.5/socket.py in readinto(self, b)
    574             try:
--> 575                 return self._sock.recv_into(b)
    576             except timeout:
    577                 self._timeout_occurred = True

timeout: timed out

But when I make the PRODUCTS set smaller, everything works fine. I tried changing some timeout values in the Spark configuration, but it didn't help. How can I avoid such crashes?

UPDATE

PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()

views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()

for sess_id, rows in views:
    # do ...

Now PRODUCTS is a DataFrame and I use a join, but I get the same error.
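
The same join could also be written with an explicit broadcast hint, since PRODUCTS is small compared to sdf. This is only a sketch using the standard pyspark.sql.functions.broadcast helper; whether it changes anything here is untested:

from pyspark.sql.functions import broadcast

views = sdf \
    .join(broadcast(PRODUCTS), 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()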

UPDATE 2

Trying this solution:

views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID'])
views.cache()

for sess_id, rows in views.toLocalIterator():
    pass

After some time I got a very long error:

Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....

This error appeared only once! Now I get the same timeout exceptions!

asked Aug 17 '16 by Leonid


1 Answer

I believe this is basically due to a bug in the implementation of toLocalIterator() in PySpark 2.0.2. You can read more here: [SPARK-18281][SQL][PySpark] Remove timeout for reading data through socket for local iterator.
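
If you want to confirm which version a given driver is actually running, you can check it from Python (a quick sketch; spark is assumed to be an existing SparkSession):

import pyspark
print(pyspark.__version__)  # version of the PySpark client library
print(spark.version)        # version reported by the running Spark session

If it reports 2.0.2, the workaround below applies.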

It seems the fix will be available in the next update after 2.0.2 and in the 2.1.x release. If you want to fix it yourself temporarily, you can apply the changes from the issue above:

Replace this code around line 138 of rdd.py (on an actual Spark cluster, it seems you need to update the rdd.py inside pyspark.zip):

try:
    rf = sock.makefile("rb", 65536)
    for item in serializer.load_stream(rf):
        yield item
finally:
    sock.close()

with this:

sock.settimeout(None)  # << this is the key line: it disables the timeout after the initial connection
return serializer.load_stream(sock.makefile("rb", 65536))
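
For context, here is roughly what the whole patched _load_from_socket would look like. This is a sketch reconstructed from the 2.0.x source and the SPARK-18281 change, so details such as the connection timeout value may differ slightly from the rdd.py on your cluster:

import socket  # already imported at the top of rdd.py


def _load_from_socket(port, serializer):
    sock = None
    # Try each resolved address (IPv4/IPv6) until one connects.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(15)  # short timeout while establishing the connection
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    # SPARK-18281: materializing the RDD can take arbitrarily long,
    # so disable the read timeout once the connection is established.
    sock.settimeout(None)
    # The socket is closed when the returned stream is garbage-collected.
    return serializer.load_stream(sock.makefile("rb", 65536))

Remember that an already-running driver has the old module loaded, so restart your Python session (and update the rdd.py inside pyspark.zip if the cluster uses it) for the patch to take effect.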
answered Oct 06 '22 by KobeJohn