Spark 2.0.x dump a csv file from a dataframe containing one array of type string

I have a dataframe df that contains one column of type array

df.show() looks like

+--+-------------+---+------+
|ID|ArrayOfString|Age|Gender|
+--+-------------+---+------+
|1 | [A,B,D]     |22 | F    |
|2 | [A,Y]       |42 | M    |
|3 | [X]         |60 | F    |
+--+-------------+---+------+

I try to dump that df to a CSV file as follows:

val dumpCSV = df.write.csv(path="/home/me/saveDF") 

It is not working because of the column ArrayOfString. I get the error:

CSV data source does not support array&lt;string&gt; data type

The code works if I remove the column ArrayOfString. But I need to keep ArrayOfString!

What would be the best way to dump the DataFrame to CSV including the column ArrayOfString (ArrayOfString should be written as a single column in the CSV file)?

asked Nov 04 '16 by S12000

People also ask

Does CSV support array?

Not directly. Trying to write an array column raises an error such as: pyspark.sql.utils.AnalysisException: CSV data source does not support array&lt;bigint&gt; data type.

How do I convert a Spark DataFrame to a CSV file?

In Spark/PySpark, you can save (write/extract) a DataFrame to a CSV file on disk by using dataframeObj.write.csv("path"); this also lets you write a DataFrame to AWS S3, Azure Blob Storage, HDFS, or any Spark-supported file system.
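
For instance, a minimal Scala sketch of such a write, with a header row (the output path here is illustrative):

// Write the DataFrame as CSV with a header row; any Spark-supported
// URI (s3a://, hdfs://, a local path) works the same way.
df.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/tmp/people_csv")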

What is inferSchema?

inferSchema -> With inferSchema, Spark automatically guesses the data type of each field. If we set this option to true, the API reads some sample records from the file to infer the schema; if we set it to false, we must specify a schema explicitly.
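
For example, a hedged sketch of a read with schema inference (the path and session setup are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-read").getOrCreate()

// With inferSchema, Spark samples the file to guess each column's type;
// without it, every column comes back as string unless a schema is given.
val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/people_csv")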

What is withColumn Pyspark?

PySpark withColumn() is a transformation function of DataFrame that is used to change a column's value, convert the data type of an existing column, create a new column, and more.
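
A minimal sketch of the same idea, shown here in Scala to match the rest of this thread (withColumn has the same shape in PySpark); the column names are illustrative:

import org.apache.spark.sql.functions._

// Cast an existing column in place, then derive a new column from it.
val df2 = df
  .withColumn("Age", col("Age").cast("int"))
  .withColumn("IsAdult", col("Age") >= 18)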


2 Answers

The reason you are getting this error is that the CSV file format doesn't support array types; you'll need to express the column as a string to be able to save it.

Try the following:

import org.apache.spark.sql.functions._

// Render the array as a single "[A,B,D]"-style string; keep nulls as nulls.
val stringify = udf((vs: Seq[String]) => vs match {
  case null => null
  case _    => s"""[${vs.mkString(",")}]"""
})

df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)

or

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{concat, concat_ws, lit}

// Same "[A,B,D]" encoding, but with built-in functions instead of a UDF.
def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]"))

df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)
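
If the arrays are needed again later, one hedged sketch is to reverse the encoding on read: strip the brackets and split on the comma. This assumes the CSV was written with a header and that no element contains a comma or bracket:

import org.apache.spark.sql.functions.{col, regexp_replace, split}

// Rebuild the array column from its "[A,B,D]" string form.
val restored = spark.read.option("header", "true").csv("/home/me/saveDF")
  .withColumn("ArrayOfString",
    split(regexp_replace(col("ArrayOfString"), "^\\[|\\]$", ""), ","))

Of the two variants above, the second (concat_ws) stays inside Spark's built-in functions, so Catalyst can optimize it, whereas a UDF is a black box to the optimizer.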
answered Sep 19 '22 by 4 revs, 3 users 60%

PySpark implementation.

In this example, convert the array field column_as_array to a string field column_as_str before saving.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'

array_to_string_udf = udf(array_to_string, StringType())

df = df.withColumn('column_as_str', array_to_string_udf(df["column_as_array"]))

Then you can drop the old column (array type) before saving.

df.drop("column_as_array").write.csv(...) 
answered Sep 21 '22 by plfrick