Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Load CSV file with Spark

People also ask

How do I read a CSV file in Spark Databricks?

Apache PySpark provides the "csv("path")" for reading a CSV file into the Spark DataFrame and the "dataframeObj. write. csv("path")" for saving or writing to the CSV file. The Apache PySpark supports reading the pipe, comma, tab, and other delimiters/separator files.

How do I read a CSV file in HDFS Spark?

Use textFile() and wholeTextFiles() method of the SparkContext to read files from any file system and to read from HDFS, you need to provide the hdfs path as an argument to the function. If you wanted to read a text file from an HDFS into DataFrame.


Spark 2.0.0+

You can use built-in csv data source directly:

spark.read.csv(
    "some_input_file.csv", 
    header=True, 
    mode="DROPMALFORMED", 
    schema=schema
)

or

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

without including any external dependencies.

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in a general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path)

And load your data as follows:

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns - integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

Are you sure that all the lines have at least 2 columns? Can you try something like, just to check?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternatively, you could print the culprit (if any):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")

print(df.collect())

And yet another option which consist in reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.

For example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)