PySpark: How to read a CSV into a DataFrame and manipulate it

I'm quite new to PySpark and am trying to use it to process a large dataset saved as a CSV file. I'd like to read the CSV file into a Spark DataFrame, drop some columns, and add new columns. How should I do that?

I am having trouble getting this data into a DataFrame. This is a stripped-down version of what I have so far:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def make_dataframe(data_portion, schema, sql):
    # build a one-row DataFrame from a single CSV line
    fields = data_portion.split(",")
    return sql.createDataFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)

    ...

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql)) \
                    .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()

This produces the error TypeError: 'JavaPackage' object is not callable at the reduce step.

Is it possible to do this? The idea behind reducing to a single DataFrame is to be able to write the resulting data to a database (Redshift, using the spark-redshift package).

I have also tried using unionAll(), and map() with partial(), but can't get it to work.

I am running this on Amazon EMR with spark-redshift_2.10:2.0.0 and Amazon's JDBC driver RedshiftJDBC41-1.1.17.1017.jar.

Asked Oct 30 '16 by Tim B

1 Answer

Update - also answering your question from the comments:

Read data from CSV into a DataFrame: it seems that you are really just trying to read a CSV file into a Spark DataFrame.

If so, my answer here covers this: https://stackoverflow.com/a/37640154/5088142

The following code should read the CSV into a Spark DataFrame:

import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sql = SQLContext(sc)

df = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("/path/to_csv.csv"))

# In Spark 2.0, the following lines are equivalent, using SparkSession:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
spark.read.option("header", "true").csv("/path/to_csv.csv")

drop column

You can drop a column using drop(col): https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

drop(col)

Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.
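
The doctest-style examples below assume the sample data from the PySpark docs; a minimal setup to reproduce them (an assumption, not part of the original answer):

from pyspark.sql import Row

# df has columns (age, name); df2 has columns (name, height), as in the docs
df = sql.createDataFrame([Row(age=2, name='Alice'), Row(age=5, name='Bob')])
df2 = sql.createDataFrame([Row(name='Tom', height=80), Row(name='Bob', height=85)])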

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
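
Since you want to drop several columns, note that in Spark 1.6 drop takes a single column per call, so you chain calls; a small sketch with hypothetical names:

# raw_df and the column names are hypothetical
trimmed = raw_df.drop('unwanted_1').drop('unwanted_2')

In Spark 2.x, drop also accepts several column names in one call, e.g. raw_df.drop('unwanted_1', 'unwanted_2').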

add column

You can use withColumn: https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn(colName, col)

Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters: 

    colName – string, name of the new column.
    col – a Column expression for the new column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
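
Since col must be a Column expression, adding a constant-valued column requires pyspark.sql.functions.lit; a short example (the column name and value are hypothetical):

from pyspark.sql.functions import lit

# add a column holding the same constant in every row
df_with_source = df.withColumn('source', lit('csv_import'))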

Note: Spark has many other functions which can be used (e.g. you can use select instead of drop, as sketched below).
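
For example, selecting only the columns you want to keep is equivalent to dropping the rest; with the sample df from above:

>>> df.select('name').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]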

Answered Sep 30 '22 by Yaron