Is it possible to save AWS Athena query results to a dictionary for a certain period of time while working in an AWS Lambda function using Python 3.6?
It's possible to save the Athena query results in a Python dictionary. I wrote a class that takes the parameters needed to connect to Athena: region, database, s3_output and workgroup. Initialize the class with those parameters, then call the run_query method with your Athena query. It returns a list of dictionaries, one per result row, keyed by column name. Note that `retry` is a third-party package (`pip install retry`), so it must be included in your Lambda deployment package. If you need more information about the class, please don't hesitate to comment. I hope this helps.
import logging
from typing import Any, Dict, Iterator, List, Optional

import boto3
from retry import retry
# Module-wide logger; INFO level so query/page progress messages are emitted.
log: logging.Logger = logging.getLogger(name=__name__)
log.setLevel(level=logging.INFO)
class FailedExecution(Exception):
    """Raised when an Athena query execution fails.

    The message is kept on ``message`` and logged at ERROR level as soon as
    the exception is constructed.
    """

    def __init__(self, message: str) -> None:
        self.message = message
        # log.error, not log.exception: this object is built right before it
        # is raised, outside any ``except`` block, so log.exception would
        # append a meaningless "NoneType: None" traceback to every message.
        log.error(self.message)
        super().__init__(self.message)
class ExecutionNotReady(Exception):
    """Signals that an Athena query execution has not finished yet.

    The message is stored on ``message`` and logged as a warning as soon as
    the exception is constructed.
    """

    def __init__(self, message: str) -> None:
        super().__init__(message)
        log.warning(message)
        self.message = message
class AthenaCall:
    """Run Athena queries and return their results as a list of dicts.

    Each result row becomes a ``{column_name: value}`` dict. Column names come
    from the header row of the first result page. Cells with no value become
    ``None``.

    Args:
        region: AWS region used for the boto3 Athena client.
        database: Database set as the query execution context.
        s3_output: Output location (S3) where Athena writes query results.
        workgroup: Athena workgroup the queries run in.
    """

    # Terminal states that will never turn into SUCCEEDED; polling them
    # further would only exhaust the retry budget.
    _FAILED_STATES = ("FAILED", "CANCELLED")

    def __init__(self, region: str, database: str, s3_output: str, workgroup: str = 'primary') -> None:
        self.region = region
        self.database = database
        self.s3_output = s3_output
        self.workgroup = workgroup
        self.client = boto3.client('athena', region_name=self.region)

    @retry(ExecutionNotReady, tries=10, delay=2)
    def _get_query_result(self, query_execution_id: str, next_token: Optional[str]) -> Dict[str, Any]:
        """Fetch one page (at most 1000 rows) of a finished query's results.

        While the query is still pending, ExecutionNotReady is raised and
        the ``retry`` decorator re-invokes this method (10 attempts, 2 s
        delay). If the query is still pending after the last attempt,
        ExecutionNotReady reaches the caller.

        Args:
            query_execution_id: Id returned by ``start_query_execution``.
            next_token: Pagination token from the previous page, or ``None``
                for the first page.

        Returns:
            The raw ``get_query_results`` response dict.

        Raises:
            FailedExecution: the query ended FAILED or CANCELLED.
            ExecutionNotReady: the query has not finished yet.
        """
        status = self.client.get_query_execution(
            QueryExecutionId=query_execution_id,
        )['QueryExecution']['Status']
        state = status['State']

        if state == 'SUCCEEDED':
            request: Dict[str, Any] = {
                'QueryExecutionId': query_execution_id,
                'MaxResults': 1000,
            }
            if next_token:
                request['NextToken'] = next_token
            return self.client.get_query_results(**request)

        if state in self._FAILED_STATES:
            # StateChangeReason is read defensively; fall back to a generic
            # message so a missing reason cannot mask the failure as KeyError.
            reason = status.get(
                'StateChangeReason',
                'QueryExecution {} ended in state {}.'.format(query_execution_id, state),
            )
            raise FailedExecution(reason)

        raise ExecutionNotReady('QueryExecution {} is not ready yet.'.format(query_execution_id))

    @staticmethod
    def _get_data(row: Dict[str, Any]) -> List[Optional[str]]:
        """Flatten one result row into its list of cell values.

        A cell given as an empty dict becomes ``None`` so that every column
        keeps its position in the row.
        """
        values: List[Optional[str]] = []
        for datum in row["Data"]:
            values.extend(datum.values() or [None])
        return values

    def _create_dict_from_athena_result(self, response: Dict[str, Any], _key_result: List[str]) -> Iterator[Dict[str, Any]]:
        """Yield one ``{column: value}`` dict per row of ``response``.

        Columns missing from a row are set to ``None``. Rows are iterated
        directly, without popping from the front of the list, so each page is
        processed in linear time and the response is not mutated.
        """
        for row in response["ResultSet"]["Rows"]:
            record = dict.fromkeys(_key_result)
            record.update(zip(_key_result, self._get_data(row)))
            yield record

    def _get_query_response(self, query_execution_id: str) -> List[Dict[str, Any]]:
        """Collect every result page of a query into a list of row dicts.

        The first row of the first page is treated as the column header. It
        supplies the dict keys and is excluded from the returned data. If the
        first page contains no rows at all, ``[]`` is returned.
        """
        page = 1
        log.info("Getting page %d", page)
        response = self._get_query_result(query_execution_id, None)

        rows = response["ResultSet"]["Rows"]
        if not rows:
            # NOTE(review): presumably statements without a result set; there
            # is no header row to take column names from.
            log.info("Total data: %d", 0)
            return []
        keys = self._get_data(rows.pop(0))

        data: List[Dict[str, Any]] = []
        while True:
            data.extend(self._create_dict_from_athena_result(response, keys))
            next_token = response.get("NextToken")
            if not next_token:
                break
            page += 1
            log.info("Getting page %d", page)
            response = self._get_query_result(query_execution_id, next_token)

        log.info("Total data: %d", len(data))
        return data

    def run_query(self, query: str) -> List[Dict[str, Any]]:
        """Start ``query`` in Athena, wait for it, and return all result rows.

        Args:
            query: SQL text sent verbatim to Athena. It is not escaped, so
                build it only from trusted input.

        Returns:
            One dict per result row, keyed by column name.

        Raises:
            FailedExecution: the query ended FAILED or CANCELLED.
            ExecutionNotReady: the query did not finish within the polling
                retries.
        """
        log.info("Running query: %s", query)
        query_execution = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': self.database},
            ResultConfiguration={'OutputLocation': self.s3_output},
            WorkGroup=self.workgroup,
        )
        return self._get_query_response(query_execution["QueryExecutionId"])
If you love us, you can donate via PayPal or buy us a coffee so we can keep maintaining and growing! Thank you!
Donate with