
Python 3.6 asyncio - Task exception was never retrieved - Task got bad yield: 200

I've read the other questions and answers, and I'm still not able to figure out what I'm doing wrong here.

I'm trying to create an Elasticsearch 6.x producer in Python 3.6 using Elasticsearch's asyncio implementation (https://github.com/elastic/elasticsearch-py-async), and while it works (the records are successfully pushed into ES), I get Task exception was never retrieved and Task got bad yield: 200 errors. I assume they both result from the same issue, and one probably causes the other?

I'm using the following modules:

python 3.6
elasticsearch==6.3.1
elasticsearch-async==6.2.0
boto3==1.9.118

Below is my code:

import json
import boto3
import logging
import os
import gzip
import asyncio
from elasticsearch import RequestsHttpConnection
from elasticsearch_async import AsyncElasticsearch
from assume_role_aws4auth import AssumeRoleAWS4Auth
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Operating constants
MAX_RECORDS_IN_BATCH = 500
MAX_BATCH_SIZE = 10000000

# boto3 clients
credentials = boto3.Session().get_credentials()
awsauth = AssumeRoleAWS4Auth(credentials, 'us-east-1', 'es')

cloudwatch_client = boto3.client('cloudwatch')
s3_resource = boto3.resource('s3')
event_loop = asyncio.get_event_loop()

es_client = AsyncElasticsearch(hosts=['https://ES_HOST'], http_compress=True, http_auth=awsauth, use_ssl=True,
                               verify_certs=True, connection_class=RequestsHttpConnection, loop=event_loop)


def lambda_handler(filename, context):
    event_loop.run_until_complete(process(filename))
    pending = asyncio.Task.all_tasks()
    event_loop.run_until_complete(asyncio.gather(*pending))


async def process(filename: str):
    for action_chunk in read_chunk(filename, MAX_BATCH_SIZE, MAX_RECORDS_IN_BATCH):
        try:
            resp = asyncio.ensure_future(es_client.bulk(body=action_chunk, index='index', doc_type='type', _source=False))
            await asyncio.sleep(.1)
        except Exception as ex:
            logger.error(ex)


def read_chunk(file_path: str, max_batch_size: int, max_records: int):
    actions: str = ''
    actions_size: int = 0
    num_actions: int = 0
    with gzip.open(file_path, 'rt') as f:
        for line in f:
            request = json.dumps(dict({'index': dict({})})) + '\n' + line + '\n'
            request_size = len(request.encode('utf-8'))

            # Check to see if this record will put us over the limits
            if (actions_size + request_size) > max_batch_size or num_actions == max_records:
                yield actions
                actions = ''
                num_actions = 0
                actions_size = 0

            # Add the record
            actions += request
            num_actions += 1
            actions_size += request_size

    if actions != '':
        yield actions


if __name__ == '__main__':
    lambda_handler('/path/to/file', None)

Below is the error I get each time I call es_client.bulk:

Task exception was never retrieved
future: <Task finished coro=<AsyncTransport.main_loop() done, defined at /path/to/PythonElasticsearchIngest/venv/lib/python3.6/site-packages/elasticsearch_async/transport.py:143> exception=RuntimeError('Task got bad yield: 200',)>
Traceback (most recent call last):
  File "/path/to/PythonElasticsearchIngest/venv/lib/python3.6/site-packages/elasticsearch_async/transport.py", line 150, in main_loop
    method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)

Can anyone tell me what I'm doing wrong here? Also, if there is anything I can be doing better / more efficient, I'd love to hear it. I wanted to use the Helpers package, but there is no asyncio implementation of it.

asked Nov 06 '22 by Brooks


1 Answer

I'm not sure if this is the problem, but here's what may be happening.

You create multiple tasks inside the process() coroutine, but you don't store references to them. This can lead to a problem: some tasks may be garbage collected before you have explicitly retrieved their results. If that happens, asyncio warns you about the situation.
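As a minimal illustration (not from the question's code), here's a sketch of a fire-and-forget task whose exception nobody retrieves; asyncio emits the same warning when the task object is destroyed:

import asyncio

async def boom():
    raise RuntimeError('something went wrong')

async def main():
    # Fire-and-forget: no reference is kept and the result is never awaited.
    asyncio.ensure_future(boom())
    await asyncio.sleep(0.1)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# When the finished task is garbage collected (e.g. at interpreter exit),
# asyncio logs: "Task exception was never retrieved"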

To solve this, store all created tasks and make sure all of them are awaited:

tasks = []

# ...

async def process(filename: str):
    # ...
    task = asyncio.ensure_future(...)
    tasks.append(task)
    # ...


def lambda_handler(filename, context):
    # ...
    event_loop.run_until_complete(asyncio.gather(*tasks))

If my guess is correct, you'll probably see RuntimeError('Task got bad yield: 200',) raised in lambda_handler. You can retrieve all the exceptions without raising them by passing return_exceptions=True to asyncio.gather. That way you avoid the warnings (but not the underlying reason why those exceptions happened in the tasks in the first place).
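For example, a minimal sketch (reusing the tasks list and logger from the code above) that collects the results and logs failures instead of letting gather() raise:

results = event_loop.run_until_complete(
    asyncio.gather(*tasks, return_exceptions=True))
for result in results:
    # With return_exceptions=True, a failed task yields its exception
    # object as a result instead of raising it out of gather().
    if isinstance(result, Exception):
        logger.error('bulk request failed: %s', result)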

Sorry, I can't help further than this.

Update:

I edited the answer to fix a mistake in the original version.

answered Nov 15 '22 by Mikhail Gerasimov