I'm trying to build a Spark DataFrame from a simple Pandas DataFrame. This are the steps I follow.
import pandas as pd
pandas_df = pd.DataFrame({"Letters":["X", "Y", "Z"]})
spark_df = sqlContext.createDataFrame(pandas_df)
spark_df.printSchema()
Till' this point everything is OK. The output is:
root
|-- Letters: string (nullable = true)
The problem comes when I try to print the DataFrame:
spark_df.show()
This is the result:
An error occurred while calling o158.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): org.apache.spark.SparkException:
Error from python worker:
Error executing Jupyter command 'pyspark.daemon': [Errno 2] No such file or directory PYTHONPATH was:
/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/jars/spark-core_2.11-2.4.0.jar:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/: org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
These are my Spark specifications:
SparkSession - hive
SparkContext
Spark UI
Version: v2.4.0
Master: local[*]
AppName: PySparkShell
This are my venv:
export PYSPARK_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='lab'
Fact:
As the error mentions, it has to do with running pyspark from Jupyter. Running it with 'PYSPARK_PYTHON=python2.7' and 'PYSPARK_PYTHON=python3.6' works fine
pandas-on-Spark DataFrame and Spark DataFrame are virtually interchangeable. However, note that a new default index is created when pandas-on-Spark DataFrame is created from Spark DataFrame. See Default Index Type. In order to avoid this overhead, specify the column to use as an index when possible.
Import and initialise findspark, create a spark session and then use the object to convert the pandas data frame to a spark data frame. Then add the new spark data frame to the catalogue. Tested and runs in both Jupiter 5.7.2 and Spyder 3.3.2 with python 3.6.6.
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
# Create a spark session
spark = SparkSession.builder.getOrCreate()
# Create pandas data frame and convert it to a spark data frame
pandas_df = pd.DataFrame({"Letters":["X", "Y", "Z"]})
spark_df = spark.createDataFrame(pandas_df)
# Add the spark data frame to the catalog
spark_df.createOrReplaceTempView('spark_df')
spark_df.show()
+-------+
|Letters|
+-------+
| X|
| Y|
| Z|
+-------+
spark.catalog.listTables()
Out[18]: [Table(name='spark_df', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With