Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

(AWS) Athena: Query Results seem too short

My Athena queries appear to be too short in their results. Trying to figure out Why?

Setup:

Glue Catalogs (118.6 Gig in size). Data: Stored in S3 in both CSV and JSON format. Athena Query: When I query data for a whole table, I only get 40K results per Query, there should be 121Million Records for that query on average for one month's data.

Does Athena Cap query result data? Is this a service limit (the documentation does not suggest this to be the case).

like image 945
Compiled Kernel Avatar asked Jan 18 '18 19:01

Compiled Kernel


People also ask

Why are Athena queries slow?

If your queries have a higher planning time, it might be caused by over-partitioning the table. Tables with hundreds or thousands of partitions can result in slower query processing. To improve query performance, try one or more of the following: Consider reducing the number of partitions.

Does AWS Athena cache query results?

Amazon Athena automatically stores query results and metadata information for each query that runs in a query result location that you can specify in Amazon S3. If necessary, you can access the files in this location to work with them.


3 Answers

It seems that there is a limit of 1000. You should use NextToken to iterate over the results.

Quote of the GetQueryResults Documentation

MaxResults The maximum number of results (rows) to return in this request.

Type: Integer

Valid Range: Minimum value of 0. Maximum value of 1000.

Required: No

like image 191
MaiKaY Avatar answered Oct 20 '22 15:10

MaiKaY


So, getting 1000 results at a time obviously doesn't scale. Thankfully, there's a simple workaround. (Or maybe this is how it was supposed to be done all along.)

When you run an Athena query, you should get a QueryExecutionId. This Id corresponds to the output file you'll find in S3.

Here's a snippet I wrote:

s3 = boto3.resource("s3")
athena = boto3.client("athena")
response: Dict = athena.start_query_execution(QueryString=query, WorkGroup="<your_work_group>")
execution_id: str = response["QueryExecutionId"]
print(execution_id)

# Wait until the query is finished
while True:
    try:
        athena.get_query_results(QueryExecutionId=execution_id)
        break
    except botocore.exceptions.ClientError as e:
        time.sleep(5)

local_filename: str = "temp/athena_query_result_temp.csv"
s3.Bucket("athena-query-output").download_file(execution_id + ".csv", local_filename)
return pd.read_csv(local_filename)

Make sure the corresponding WorkGroup has "Query result location" set, e.g. "s3://athena-query-output/"

Also see this thread with similar answers: How to Create Dataframe from AWS Athena using Boto3 get_query_results method

like image 41
Alexei Andreev Avatar answered Oct 20 '22 15:10

Alexei Andreev


Another option is Paginate and count approach : Don't know whether better way to do it like select count(*) from table like...

Here is the complete example code ready to use. Used python boto3 athena api I used paginator and converted result as list of dict and also returning count along with the result.

below are 2 methods First one will paginate second one will convert paginated result to list of dict and calculate count.

Note : converting in to list of dict is not necessary in this case. If you don't want that.. in the code you can modify to have only count

def get_athena_results_paginator(params, athena_client):
    """

    :param params:
    :param athena_client:
    :return:
    """
    query_id = athena_client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={
            'Database': params['database']
        }
        # ,
        # ResultConfiguration={
        #     'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        # }
        , WorkGroup=params['workgroup']

    )['QueryExecutionId']
    query_status = None
    while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
        query_status = athena_client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if query_status == 'FAILED' or query_status == 'CANCELLED':
            raise Exception('Athena query with the string "{}" failed or was cancelled'.format(params.get('query')))
        time.sleep(10)
    results_paginator = athena_client.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'PageSize': 1000
        }
    )
    count, results = result_to_list_of_dict(results_iter)
    return results, count


def result_to_list_of_dict(results_iter):
    """

    :param results_iter:
    :return:
    """
    results = []
    column_names = None
    count = 0
    for results_page in results_iter:
        print(len(list(results_iter)))
        for row in results_page['ResultSet']['Rows']:
            count = count + 1
            column_values = [col.get('VarCharValue', None) for col in row['Data']]
            if not column_names:
                column_names = column_values
            else:
                results.append(dict(zip(column_names, column_values)))
    return count, results

like image 41
Ram Ghadiyaram Avatar answered Oct 20 '22 15:10

Ram Ghadiyaram