I am writing records to a Kinesis Firehose stream, which Amazon Kinesis Firehose eventually writes to an S3 file.
My record object looks like:
ItemPurchase { String personId, String itemId }
The data written to S3 looks like:
{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}
There is NO COMMA SEPARATION between records, NO STARTING BRACKET ([) and NO ENDING BRACKET (]) as there would be in a JSON array.
I want to read this data and get a list of ItemPurchase objects.
List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))
What is the correct way to read this data?
It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.
Ultimately, the trick I found to deal with the problem was to process the text file using the raw_decode method of Python's json.JSONDecoder.
This will allow you to read a bunch of concatenated JSON records without any delimiters between them.
Python code:
    import json

    decoder = json.JSONDecoder()

    with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
        content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            # raw_decode returns the parsed object and the index just past it
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except json.JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1
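To get closer to what the question asks for (a list of ItemPurchase objects rather than printed dicts), the same raw_decode idea can be wrapped in a small function. This is only a sketch: the ItemPurchase dataclass mirrors the record shape from the question, while iter_json_objects and get_purchases are hypothetical helper names chosen for illustration. Unlike the loop above, it skips whitespace between records explicitly and lets a JSONDecodeError propagate on malformed input instead of scanning forward.

```python
import json
from dataclasses import dataclass
from typing import Iterator, List


@dataclass
class ItemPurchase:
    # Mirrors the record shape from the question
    personId: str
    itemId: str


def iter_json_objects(text: str) -> Iterator[dict]:
    """Yield each JSON value from a string of back-to-back JSON documents."""
    decoder = json.JSONDecoder()
    index = 0
    length = len(text)
    while index < length:
        # raw_decode does not skip leading whitespace, so step over it manually
        while index < length and text[index].isspace():
            index += 1
        if index >= length:
            break
        obj, index = decoder.raw_decode(text, index)
        yield obj


def get_purchases(text: str) -> List[ItemPurchase]:
    """Hypothetical helper: parse concatenated records into ItemPurchase objects."""
    return [ItemPurchase(o["personId"], o["itemId"]) for o in iter_json_objects(text)]


# Sample data copied from the question
sample = (
    '{"personId":"p-111","itemId":"i-111"}'
    '{"personId":"p-222","itemId":"i-222"}'
    '{"personId":"p-333","itemId":"i-333"}'
)
purchases = get_purchases(sample)
print(purchases)
```

For very large S3 objects, reading the whole file into one string may not be practical; in that case the content would need to be processed in chunks, which this sketch does not attempt.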