I've read the other questions and answers, and still am not able to figure out what I'm doing wrong here.
I'm trying to create an Elasticsearch 6.x producer in Python 3.6 using Elasticsearch's asyncio implementation (https://github.com/elastic/elasticsearch-py-async), and while it works (the records are successfully pushed into ES), I get Task exception was never retrieved
and Task got bad yield: 200
errors. I assume they both result from the same issue, and one probably causes the other?
I'm using the following modules:
python 3.6
elasticsearch=6.3.1
elasticsearch-async=6.2.0
boto3=1.9.118
Below is my code:
import json
import boto3
import logging
import os
import gzip
import asyncio
from elasticsearch import RequestsHttpConnection
from elasticsearch_async import AsyncElasticsearch
from assume_role_aws4auth import AssumeRoleAWS4Auth
import time
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Operating constants
MAX_RECORDS_IN_BATCH = 500
MAX_BATCH_SIZE = 10000000
# boto3 clients
credentials = boto3.Session().get_credentials()
awsauth = AssumeRoleAWS4Auth(credentials, 'us-east-1', 'es')
cloudwatch_client = boto3.client('cloudwatch')
s3_resource = boto3.resource('s3')
event_loop = asyncio.get_event_loop()
es_client = AsyncElasticsearch(hosts=['https://ES_HOST'], http_compress=True, http_auth=awsauth, use_ssl=True,
                               verify_certs=True, connection_class=RequestsHttpConnection, loop=event_loop)
def lambda_handler(filename, context):
    event_loop.run_until_complete(process(filename))
    pending = asyncio.Task.all_tasks()
    event_loop.run_until_complete(asyncio.gather(*pending))
async def process(filename: str):
    for action_chunk in read_chunk(filename, MAX_BATCH_SIZE, MAX_RECORDS_IN_BATCH):
        try:
            resp = asyncio.ensure_future(es_client.bulk(body=action_chunk, index='index', doc_type='type', _source=False))
            await asyncio.sleep(.1)
        except Exception as ex:
            logger.error(ex)
def read_chunk(file_path: str, max_batch_size: int, max_records: int):
    actions: str = ''
    actions_size: int = 0
    num_actions: int = 0
    with gzip.open(file_path, 'rt') as f:
        for line in f:
            request = json.dumps(dict({'index': dict({})})) + '\n' + line + '\n'
            request_size = len(request.encode('utf-8'))
            # Check to see if this record will put us over the limits
            if (actions_size + request_size) > max_batch_size or num_actions == max_records:
                yield actions
                actions = ''
                num_actions = 0
                actions_size = 0
            # Add the record
            actions += request
            num_actions += 1
            actions_size += request_size
    if actions != '':
        yield actions
if __name__ == '__main__':
    lambda_handler('/path/to/file', None)
Below is the error I get each time I call es_client.bulk:
Task exception was never retrieved
future: <Task finished coro=<AsyncTransport.main_loop() done, defined at /path/to/PythonElasticsearchIngest/venv/lib/python3.6/site-packages/elasticsearch_async/transport.py:143> exception=RuntimeError('Task got bad yield: 200',)>
Traceback (most recent call last):
File "/path/to/PythonElasticsearchIngest/venv/lib/python3.6/site-packages/elasticsearch_async/transport.py", line 150, in main_loop
method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
Can anyone tell me what I'm doing wrong here? Also, if there is anything I could be doing better or more efficiently, I'd love to hear it. I wanted to use the Helpers package, but there is no asyncio implementation of it.
I'm not sure if it is the problem, but here's what may be happening.
You create multiple tasks inside the process() coroutine, but don't store references to them. This can lead to a problem: some tasks may be garbage collected before you can explicitly retrieve their results. If that happens, asyncio warns you about the situation.
To solve it, you should store all created tasks and make sure all of them are awaited:
tasks = []

# ...

async def process(filename: str):
    # ...
    task = asyncio.ensure_future(...)
    tasks.append(task)
    # ...

def lambda_handler(filename, context):
    # ...
    event_loop.run_until_complete(asyncio.gather(*tasks))
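Applied to the code in the question, the changed piece might look like the sketch below (assuming a module-level tasks list; everything not shown stays as in the original):

tasks = []

async def process(filename: str):
    for action_chunk in read_chunk(filename, MAX_BATCH_SIZE, MAX_RECORDS_IN_BATCH):
        try:
            # Keep a reference to the task so it is not garbage collected
            # before its result (or exception) is retrieved.
            task = asyncio.ensure_future(
                es_client.bulk(body=action_chunk, index='index', doc_type='type', _source=False))
            tasks.append(task)
            await asyncio.sleep(.1)
        except Exception as ex:
            logger.error(ex)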
If my guess is correct, you'll probably see RuntimeError('Task got bad yield: 200',) raised at lambda_handler. You can retrieve all exceptions without raising them by passing the return_exceptions=True param to asyncio.gather. This way you avoid the warnings (but not the underlying reason why those exceptions happened in the tasks in the first place).
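For example, a minimal sketch of a lambda_handler that collects and logs those exceptions instead of letting them propagate (the logging loop is my own addition; the rest mirrors the code above):

def lambda_handler(filename, context):
    event_loop.run_until_complete(process(filename))
    # With return_exceptions=True, exceptions are returned as results
    # instead of being raised by gather().
    results = event_loop.run_until_complete(
        asyncio.gather(*tasks, return_exceptions=True))
    for result in results:
        if isinstance(result, Exception):
            # Log failed bulk calls without crashing the handler.
            logger.error(result)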
Sorry, can't help further than here.
Upd:
I altered the answer to fix a mistake in the original version.