Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implicit schema discovery on a JSON-formatted Spark DataFrame column

I'm writing an ETL Spark (2.4) job in Scala reading ;-separated CSV files with a glob pattern on S3. The data is loaded in a DataFrame and contains a column (let's say it is named custom) with a JSON-formatted string (multiple levels of nesting). The goal is to automatically infer the schema from that column so that it can be structured for a write sink on Parquet files back in S3.

This post (How to query JSON data column using Spark DataFrames?) suggests schema_of_json from Spark 2.4 can infer the schema from a JSON-formatted column or string.

Here is what I tried:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

But the above doesn't work and raise this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

Keep in mind I'm filtering out null values on custom for this DataFrame.


EDIT: whole code below.

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
  * RandomName entry point.
  *
  * @author Random author
  */
object RandomName {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("RandomName")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
      .getOrCreate

    import spark.implicits._

    val randomName: RandomName = new RandomName(spark)

    val df: sql.DataFrame  = randomName.read().filter($"custom".isNotNull)
    val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

    df.withColumn(
      "nestedCustom",
      from_json(col("custom"), jsonSchema, Map[String, String]())
    )

    df.show

    spark.stop
  }
}

class RandomName(private val spark: SparkSession) {

  /**
    * Reads CSV files from S3 and creates a sql.DataFrame.
    *
    * @return a sql.DataFrame
    */
  def read(): sql.DataFrame = {
    val tableSchema = StructType(
      Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true),
        StructField("c", DateType, true),
        StructField("custom", StringType, true)
      ))

    spark.read
      .format("csv")
      .option("sep", ";")
      .option("header", "true")
      .option("inferSchema", "true")
      .schema(tableSchema)
      .load("s3://random-bucket/*")
  }
}

And an example of a JSON:

{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "ramdom_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}
like image 456
ngc2359 Avatar asked Feb 14 '19 10:02

ngc2359


People also ask

How do I infer schema from JSON file?

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset<Row> . This conversion can be done using SparkSession. read(). json() on either a Dataset<String> , or a JSON file.

How do I see the structure schema of the DataFrame in Spark SQL?

To get the schema of the Spark DataFrame, use printSchema() on Spark DataFrame object. From the above example, printSchema() prints the schema to console( stdout ) and show() displays the content of the Spark DataFrame.

How do I create a JSON schema in Spark?

Spark Read JSON with schemaUse the StructType class to create a custom schema, below we initiate this class and use add a method to add columns to it by providing the column name, data type and nullable option. The above example ignores the default schema and uses the custom schema while reading a JSON file.


1 Answers

That's an indicator that custom is not a valid input for schema_of_json

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

You should go back to your data and make sure that custom is indeed a String.

like image 166
user11061726 Avatar answered Nov 14 '22 05:11

user11061726