The code below works and creates a Spark DataFrame from a text file. However, I'm trying to use the header option so that the first row is used as the header, and for some reason it doesn't seem to be happening. I cannot understand why! It must be something stupid, but I cannot solve this.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").appName("Word Count")\
...     .config("spark.some.config.option", "some-value")\
...     .getOrCreate()
>>> df = spark.read.option("header", "true")\
...     .option("delimiter", ",")\
...     .option("inferSchema", "true")\
...     .text("StockData/ETFs/aadr.us.txt")
>>> df.take(3)
Returns the following:
[Row(value=u'Date,Open,High,Low,Close,Volume,OpenInt'), Row(value=u'2010-07-21,24.333,24.333,23.946,23.946,43321,0'), Row(value=u'2010-07-22,24.644,24.644,24.362,24.487,18031,0')]
>>> df.columns
Returns the following:
['value']
Spark provides spark.read.text("file_name") to read a file or directory of text files into a Spark DataFrame, and dataframe.write.text("path") to write to a text file. When reading a text file, each line becomes a row with a single string column named "value" by default.
As with any other file system, TEXT, CSV, Avro, Parquet and JSON files can be read from and written to HDFS. Spark RDDs natively support reading text files; later, with DataFrames, Spark added data sources such as CSV, JSON, Avro, and Parquet.
In Spark, you can save a DataFrame to a CSV file on disk with dataframeObj.write.csv("path"); the same call can also write to AWS S3, Azure Blob, HDFS, or any other Spark-supported file system.
On Databricks, you can read and write files in DBFS with dbutils. Run dbutils.fs.help() to see the help menu for DBFS.
Issue
The issue is that you are using the .text API call instead of .csv or .load. If you read the .text API documentation, it says
def text(self, paths):
    """Loads text files and returns a :class:`DataFrame` whose schema starts with a
    string column named "value", and followed by partitioned columns if there
    are any.

    Each line in the text file is a new row in the resulting DataFrame.

    :param paths: string, or list of strings, for input path(s).

    >>> df = spark.read.text('python/test_support/sql/text-test.txt')
    >>> df.collect()
    [Row(value=u'hello'), Row(value=u'this')]
    """
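To make the difference concrete without starting Spark, here is a rough plain-Python analogy. It is not Spark code: read_as_text and read_as_csv are hypothetical helpers written for this illustration, and they only mimic the behaviour described above, using the three lines shown in the question's df.take(3) output. A line-oriented reader has no notion of a header, so the header line is just another row; a CSV-aware reader uses it for the column names.

```python
import csv
import io

# First three lines of the file, copied from the df.take(3) output in the question.
SAMPLE = (
    "Date,Open,High,Low,Close,Volume,OpenInt\n"
    "2010-07-21,24.333,24.333,23.946,23.946,43321,0\n"
    "2010-07-22,24.644,24.644,24.362,24.487,18031,0\n"
)


def read_as_text(data):
    """Hypothetical helper: one record per line, with a single "value" field.

    Nothing here looks at the content of a line, so a header line is kept as data.
    """
    return [{"value": line} for line in data.splitlines()]


def read_as_csv(data, delimiter=","):
    """Hypothetical helper: the first line supplies the column names."""
    reader = csv.DictReader(io.StringIO(data), delimiter=delimiter)
    # Accessing fieldnames consumes the header line; the remaining lines are data.
    return reader.fieldnames, list(reader)


text_rows = read_as_text(SAMPLE)
columns, csv_rows = read_as_csv(SAMPLE)

print(list(text_rows[0]))    # ['value']
print(len(text_rows))        # 3 (the header line counts as a row)
print(columns)               # ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'OpenInt']
print(len(csv_rows))         # 2 (the header line is no longer a row)
```

The same split applies in Spark: options such as header and delimiter only mean something to a reader that parses fields, which .text does not.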
Solution using .csv
Change the .text function call to .csv and you should be fine:
df = spark.read.option("header", "true") \
.option("delimiter", ",") \
.option("inferSchema", "true") \
.csv("StockData/ETFs/aadr.us.txt")
df.show(2, truncate=False)
which should give you
+-------------------+------+------+------+------+------+-------+
|Date               |Open  |High  |Low   |Close |Volume|OpenInt|
+-------------------+------+------+------+------+------+-------+
|2010-07-21 00:00:00|24.333|24.333|23.946|23.946|43321 |0      |
|2010-07-22 00:00:00|24.644|24.644|24.362|24.487|18031 |0      |
+-------------------+------+------+------+------+------+-------+
Solution using .load
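Without an explicit format, .load falls back to the default data source, which is Parquet unless spark.sql.sources.default has been changed. So you need to specify the format as well (on Spark 2.0 and later, the short name "csv" also works in place of com.databricks.spark.csv):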
df = spark.read\
.format("com.databricks.spark.csv")\
.option("header", "true") \
.option("delimiter", ",") \
.option("inferSchema", "true") \
.load("StockData/ETFs/aadr.us.txt")
df.show(2, truncate=False)
I hope the answer is helpful