I have a Spark batch job which is executed hourly. Each run generates and stores new data in S3
with the directory naming pattern DATA/YEAR=?/MONTH=?/DATE=?/datafile
.
After uploading the data to S3
, I want to investigate it using Athena
. Also, I would like to visualize them in QuickSight
by connecting to Athena as a data source.
The problem is that after each run of my Spark batch, the newly generated data stored in S3
will not be discovered by Athena, unless I manually run the query MSCK REPAIR TABLE
.
Is there a way to make Athena update the data automatically, so that I can create a fully automatic data visualization pipeline?
Use the MSCK REPAIR TABLE command to update the metadata in the catalog after you add Hive compatible partitions. The MSCK REPAIR TABLE command scans a file system such as Amazon S3 for Hive compatible partitions that were added to the file system after the table was created.
MSCK REPAIR TABLE can be a costly operation, because it needs to scan the table's sub-tree in the file system (the S3 bucket). Multiple levels of partitioning can make it more costly, as it needs to traverse additional sub-directories.
AWS gives us a few ways to refresh the Athena table partitions. We can use the user interface, run the MSCK REPAIR TABLE statement using Hive, or use a Glue Crawler. This article will show you how to create a new crawler and use it to refresh an Athena table. If the crawler already exists, we can reuse it.
MSCK REPAIR TABLE recovers all the partitions in the directory of a table and updates the Hive metastore. When creating a table using PARTITIONED BY clause, partitions are generated and registered in the Hive metastore.
There are a number of ways to schedule this task. How do you schedule your workflows? Do you use a system like Airflow, Luigi, Azkaban, cron, or using an AWS Data pipeline?
From any of these, you should be able to fire off the following CLI command.
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
Another option would be AWS Lambda. You could have a function that calls MSCK REPAIR TABLE some_database.some_table
in response to a new upload to S3.
An example Lambda Function could be written as such:
import boto3 def lambda_handler(event, context): bucket_name = 'some_bucket' client = boto3.client('athena') config = { 'OutputLocation': 's3://' + bucket_name + '/', 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'} } # Query Execution Parameters sql = 'MSCK REPAIR TABLE some_database.some_table' context = {'Database': 'some_database'} client.start_query_execution(QueryString = sql, QueryExecutionContext = context, ResultConfiguration = config)
You would then configure a trigger to execute your Lambda function when new data are added under the DATA/
prefix in your bucket.
Ultimately, explicitly rebuilding the partitions after you run your Spark Job using a job scheduler has the advantage of being self documenting. On the other hand, AWS Lambda is convenient for jobs like this one.
You should be running ADD PARTITION
instead:
aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."
Which adds a the newly created partition from your S3
location Athena leverages Hive for partitioning data. To create a table with partitions, you must define it during the CREATE TABLE
statement. Use PARTITIONED BY
to define the keys by which to partition data.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With