I'm reading a stream from Kafka and converting the Kafka value (which is JSON) into a struct. from_json has a variant that takes the schema as a String, but I could not find an example of it. Please advise what is wrong in the code below.
Error
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',
== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING ) ) )
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
Program
public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";
    SparkSession sparkSession = SparkSession
        .builder().appName(EmployeeSchemaLoader.class.getName())
        .master(master).getOrCreate();
    String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
        "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) ";
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    Dataset<Row> employeeDataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics).load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
        functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));
    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");
    sparkSession.catalog().listTables().show();
    sqlCtx.sql("select * from employeeView").show();
}
Your question helped me discover that the variant of from_json with a String-based schema was available only in Java and has only recently been added to the Spark API for Scala, in the upcoming 2.3.0. I had long believed that the Spark API for Scala was always the most feature-rich, and your question taught me that this was not the case before the change in 2.3.0 (!)
Back to your question: you can actually define the string-based schema in either JSON or DDL format.
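For the DDL route, here is a minimal sketch. It assumes Spark 2.3 or later (where StructType.fromDDL is available), and the object name DdlSchemaSketch is made up for illustration. Note that nested types take angle brackets, not the parentheses used in the question, which is what the parser trips over.

```scala
import org.apache.spark.sql.types.StructType
import scala.util.Try

// Hypothetical object name; assumes Spark 2.3+ on the classpath.
object DdlSchemaSketch {
  // DDL form: top-level fields are "name TYPE", nested struct fields are "name: TYPE",
  // and nested types are wrapped in angle brackets.
  val ddl: String =
    "firstName STRING, lastName STRING, email STRING, " +
      "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"

  // The schema string from the question, which uses parentheses instead.
  val parenthesized: String =
    "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
      "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) "

  def main(args: Array[String]): Unit = {
    // Parse the DDL string into a StructType and print it as a tree.
    val schema: StructType = StructType.fromDDL(ddl)
    schema.printTreeString()

    // Parsing the parenthesized string fails; Try captures the exception
    // so we can report the failure instead of crashing.
    println(Try(StructType.fromDDL(parenthesized)).isFailure)
  }
}
```

The same ddl string can be passed directly as the schema argument of from_json in place of a JSON-encoded schema.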
Writing JSON by hand can be cumbersome, so I'd take a different approach (which, given that I'm a Scala developer, is fairly easy).
Let's first define the schema using the Spark API for Scala.
import org.apache.spark.sql.types._
// The $"..." syntax comes from spark.implicits._, which spark-shell imports automatically.
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)
That seems to match your schema, doesn't it?
With that, converting the schema to a JSON-encoded string is a breeze with the json method.
val schemaAsJson = schema.json
schemaAsJson is exactly your JSON string, which looks pretty...hmmm...complex. For display purposes I'd rather use the prettyJson method.
scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}
That's your schema in JSON.
You can use DataType to "validate" the JSON string (via DataType.fromJson, which Spark uses under the covers for from_json).
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
All seems fine. Mind if I check this out with a sample dataset?
val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "[email protected]",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses") // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName| email| city|state| zip|
+---------+---------+---------------+------+-----+------+
| Jacek|Laskowski|[email protected]|Warsaw| N/A|02-791|
+---------+---------+---------------+------+-----+------+
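The same pipeline should also work with a DDL string in place of the JSON-encoded schema. What follows is only a sketch, assuming Spark 2.3 or later (for the Scala from_json variant that takes a String schema); the object name, the flatten helper and the sample record are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}

// Hypothetical object and helper names; assumes Spark 2.3+ on the classpath.
object FromJsonWithDdlSketch {
  // DDL-formatted schema: angle brackets for nested types.
  val ddl: String =
    "firstName STRING, lastName STRING, email STRING, " +
      "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"

  // Parses one raw JSON document and flattens it to one row per address.
  def flatten(spark: SparkSession, rawJson: String): DataFrame = {
    import spark.implicits._
    Seq(rawJson).toDF("rawjson")
      .select(from_json(col("rawjson"), ddl, Map.empty[String, String]) as "json")
      .select("json.*")                                 // flatten the top-level struct
      .withColumn("address", explode(col("addresses"))) // one row per array element
      .drop("addresses")
      .select("firstName", "lastName", "email", "address.*")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FromJsonWithDdlSketch")
      .master("local[*]")
      .getOrCreate()

    // Made-up sample record, not real data.
    val sample =
      """{"firstName":"Jane","lastName":"Doe","email":"jane@example.com",""" +
        """"addresses":[{"city":"Springfield","state":"N/A","zip":"00000"}]}"""

    flatten(spark, sample).show()
    spark.stop()
  }
}
```

In the Java program from the question, the equivalent change would be to pass a string in this DDL form as employeeSchema.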