
Spark Structured Streaming: StructField(..., ..., False) always returns `nullable=true` instead of `nullable=false`

I'm using Spark Structured Streaming (3.2.1) with Kafka.

I'm trying to simply read JSON from Kafka using a defined schema.

My problem is that the schema I define contains a non-nullable field, and that constraint is ignored when I read messages from Kafka. I use the from_json function, which seems to ignore the fact that some fields can't be null.

Here is my code example:

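import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schemaTest = new StructType()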
  .add("firstName", StringType)
  .add("lastName", StringType)
  .add("birthDate", LongType, nullable = false)

val loader =
    spark
    .readStream
    .format("kafka")
    .option("startingOffsets", "earliest")
    .option("kafka.bootstrap.servers", "BROKER:PORT")
    .option("subscribe", "TOPIC")
    .load()

val df = loader
    .selectExpr("CAST(value AS STRING)")
    .withColumn("value", from_json(col("value"), schemaTest))
    .select(col("value.*"))

df.printSchema()

val q = df.writeStream
  .format("console")
  .option("truncate","false")
  .start()

q.awaitTermination()

This is what I get when I print the schema of df; it differs from my schemaTest:

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- birthDate: long (nullable = true)

And the received data looks like this:

+---------+--------+----------+
|firstName|lastName|birthDate |
+---------+--------+----------+
|Toto     |Titi    |1643799912|
+---------+--------+----------+
|Tutu     |Tata    |null      |
+---------+--------+----------+

We also tried changing the mode option of from_json from the default PERMISSIVE to the others (DROPMALFORMED, FAILFAST), but the second record, which doesn't respect the defined schema, is simply not considered corrupted because the field birthDate is treated as nullable.

Maybe I missed something, but if not, I have the following questions.

Do you know why printSchema on df doesn't match my schemaTest (with the non-nullable field)?

Also, how can I enforce non-nullable values in my case? I know that I can filter, but I would like to know whether there is an alternative that uses the schema the way it's supposed to work. Filtering also isn't simple when the schema has many non-nullable fields.

MaxP asked Jun 09 '26 15:06


1 Answer

This is actually the intended behavior of the from_json function. The Spark source code for the expression behind it (JsonToStructs) contains the following:

// The JSON input data might be missing certain fields. We force the nullability
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
// can generate incorrect files if values are missing in columns declared as non-nullable.
val nullableSchema = schema.asNullable

override def nullable: Boolean = true
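The effect can be reproduced without Kafka. The following is a minimal batch sketch, assuming a local SparkSession and two hand-written JSON strings standing in for the Kafka values; the object name FromJsonNullability and the parse helper are made up for illustration and are not part of Spark or of the question's code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical demo object, for illustration only.
object FromJsonNullability {
  // Same schema as in the question: birthDate is declared non-nullable.
  val schemaTest: StructType = new StructType()
    .add("firstName", StringType)
    .add("lastName", StringType)
    .add("birthDate", LongType, nullable = false)

  // Parses two sample JSON strings (assumed data) with from_json.
  def parse(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      """{"firstName":"Toto","lastName":"Titi","birthDate":1643799912}""",
      """{"firstName":"Tutu","lastName":"Tata"}""" // birthDate is missing
    ).toDF("value")
      .withColumn("value", from_json(col("value"), schemaTest))
      .select(col("value.*"))
  }

  def main(args: Array[String]): Unit = {
    // Local session assumed for the demo; a real job would use its own session.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from-json-nullability")
      .getOrCreate()

    val parsed = parse(spark)

    // Nullability as declared by the user.
    println(s"declared: ${schemaTest("birthDate").nullable}")
    // Nullability as reported after from_json and the star expansion.
    println(s"parsed:   ${parsed.schema("birthDate").nullable}")

    parsed.printSchema()
    spark.stop()
  }
}
```

Comparing the two printed flags shows where the declared constraint is lost: the user-provided StructType still says nullable = false, while the parsed DataFrame reports the relaxed schema seen in the question's printSchema output.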

If several fields are mandatory, you can build the filter expression from your schemaTest (or from a list of column names) and apply it after parsing:

val filterExpr = schemaTest.fields
  .filter(!_.nullable)
  .map(f => col(f.name).isNotNull)
  .reduce(_ and _)

val df = loader
  .selectExpr("CAST(value AS STRING)")
  .withColumn("value", from_json(col("value"), schemaTest))
  .select(col("value.*"))
  .filter(filterExpr) 
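One caveat with the expression above: reduce throws on an empty collection, so it fails if the schema happens to declare no non-nullable fields. A possible variant, sketched here under the assumption that only top-level fields need checking and that field names contain no dots or other characters that col would interpret specially, folds from lit(true) instead. The names RequiredFields and requiredNotNull are hypothetical helpers, not Spark APIs.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Hypothetical helper object, for illustration only.
object RequiredFields {
  // ANDs together an isNotNull check for every top-level field that the
  // schema declares as non-nullable. Nested struct fields are not inspected.
  // Starting the fold from lit(true) means a schema without any
  // non-nullable field yields a filter that keeps every row.
  def requiredNotNull(schema: StructType): Column =
    schema.fields
      .filter(!_.nullable)
      .map(f => col(f.name).isNotNull)
      .foldLeft(lit(true))(_ && _)
}
```

It would be used the same way as filterExpr, for example .filter(RequiredFields.requiredNotNull(schemaTest)) after the select. Note that the filtered DataFrame still reports the columns as nullable in printSchema; the filter only guarantees that no null values remain in those columns.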
blackbishop answered Jun 12 '26 04:06


