 

Is there a way to add extra metadata for Spark dataframes?

Is it possible to add extra metadata to DataFrames?

Reason

I have Spark DataFrames for which I need to keep extra information. Example: A DataFrame, for which I want to "remember" the highest used index in an Integer id column.

Current solution

I use a separate DataFrame to store this information. Of course, keeping this information separately is tedious and error-prone.
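
Roughly like this, as a sketch (df and its Integer id column stand in for my real data):

import org.apache.spark.sql.functions.max

// The "real" DataFrame plus a second, single-row DataFrame whose only job is
// to remember the highest id seen so far; the two must be kept in sync by hand.
val maxIdDF = df.agg(max("id").as("maxId"))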

Is there a better solution to store such extra information on DataFrames?

asked Sep 17 '15 by Martin Senne



2 Answers

To expand and Scala-fy nealmcb's answer (the question was tagged scala, not python, so I don't think this answer will be off-topic or redundant), suppose you have a DataFrame:

import org.apache.spark.sql
// in spark-shell, where sc and the sqlContext implicits needed for toDF are already in scope
val df = sc.parallelize(Seq.fill(100) { scala.util.Random.nextInt() }).toDF("randInt")

And some way to get the max or whatever you want to memoize on the DataFrame:

val randIntMax = df.rdd.map { case sql.Row(randInt: Int) => randInt }.reduce(math.max)

sql.types.Metadata can only hold strings, booleans, longs, doubles, other Metadata structures, and arrays of those. So we have to use a Long:

val metadata = new sql.types.MetadataBuilder().putLong("columnMax", randIntMax).build()

DataFrame.withColumn() actually has an overload that permits supplying a metadata argument at the end, but it's inexplicably marked [private], so we just do what it does — use Column.as(alias, metadata):

val newColumn = df.col("randInt").as("randInt_withMax", metadata)
val dfWithMax = df.withColumn("randInt_withMax", newColumn)

dfWithMax now has (a column with) the metadata you want!

dfWithMax.schema.foreach(field => println(s"${field.name}: metadata=${field.metadata}"))
> randInt: metadata={}
> randInt_withMax: metadata={"columnMax":2094414111}

Or programmatically and type-safely (sort of; Metadata.getLong() and others do not return Option and may throw a "key not found" exception):

dfWithMax.schema("randInt_withMax").metadata.getLong("columnMax")
> res29: Long = 2094414111

Attaching the max to a column makes sense in your case, but in the general case of attaching metadata to a DataFrame and not a column in particular, it appears you'd have to take the wrapper route described by the other answers.
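
A bare-bones sketch of what such a wrapper could look like (DataFrameWithMeta is an invented name, not a Spark API), reusing df and randIntMax from above:

import org.apache.spark.sql.DataFrame

// Hypothetical wrapper: carry the DataFrame and an arbitrary metadata map side
// by side; every transformation has to go through it (or rebuild it) so the
// metadata is not lost along the way.
case class DataFrameWithMeta(df: DataFrame, meta: Map[String, Any]) {
  def transform(f: DataFrame => DataFrame): DataFrameWithMeta = copy(df = f(df))
}

val wrapped = DataFrameWithMeta(df, Map("columnMax" -> randIntMax))
val positiveOnly = wrapped.transform(_.filter(df("randInt") > 0))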

answered Sep 22 '22 by chbrown


As of Spark 1.2, StructType schemas have a metadata attribute which can hold an arbitrary mapping/dictionary of information for each column in a DataFrame, e.g. (when used with the separate spark-csv library):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

customSchema = StructType([
  StructField("cat_id", IntegerType(), True,
    {'description': "Unique id, primary key"}),
  StructField("cat_title", StringType(), True,
    {'description': "Name of the category, with underscores"}) ])

categoryDumpDF = (sqlContext.read.format('com.databricks.spark.csv')
 .options(header='false')
 .load(csvFilename, schema = customSchema) )

f = categoryDumpDF.schema.fields
["%s (%s): %s" % (t.name, t.dataType, t.metadata) for t in f]

["cat_id (IntegerType): {u'description': u'Unique id, primary key'}",
 "cat_title (StringType): {u'description': u'Name of the category, with underscores.'}"]

This was added in [SPARK-3569] Add metadata field to StructField - ASF JIRA, and was designed for use in Machine Learning pipelines to track information about the features stored in columns, such as whether a feature is categorical or continuous, the number of categories, and the category-to-index map. See the SPARK-3569: Add metadata field to StructField design document.

I'd like to see this used more widely, e.g. for descriptions and documentation of columns, the unit of measurement used in the column, coordinate axis information, etc.

Issues include how to appropriately preserve or manipulate the metadata information when the column is transformed, how to handle multiple sorts of metadata, how to make it all extensible, etc.
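
As a small illustration of the first point (a Scala sketch, since the question is tagged scala; the df, the "price" column, and its unit metadata are all invented for the example), a column derived from one that carries metadata comes back with empty metadata, so the metadata has to be re-attached by hand:

import org.apache.spark.sql.functions.round
import org.apache.spark.sql.types.MetadataBuilder

// Assumed: df is some DataFrame with a numeric "price" column.
val priceMeta = new MetadataBuilder().putString("unit", "EUR").build()
val tagged = df.withColumn("price", df("price").as("price", priceMeta))

// round() produces a fresh column with empty metadata, so the unit is copied
// over explicitly via Column.as if it should survive the transformation.
val rounded = tagged.withColumn("price_rounded",
  round(tagged("price"), 2).as("price_rounded", tagged.schema("price").metadata))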

For the benefit of those thinking of expanding this functionality in Spark DataFrames, here are some analogous discussions around Pandas.

For example, see xray - bring the labeled data power of pandas to the physical sciences, which supports metadata for labeled arrays.

And see the discussion of metadata for Pandas at Allow custom metadata to be attached to panel/df/series? · Issue #2485 · pydata/pandas.

See also discussion related to units: ENH: unit of measurement / physical quantities · Issue #10349 · pydata/pandas

answered Sep 22 '22 by nealmcb