
Spark Dataframe validating column names for parquet writes

I'm processing events using DataFrames converted from a stream of JSON events, which eventually get written out in Parquet format.

However, some of the JSON events contain spaces in their keys. I want to log such events and filter/drop them from the DataFrame before converting it to Parquet, because space and ,;{}()\n\t= are considered special characters in the Parquet schema (CatalystSchemaConverter, listed in [1] below) and are therefore not allowed in column names.

How can I perform such validation on the DataFrame's column names and drop offending events altogether, without failing the Spark Streaming job?

[1] Spark's CatalystSchemaConverter

def checkFieldName(name: String): Unit = {
  // ,;{}()\n\t= and space are special characters in Parquet schema
  checkConversionRequirement(
    !name.matches(".*[ ,;{}()\n\t=].*"),
    s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
             |Please use alias to rename it.
           """.stripMargin.split("\n").mkString(" ").trim
  )
}
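
For illustration, a minimal PySpark sketch of applying the same character check to a DataFrame's column names before writing; the DataFrame df, the print-based logging, and the output path are assumed/hypothetical:

import re

# Same special-character set checked by CatalystSchemaConverter
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")

def invalid_columns(df):
    """Return the column names that Parquet would reject."""
    return [c for c in df.columns if INVALID_CHARS.search(c)]

bad = invalid_columns(df)            # df is the current micro-batch DataFrame
if bad:
    # Log and skip this batch instead of letting the write fail the job
    print("Dropping batch with invalid column names: %s" % bad)
else:
    df.write.mode("append").parquet("/tmp/output")  # hypothetical output path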
asked Jul 04 '16 by codehammer

People also ask

How do I display DataFrame column names in Spark?

You can get all the columns of a Spark DataFrame by using df.columns; it returns the column names as an Array[String].
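
In PySpark, for instance, df.columns returns a plain Python list (df is assumed to already exist):

# Inspect the column names of an existing DataFrame
print(df.columns)   # e.g. ['name', 'age']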

Are Parquet column names case-sensitive?

Although Spark SQL itself is not case-sensitive, Hive-compatible file formats such as Parquet are. Spark SQL must use a case-preserving schema when querying any table backed by files containing case-sensitive field names, or queries may not return accurate results.
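
As an illustration, Spark SQL's name matching can be made case-sensitive via the spark.sql.caseSensitive setting (shown here only as a sketch; the default is false):

# Make Spark SQL match column names case-sensitively
spark.conf.set("spark.sql.caseSensitive", "true")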

How do I create a Spark DataFrame with column names?

To do this, first create a list of data and a list of column names, then pass the zipped data and the column names to the spark.createDataFrame() method, which creates the DataFrame.
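
A minimal PySpark sketch of that approach (the data and column names below are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

names = ["alice", "bob"]
ages = [34, 29]

# Zip the data lists into rows and pass the column names alongside them
df = spark.createDataFrame(list(zip(names, ages)), ["name", "age"])
df.show()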


2 Answers

For everyone hitting this in pyspark: it even happened to me after renaming the columns. One way I got this to work, after some iterations, is the following:

file = "/opt/myfile.parquet"
df = spark.read.parquet(file)

# Strip spaces from every column name
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", ""))

# Re-read the file, forcing the sanitized schema onto the data
df = spark.read.schema(df.schema).parquet(file)

answered Sep 22 '22 by Jan C. Schäfer


You can use a regex to replace all invalid characters with an underscore before you write to Parquet, and strip accents from the column names as well.

Here is a normalize function that does this, in both Scala and Python:

Scala

/**
  * Normalize column name by replacing invalid characters with underscore
  * and strips accents
  *
  * @param columns dataframe column names list
  * @return the list of normalized column names
  */
def normalize(columns: Seq[String]): Seq[String] = {
  columns.map { c =>
    org.apache.commons.lang3.StringUtils.stripAccents(c.replaceAll("[ ,;{}()\n\t=]+", "_"))
  }
}

// using the function
val df2 = df.toDF(normalize(df.columns):_*)

Python

import unicodedata
import re

def normalize(column: str) -> str:
    """
    Normalize a column name by replacing invalid characters with an underscore,
    stripping accents and lowercasing
    :param column: column name
    :return: normalized column name
    """
    n = re.sub(r"[ ,;{}()\n\t=]+", '_', column.lower())
    return unicodedata.normalize('NFKD', n).encode('ASCII', 'ignore').decode()


# using the function
df = df.toDF(*map(normalize, df.columns))
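
For example, with an invented column name (the result shown is what the function above produces):

# Hypothetical column name, just to show the effect
normalize("Événement Date;(UTC)")   # returns 'evenement_date_utc_'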

answered Sep 24 '22 by blackbishop