
pyspark EOFError after calling map

I am new to Spark and PySpark.

I am reading a small CSV file (~40k rows) into a DataFrame.

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.mllib.linalg import Vectors

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()

I then get an odd error that does not occur every single time, but does happen fairly regularly:

>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999                                                                           
>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999                                                                           
>>> df2.show(1)
Traceback (most recent call last):
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker    
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

Once that EOFError has been raised, I will not see it again until I do something that requires interacting with the Spark server.

When I call df2.count(), it shows the [Stage xxx] progress prompt, which is what I mean by going to the Spark server. Anything that triggers that seems to eventually end up raising the EOFError again the next time I do something with df2.

It does not seem to happen with df (vs. df2), so it must be something about the df.map() line.

asked Apr 13 '16 by Pete


2 Answers

Can you try doing the map after converting the DataFrame into an RDD? You are applying the map function on a DataFrame and then creating a DataFrame from that again. The syntax would be:

df.rdd.map().toDF()
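
Applied to the question's code, a minimal sketch of that fix might look like this (assuming Spark 1.6 and the question's column layout, with the label in column 0 and the feature columns after it):

from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors  # pyspark.ml.linalg in Spark 2.x

# Drop to the underlying RDD before mapping, then convert back to a DataFrame
df2 = df.rdd.map(
    lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))
).toDF()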

Please let me know if it works. Thanks.

answered Nov 08 '22 by Manu Gupta


I believe you are running Spark 2.x or above. The code below should create your DataFrame from the CSV:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
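
Since the question's original load used inferschema='true' (and Vectors.dense needs numeric columns), you may also want the inferSchema option; a sketch, using the same placeholder csvfile.csv:

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("csvfile.csv")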

Then you can apply the same transformation:

df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))

and then you can create df2 without Row and toDF(), as sketched below.
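
One way to read that suggestion (my interpretation; the answer does not spell it out) is to build the features column with VectorAssembler in Spark 2.x instead of a Row-based map. The column layout here is an assumption taken from the question: label in the first column, numeric features in the rest.

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

# Assumes the first column is the label and all remaining columns are numeric features
label_col = df.columns[0]
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol='features')
df2 = assembler.transform(df).select(
    F.col(label_col).cast('double').alias('label'),
    'features'
)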

Let me know if this works, or if you are actually using Spark 1.6... thanks.

answered Nov 08 '22 by singhak.bhu