Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I save Athena query results into a Python dictionary from an AWS Lambda function?

Tags:

aws-lambda

Is it possible to save AWS Athena query results to a dictionary for a certain time period while working in an AWS Lambda function using Python 3.6?

like image 762
pyhotshot Avatar asked Apr 05 '26 19:04

pyhotshot


1 Answers

It's possible to save the Athena query result in a Python dictionary. I wrote a class that takes the parameters needed to connect to Athena: region, database, s3_output and workgroup. Initialize the class with those parameters, then call its run_query method with the Athena query. It returns a list of dictionaries, one per result row, keyed by column name. Note that the class depends on boto3 and the third-party `retry` package. If you need more information about the class, please don't hesitate to comment. I hope this helps.

import logging
from typing import Any, Dict, Iterator, List, Optional

import boto3
from retry import retry

# Module-level logger. INFO is enabled explicitly so the progress messages
# emitted by AthenaCall (query text, page numbers, row totals) are not
# filtered out by the logger's own level.
log = logging.getLogger(name=__name__)
log.setLevel(level=logging.INFO)

class FailedExecution(Exception):
    """Raised when an Athena query execution fails.

    The message is logged at ERROR level when the exception is created and is
    kept on ``self.message``.
    """

    def __init__(self, message: str) -> None:
        self.message = message
        # log.error rather than log.exception: this exception is constructed
        # just before it is raised, outside any ``except`` block, so
        # log.exception would attach an empty "NoneType: None" traceback.
        log.error(self.message)
        super().__init__(self.message)

class ExecutionNotReady(Exception):
    """Signal that a query result is not available yet and should be polled again.

    The message is logged at WARNING level on construction and is kept on
    ``self.message``.
    """

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message
        log.warning(message)

class AthenaCall:
    """Run Athena queries and return their results as a list of dicts.

    Each result row becomes one dict keyed by the column names taken from the
    header row of the first result page.

    Args:
        region: AWS region used to create the Athena client.
        database: Database passed as the query execution context.
        s3_output: Output location passed in the result configuration.
        workgroup: Athena workgroup to run queries in (default ``'primary'``).
    """

    # Page size requested from get_query_results.
    _PAGE_SIZE = 1000

    def __init__(self, region: str, database: str, s3_output: str, workgroup: str = 'primary') -> None:
        self.region = region
        self.database = database
        self.s3_output = s3_output
        self.workgroup = workgroup
        self.client = boto3.client('athena', region_name=self.region)

    # NOTE(review): tries=10 / delay=2 polls for only about 20 seconds; longer
    # queries exhaust the retries and ExecutionNotReady reaches the caller.
    @retry(ExecutionNotReady, tries=10, delay=2)
    def _get_query_result(self, query_execution_id: str, next_token: Optional[str]) -> Dict[str, Any]:
        """Fetch one page of results once the query has succeeded.

        Args:
            query_execution_id: Id returned by ``start_query_execution``.
            next_token: Pagination token from the previous page, or ``None``
                for the first page.

        Returns:
            The raw ``get_query_results`` response for that page.

        Raises:
            FailedExecution: The query ended in the FAILED or CANCELLED state.
            ExecutionNotReady: The query has not finished yet; the ``retry``
                decorator catches this and polls again.
        """
        query_status = self.client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_status['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            request = {'QueryExecutionId': query_execution_id, 'MaxResults': self._PAGE_SIZE}
            if next_token:
                request['NextToken'] = next_token
            return self.client.get_query_results(**request)
        if state in ('FAILED', 'CANCELLED'):
            # CANCELLED is terminal as well: treating it as "not ready" only
            # burned every retry before failing. StateChangeReason may be absent.
            raise FailedExecution(status.get(
                'StateChangeReason',
                'QueryExecution {} ended in state {}.'.format(query_execution_id, state),
            ))
        raise ExecutionNotReady('QueryExecution {} is not ready yet.'.format(query_execution_id))

    @staticmethod
    def _get_data(row: Dict[str, Any]) -> List[Optional[str]]:
        """Flatten one Athena row into a list of cell values.

        A cell with no value (an empty dict) becomes ``None``.
        """
        values: List[Optional[str]] = []
        for cell in row["Data"]:
            values.extend(cell.values() or [None])
        return values

    def _create_dict_from_athena_result(self, response: Dict[str, Any], _key_result: List[str]) -> Iterator[Dict[str, Any]]:
        """Yield one dict per row of ``response``, keyed by ``_key_result``.

        Columns missing from a row stay ``None``; values beyond the header
        width are ignored. Rows are iterated in place instead of being popped
        from the front of the list, which was quadratic in the page size.
        """
        for row in response["ResultSet"]["Rows"]:
            record = dict.fromkeys(_key_result)
            for key, value in zip(_key_result, self._get_data(row)):
                record[key] = value
            yield record

    def _get_query_response(self, query_execution_id: str) -> List[Dict[str, Any]]:
        """Collect every result page of a query into a list of row dicts.

        The first row of the first page is the header and supplies the keys.
        """
        page = 1
        log.info(f"Getting page {page}")
        response = self._get_query_result(query_execution_id, None)
        rows = response["ResultSet"]["Rows"]
        if not rows:
            # No header row to take column names from, so there is nothing to
            # return; previously pop(0) raised IndexError here.
            log.info("Total data: 0")
            return []
        keys = self._get_data(rows.pop(0))
        data: List[Dict[str, Any]] = []
        while True:
            data.extend(self._create_dict_from_athena_result(response, keys))
            next_token = response.get("NextToken")
            if not next_token:
                break
            page += 1
            log.info(f"Getting page {page}")
            response = self._get_query_result(query_execution_id, next_token)
        log.info(f"Total data: {len(data)}")
        return data

    def run_query(self, query: str) -> List[Dict[str, Any]]:
        """Start ``query``, wait for it, and return every result row as a dict.

        Raises:
            FailedExecution: The query failed or was cancelled.
            ExecutionNotReady: The query did not finish within the polling retries.
        """
        log.info(f'Running query: {query}')
        query_execution = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': self.database},
            ResultConfiguration={'OutputLocation': self.s3_output},
            WorkGroup=self.workgroup,
        )
        return self._get_query_response(query_execution["QueryExecutionId"])
like image 140
gamez_code Avatar answered Apr 08 '26 08:04

gamez_code