We have a firehose that sends records to an Elasticsearch Service cluster. Our cluster filled up and some records failed over to S3. The documentation at https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry indicates that failed records can be used to backfill: "The skipped documents are delivered to your S3 bucket in the elasticsearch_failed/ folder, which you can use for manual backfill" but I haven't been able to find any documentation on how to accomplish this.
Looking at the records, they appear to be gzipped text files containing JSON blobs, each with a "rawData" field that holds a base64-encoded string of the original record we sent to Firehose.
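For example, inspecting one of these objects looks roughly like this (a minimal sketch; the bucket and key are placeholders, and it assumes each object is a gzip-compressed file of newline-delimited JSON as described above):

import base64
import gzip
import json
import boto3

s3 = boto3.client('s3')
# Placeholder bucket/key pointing at one object under the elasticsearch_failed/ folder
obj = s3.get_object(Bucket='my-backup-bucket', Key='elasticsearch_failed/some-object.gz')

# Each line is a JSON blob; "rawData" holds the original record, base64 encoded
for line in gzip.decompress(obj['Body'].read()).decode('utf-8').splitlines():
    if not line:
        continue
    failure = json.loads(line)
    print(failure['esIndexName'], failure['esDocumentId'], failure['errorCode'])
    print(base64.b64decode(failure['rawData']).decode('utf-8'))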
Is there an existing tool to pull these gzip files out of S3, break them down, and re-submit the records? The documentation implies that you can "just manually backfill", and since this is a fairly standardized flow, my assumption is that someone has done this before, but I haven't been able to find how.
I suppose "manual backfill" means using an SDK to send the documents into Elasticsearch again. Here is an example in Python of reading a failure file from S3 with boto3, decoding the records, and sending the documents within to Elasticsearch with the elasticsearch client:
import base64
import gzip
import json
import logging
import boto3
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError

logger = logging.getLogger(__name__)
# Index via the elasticsearch client (the boto3 'es' client only manages the domain); ES_ENDPOINT, ES_USER and ES_PASSWORD are placeholders for your domain endpoint and credentials.
es_instance = Elasticsearch([ES_ENDPOINT], http_auth=(ES_USER, ES_PASSWORD))
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

obj = s3_client.get_object(Bucket=bucket, Key=key)
# The failure objects are described as gzip-compressed, newline-delimited JSON; decompress before parsing
text = gzip.decompress(obj['Body'].read()).decode('utf-8')
failure_cases = [json.loads(line) for line in text.split('\n') if line]
for case in failure_cases:
    try:
        data = base64.b64decode(case['rawData'])
        es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent {}".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID {}\nReason: {}"
                    .format(case['esDocumentId'], case['errorMessage']))
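If there are a lot of failed documents, the same data can also be pushed in batches with the bulk helper from the elasticsearch package instead of one create call per record. A rough sketch, reusing es_instance, failure_cases, json, base64 and logger from the snippet above:

from elasticsearch.helpers import bulk

# One bulk action per failure record, reusing the index and document id Firehose recorded
actions = [
    {
        "_op_type": "create",
        "_index": case['esIndexName'],
        "_id": case['esDocumentId'],
        "_source": json.loads(base64.b64decode(case['rawData'])),
    }
    for case in failure_cases
]
success, errors = bulk(es_instance, actions, raise_on_error=False)
logger.info("Backfilled {} documents, errors: {}".format(success, errors))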
Had the same problem. I modified the above script to backfill the failed documents (those with error code 403) into an existing Elasticsearch instance:
import base64
import json
import boto3
import requests

s3_client = boto3.client('s3', region_name="xx-xx-x", aws_access_key_id="xxxx", aws_secret_access_key="xxxx")
s3keys = s3_client.list_objects(Bucket="bucketname", Prefix='path/to/folder/file')
for s3key in s3keys['Contents']:
    print(s3key['Key'])
    file = s3_client.get_object(Bucket="bucketname", Key=s3key['Key'])
    # If your failure objects are gzip-compressed, decompress the body with gzip.decompress() before decoding
    text = file['Body'].read().decode("utf-8")
    failure_cases = [json.loads(line) for line in text.split('\n') if line]
    for case in failure_cases:
        data = base64.b64decode(case['rawData'])
        esid = case['esDocumentId']
        esIndexName = case['esIndexName']
        doc = data.decode('utf-8')
        url = "https://es-domain-name/%s/_doc/%s" % (esIndexName, esid)
        headers = {"Content-Type": "application/json", "Accept-Charset": "UTF-8"}
        # Only re-submit the documents that failed with a 403 error code
        if case['errorCode'] == '403':
            try:
                print(case['errorCode'])
                r = requests.post(url, data=doc, headers=headers, auth=('user', 'password'))
                print(r.json())
            except requests.RequestException as exc:
                print("Retry failed for document {}: {}".format(esid, exc))