 

Reading the data written to s3 by Amazon Kinesis Firehose stream

I am writing records to a Kinesis Firehose stream that are eventually written to an S3 file by Amazon Kinesis Firehose.

My record object looks like:

ItemPurchase {
    String personId,
    String itemId
}

The data written to S3 looks like:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"} 

NO COMMA SEPARATION.

NO STARTING BRACKET as in a JSON array

[ 

NO ENDING BRACKET as in a JSON array

] 

I want to read this data and get a list of ItemPurchase objects.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent));

What is the correct way to read this data?

asked Dec 26 '15 by learner_21

People also ask

Can Kinesis Firehose read from S3?

Yes, Kinesis Data Firehose can back up all un-transformed records to your S3 bucket concurrently while delivering transformed records to destination. Source record backup can be enabled when you create or update your delivery stream.
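
If it helps to see this in code, below is a minimal sketch (not from the question) of enabling source record backup when creating a delivery stream with the AWS SDK for Java v1. The stream name, role ARN, and bucket ARNs are all placeholders.

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.*;

public class CreateBackupStream {
    public static void main(String[] args) {
        AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.defaultClient();

        ExtendedS3DestinationConfiguration destination = new ExtendedS3DestinationConfiguration()
                .withRoleARN("arn:aws:iam::123456789012:role/firehose-role")  // placeholder
                .withBucketARN("arn:aws:s3:::my-transformed-bucket")          // placeholder
                // Back up the raw, un-transformed records to a second bucket
                .withS3BackupMode(S3BackupMode.Enabled)
                .withS3BackupConfiguration(new S3DestinationConfiguration()
                        .withRoleARN("arn:aws:iam::123456789012:role/firehose-role")
                        .withBucketARN("arn:aws:s3:::my-raw-backup-bucket"));

        firehose.createDeliveryStream(new CreateDeliveryStreamRequest()
                .withDeliveryStreamName("my-delivery-stream")   // placeholder
                .withExtendedS3DestinationConfiguration(destination));
    }
}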

Can Kinesis Data Streams write to S3?

Kinesis Data Analytics for Apache Flink cannot write data to Amazon S3 with server-side encryption enabled on Kinesis Data Analytics. You can create the Kinesis stream and Amazon S3 bucket using the console.

Can Kinesis Data Analytics read from Firehose?

Kinesis Data Analytics can read from two streaming sources: Kinesis Data Streams and Kinesis Data Firehose.

How do I get my records from Kinesis stream?

To read from a stream continually, call GetRecords in a loop. Use GetShardIterator to get the shard iterator to specify in the first GetRecords call. GetRecords returns a new shard iterator in NextShardIterator. Specify the shard iterator returned in NextShardIterator in subsequent calls to GetRecords.
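
As a concrete sketch of that loop with the AWS SDK for Java v1 (the stream name and shard ID are placeholder values, and a real consumer would also handle resharding and errors):

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.*;
import java.nio.charset.StandardCharsets;

public class ReadShard {
    public static void main(String[] args) throws InterruptedException {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // GetShardIterator gives us a starting position in one shard
        String shardIterator = kinesis.getShardIterator(new GetShardIteratorRequest()
                .withStreamName("my-stream")            // placeholder
                .withShardId("shardId-000000000000")    // placeholder
                .withShardIteratorType(ShardIteratorType.TRIM_HORIZON))
                .getShardIterator();

        // Call GetRecords in a loop, feeding NextShardIterator back in each time
        while (shardIterator != null) {
            GetRecordsResult result = kinesis.getRecords(new GetRecordsRequest()
                    .withShardIterator(shardIterator)
                    .withLimit(100));
            for (Record record : result.getRecords()) {
                System.out.println(new String(record.getData().array(), StandardCharsets.UTF_8));
            }
            shardIterator = result.getNextShardIterator();
            Thread.sleep(1000); // stay under the per-shard GetRecords limits
        }
    }
}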


1 Answer

It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.

Ultimately, the trick I found to deal with the problem was to process the text file using the JSONDecoder.raw_decode method.

This will allow you to read a bunch of concatenated JSON records without any delimiters between them.

Python code:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            # raw_decode returns the decoded object and the index where it stopped
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except json.JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1
answered Sep 27 '22 by Tom Chapin
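
Since the question itself is in Java, the same concatenated-JSON records can also be read with Jackson, whose readValues treats the input as a sequence of root-level JSON values. A minimal sketch, assuming ItemPurchase is a plain POJO with a no-argument constructor (the nested class below is a stand-in for the asker's own):

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;

public class PurchaseReader {

    // Assumed shape of the record from the question
    public static class ItemPurchase {
        public String personId;
        public String itemId;
    }

    public static List<ItemPurchase> getPurchasesFromS3(String content) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        // readValues parses concatenated root-level JSON objects,
        // which matches what Firehose writes to S3
        MappingIterator<ItemPurchase> iterator =
                mapper.readerFor(ItemPurchase.class).readValues(content);
        return iterator.readAll();
    }
}

This would slot directly into the purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent)) call from the question.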