 

Deserializing Event Hub messages in Azure Databricks

I have an Azure Databricks script in Python that reads JSON messages from Event Hub using Structured Streaming, processes the messages and saves the results in Data Lake Store. The messages are sent to the Event Hub from an Azure Logic App that reads tweets from the Twitter API.

I am trying to deserialize the body of the Event Hub message in order to process its contents. The message body is first cast from binary to a string and then deserialized to a struct type with the from_json function, as explained in this article: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
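In isolation, the two-step conversion described in that article looks roughly like this (a minimal sketch; df stands in for the streaming DataFrame and schema for the struct type defined further below):

from pyspark.sql.functions import col, from_json

# The Event Hub 'body' column arrives as binary, so it is first cast
# to a string and then parsed into a struct against a known schema.
parsed = df.select(from_json(col('body').cast('string'), schema).alias('body'))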

Here is a code example (with obfuscated parameters):

from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType

EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'

event_hub_conf = {
    'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}

stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()

schema = StructType() \
    .add('FetchTimestampUtc', DateType()) \
    .add('Username', StringType()) \
    .add('Name', StringType()) \
    .add('TweetedBy', StringType()) \
    .add('Location', StringType()) \
    .add('TweetText', StringType())

stream_data_body = stream_data \
    .select(stream_data.body) \
    .select(from_json('body', schema).alias('body')) \
    .select(to_json('body').alias('body'))

# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body)

stream_data_body \
    .writeStream \
    .outputMode('append') \
    .format('json') \
    .option('path', OUTPUT_DIR) \
    .option('checkpointLocation', CHECKPOINT_DIR) \
    .start() \
    .awaitTermination()

Here I am not actually doing any processing yet, just a trivial deserialization/serialization.

The above script does produce output to Data Lake, but the result JSON objects are empty. Here is an example of the output:

{}
{}
{}

The commented-out line in the script does produce output, but only the raw string value, since it does no deserialization:

{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}

I was wondering whether the backslashes should be doubled, as in the example given in the link above. This might be doable with the options parameter of the from_json function, which is documented only as: "options to control parsing. accepts the same options as the json datasource." But I have not found documentation for the options format.
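For what it's worth, from_json does take an options dictionary as its third argument, using the same keys as the JSON datasource. A sketch (the timestampFormat value below is just an illustration, not taken from the original script):

from pyspark.sql.functions import from_json

# The options dict uses the same keys as the JSON datasource,
# e.g. a custom timestamp format (the value here is illustrative).
parsed = stream_data.select(
    from_json(stream_data.body.cast('string'), schema,
              {'timestampFormat': "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"}).alias('body'))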

Any ideas why the deserialization/serialization is not working?

asked Oct 17 '22 by Lazer
1 Answer

It appears that the input JSON must have a specific syntax. The field values must be strings; unquoted timestamps are not allowed (and perhaps the same goes for integers, floats, etc.). Any type conversion must then be done inside the Databricks script.
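For example, if a timestamp arrives as a plain string, the conversion can be done after parsing. A sketch, assuming FetchTimestampUtc is declared as StringType in the schema and parsed stands for the DataFrame produced by from_json (the format string is illustrative):

from pyspark.sql.functions import col, to_timestamp

# Convert the string field to a proper timestamp inside the script.
converted = parsed.select(
    to_timestamp(col('body.FetchTimestampUtc'),
                 "yyyy-MM-dd'T'HH:mm:ss.SSSX").alias('FetchTimestampUtc'))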

I changed the input JSON so that the timestamp value is quoted. In the schema I also changed DateType to TimestampType, which is more appropriate here; changing it to StringType is not necessary.
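The revised schema then reads:

from pyspark.sql.types import StringType, StructType, TimestampType

schema = StructType() \
    .add('FetchTimestampUtc', TimestampType()) \
    .add('Username', StringType()) \
    .add('Name', StringType()) \
    .add('TweetedBy', StringType()) \
    .add('Location', StringType()) \
    .add('TweetText', StringType())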

By using the following select expression:

# Cast the binary 'body' column to a string before parsing it with from_json.
stream_data_body = stream_data \
    .select(from_json(stream_data.body.cast('string'), schema).alias('body')) \
    .select(to_json('body').alias('body'))

the following output is produced in the output file:

{"body":"{\"FetchTimestampUtc\":\"2018-11-29T21:26:40.039Z\",\"Username\":\"xyz\",\"Name\":\"x\",\"TweetedBy\":\"xyz\",\"Location\":\"\",\"TweetText\":\"RT @z123: I just want to say thanks to everyone who interacts with me, whether they talk or they just silently rt or like, thats okay.…\"}"}

which is more or less the expected result, although the timestamp value is output as a string. In fact, the whole body object is output as a string.
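If the individual fields are wanted with their native types rather than one JSON string, a common alternative (not part of the original experiment) is to expand the struct instead of calling to_json:

# Expand the parsed struct into top-level columns, keeping native types
# (e.g. FetchTimestampUtc stays a timestamp instead of becoming a string).
stream_data_fields = stream_data \
    .select(from_json(stream_data.body.cast('string'), schema).alias('body')) \
    .select('body.*')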

I did not manage to get the parsing working when the input is JSON with unquoted ("native") field values; the output of from_json is always null in that case.

EDIT: This seems to have been confusion on my part. Date values should always be quoted in JSON; they are not "native" types.

I have tested that integer and float values can be passed without quotes, so it is possible to do calculations with them.
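As a hypothetical illustration (the RetweetCount field is made up; it is not in the original payload): with an unquoted number in the JSON and a numeric type in the schema, calculations work directly on the parsed column:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StructType

# A payload like {"RetweetCount": 3} parses into a numeric column
# that can be used in arithmetic without further conversion.
num_schema = StructType().add('RetweetCount', IntegerType())
doubled = stream_data \
    .select(from_json(stream_data.body.cast('string'), num_schema).alias('body')) \
    .select((col('body.RetweetCount') * 2).alias('DoubleRetweets'))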

answered Oct 30 '22 by Lazer