
Backfill AWS Kinesis Firehose to Elasticsearch Service failed records

We have a firehose that sends records to an Elasticsearch Service cluster. Our cluster filled up and some records failed over to S3. The documentation at https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry indicates that failed records can be used to backfill: "The skipped documents are delivered to your S3 bucket in the elasticsearch_failed/ folder, which you can use for manual backfill" but I haven't been able to find any documentation on how to accomplish this.

Looking at the records, they appear to be gzip-compressed text files containing JSON blobs, each with a "rawData" field that holds a base64-encoded string of the original record we sent to Firehose.

Is there an existing tool to pull these gzip files out of S3, break them down, and re-submit the records? The documentation implies that you can "just manually backfill", and it's a pretty standardized flow, so my assumption is that someone has done this before, but I haven't been able to find out how.

ima747 asked Apr 13 '18 17:04


2 Answers

I suppose manual backfill means reading the failed records back out of S3 and sending the documents to Elasticsearch again yourself. Here is an example in Python (using boto3 to read from S3) that reads a failure file and re-sends the documents it contains:

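import base64
import json
import logging

import boto3
from elasticsearch import Elasticsearch, RequestError

logger = logging.getLogger(__name__)

# REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, ES_ENDPOINT, bucket and key are placeholders to fill in.
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
# Assumption: documents are sent with the elasticsearch-py client (boto3's 'es' client only manages domains); auth and the create() keyword for the document depend on your domain and client version.
es_instance = Elasticsearch(ES_ENDPOINT)

file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = [json.loads(line) for line in text.split('\n') if line]  # one JSON failure record per non-empty line

for case in failure_cases:
    try:
        data = base64.b64decode(case['rawData'])  # the original document sent to Firehose
        es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent {}".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID {}\nReason: {}"
                    .format(case['esDocumentId'], case['errorMessage']))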
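
The question describes the failure files as gzip-compressed, while the script above decodes the object body as plain UTF-8. Here is a minimal sketch of the decoding step that handles both cases, assuming one JSON record per line and the field names mentioned in this thread (rawData, esIndexName, esDocumentId, errorCode). The function name parse_failure_file and the sample record are made up for illustration; the sample is built in memory rather than fetched from S3.

```python
import base64
import gzip
import json


def parse_failure_file(raw_bytes):
    """Return a list of (record, original_document_bytes) pairs from one failure file."""
    # gzip streams start with the magic bytes 0x1f 0x8b; decompress only when present.
    if raw_bytes[:2] == b"\x1f\x8b":
        raw_bytes = gzip.decompress(raw_bytes)
    parsed = []
    for line in raw_bytes.decode("utf-8").splitlines():
        if not line.strip():
            continue  # skip blank lines between records
        record = json.loads(line)
        parsed.append((record, base64.b64decode(record["rawData"])))
    return parsed


# Hypothetical sample record, built in memory instead of fetched from S3.
sample = {
    "esIndexName": "logs",
    "esDocumentId": "doc-1",
    "errorCode": "403",
    "rawData": base64.b64encode(b'{"msg": "hello"}').decode("ascii"),
}
sample_file = gzip.compress((json.dumps(sample) + "\n").encode("utf-8"))

for record, document in parse_failure_file(sample_file):
    print(record["esDocumentId"], document.decode("utf-8"))
```

With real data you would pass file['Body'].read() from the S3 call to parse_failure_file instead of the in-memory sample.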
Nimrod Kor answered Sep 28 '22 07:09


I had the same problem and modified the above script to backfill the failed documents (those rejected with a 403 error) to an existing Elasticsearch instance:

import base64
import json

import boto3
import requests

s3_client = boto3.client('s3', region_name="xx-xx-x", aws_access_key_id="xxxx", aws_secret_access_key="xxxx")
s3keys = s3_client.list_objects(Bucket="bucketname", Prefix='path/to/folder/file')
for s3key in s3keys['Contents']:
    file = s3_client.get_object(Bucket="bucketname", Key=s3key['Key'])
    text = file['Body'].read().decode("utf-8")
    # One JSON failure record per non-empty line
    failure_cases = [json.loads(line) for line in text.split('\n') if line]
    for case in failure_cases:
        # Only retry documents that were rejected with a 403
        if case['errorCode'] != '403':
            continue
        # Keep the decoded document as bytes so non-ASCII content is sent unchanged
        doc = base64.b64decode(case['rawData'])
        url = "https://es-domain-name/%s/_doc/%s" % (case['esIndexName'], case['esDocumentId'])
        headers = {"content-type": "application/json", "Accept-Charset": "UTF-8"}
        r = requests.post(url, data=doc, headers=headers, auth=('user', 'password'))
        print(case['esDocumentId'], r.status_code, r.json())
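
Posting one document per request can be slow for a large backlog. Here is a hedged sketch of batching the failure records into a single body for the Elasticsearch _bulk endpoint instead. It assumes each rawData payload is a JSON document and that an "index" action (which overwrites an existing document with the same ID) is what you want. The function name build_bulk_body and the sample record are made up for illustration.

```python
import base64
import json


def build_bulk_body(cases):
    """Build an NDJSON body for the Elasticsearch _bulk endpoint from failure records."""
    lines = []
    for case in cases:
        action = {"index": {"_index": case["esIndexName"], "_id": case["esDocumentId"]}}
        # Re-serialize the document so it is guaranteed to sit on a single line.
        document = json.loads(base64.b64decode(case["rawData"]))
        lines.append(json.dumps(action))
        lines.append(json.dumps(document))
    # Every line, including the last one, must end with a newline.
    return "".join(line + "\n" for line in lines)


# Hypothetical failure record, shaped like the ones described in the question.
cases = [
    {
        "esIndexName": "logs",
        "esDocumentId": "doc-1",
        "errorCode": "403",
        "rawData": base64.b64encode(b'{"msg": "hello"}').decode("ascii"),
    }
]
print(build_bulk_body(cases), end="")
```

The resulting string could then be sent as UTF-8 bytes with requests.post to https://es-domain-name/_bulk with the content-type header set to application/x-ndjson. The bulk response carries an "errors" flag and per-item results, so check those rather than only the HTTP status. On older Elasticsearch versions that still use mapping types, the action line may also need a type.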
sachin roy answered Sep 28 '22 05:09
