I am new to Spark and PySpark. I am reading a small CSV file (~40k) into a DataFrame:
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
I get an error that does not occur every single time, but does happen pretty regularly:
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
Once that EOFError has been raised, I will not see it again until I do something that requires interacting with the Spark server.
When I call df2.count() it shows that [Stage xxx] prompt, which is what I mean by it going to the Spark server. Anything that triggers that seems to eventually end up giving the EOFError again when I do something with df2.
It does not seem to happen with df (as opposed to df2), so it seems like it must be something happening on the df.map() line.
Can you please try doing the map after converting the DataFrame into an RDD? You are applying a map function on a DataFrame and then creating a DataFrame from that again. The syntax would be like:
df.rdd.map().toDF()
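Applied to the code in the question, that would look roughly like the sketch below (my own adaptation, assuming the same Row/Vectors imports and column layout as in the question):

df2 = df.rdd.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()

The only change from the original is going through .rdd first, so the lambda runs over an RDD of Rows rather than through the DataFrame's map method.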
Please let me know if it works. Thanks.
I believe you are running Spark 2.x or above. The code below should create your DataFrame from the CSV:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
Then you can run the same transformation:
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
and then you can create df2 without using Row and toDF() (a sketch follows below).
Let me know if this works or if you are using Spark 1.6... thanks.
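One way to build df2 with only the DataFrame API in Spark 2.x is a VectorAssembler. The sketch below is my own illustration, assuming the first CSV column is the label, the remaining columns are numeric features, and inferSchema is enabled so the columns are not read as strings:

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler

# Read the CSV with inferred types so the feature columns come in as numbers.
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("csvfile.csv")
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))

# Assumption: column 0 is the label and everything after it is a feature.
label_col = df.columns[0]
feature_cols = df.columns[1:]

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df2 = (assembler.transform(df)
       .withColumn('label', F.col(label_col).cast('double'))
       .select('label', 'features'))

Because this stays inside the DataFrame API, no Python lambda (and no Python worker process) is involved, which sidesteps the kind of worker crash shown in the traceback above.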