 

Add a partition on glue table via API on AWS?

I have an S3 bucket that is constantly being filled with new data, and I am using Athena and Glue to query it. The problem is that if Glue doesn't know a new partition has been created, it doesn't know it needs to search there. Making an API call to run the Glue crawler every time I need a new partition is too expensive, so the best solution is to tell Glue directly that a new partition has been added, i.e. to create the new partition in its properties table. I looked through the AWS documentation but had no luck. I am using Java with AWS. Any help?

Gudzo asked Jun 01 '18 08:06



2 Answers

You may want to use the Glue batch_create_partition() API to register new partitions. It doesn't require an expensive operation such as MSCK REPAIR TABLE or re-crawling.

I had a similar use case, for which I wrote a Python script that does the following:

Step 1 - Fetch the table information and parse the necessary information from it which is required to register the partitions.

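# Assumed setup (not shown in the original script): import sys; logger is a logging.Logger;
# l_client = boto3.client('glue'); l_catalog_id is the AWS account ID that owns the catalog.
# Fetching table information from glue catalog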
logger.info("Fetching table info for {}.{}".format(l_database, l_table))
try:
    response = l_client.get_table(
        CatalogId=l_catalog_id,
        DatabaseName=l_database,
        Name=l_table
    )
except Exception as error:
    logger.error("Exception while fetching table info for {}.{} - {}"
                 .format(l_database, l_table, error))
    sys.exit(-1)

# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']

Step 2 - Generate a list of dictionaries, where each dictionary contains the information needed to create a single partition. All dictionaries have the same structure; only their partition-specific values change (year, month, day, hour).

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # Getting end date till which partitions needs to be created
    logger.info("Partitions to be created from {} to {}".format(start_date, end_date))

    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = '{:02d}'.format(hour)  # Zero-pad the hour to two digits
            # Strip any trailing slash first so the path is well-formed whether or not
            # the table location ends with '/'
            part_location = "{}/{}/{}/{}/{}/".format(table_location.rstrip('/'), year, month, day, hour)
            input_dict = {
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': {
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                }
            }
            input_list.append(input_dict.copy())
    return input_list
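
The function above calls a `date_range` helper that is not shown in the answer. A minimal sketch of what it might look like follows, assuming the end date is meant to be inclusive (the original does not say). `partition_values` is a hypothetical helper, not part of the original script, that builds the same zero-padded strings with `strftime` instead of string slicing.

```python
from datetime import date, timedelta


def date_range(start_date, end_date):
    """Yield every date from start_date through end_date (inclusive, by assumption)."""
    for offset in range((end_date - start_date).days + 1):
        yield start_date + timedelta(days=offset)


def partition_values(day, hour):
    """Return zero-padded [year, month, day, hour] strings for one partition."""
    return [day.strftime('%Y'), day.strftime('%m'), day.strftime('%d'),
            '{:02d}'.format(hour)]
```

If the original helper excluded the end date, dropping the `+ 1` would reproduce that behavior.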

Step 3 - Call the batch_create_partition() API

for each_input in break_list_into_chunks(partition_input_list, 100):
    create_partition_response = client.batch_create_partition(
        CatalogId=catalog_id,
        DatabaseName=l_database,
        TableName=l_table,
        PartitionInputList=each_input
    )

There is a limit of 100 partitions per API call, so if you are creating more than 100 partitions you will need to break your list into chunks and iterate over them.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition
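
The `break_list_into_chunks` helper used in Step 3 is not shown in the answer. The sketch below is one possible version, together with a hypothetical wrapper, `create_partitions_in_batches`, that also inspects the `Errors` list returned by `batch_create_partition()`. It assumes `client` is a boto3 Glue client, and that partitions which already exist are reported with the `AlreadyExistsException` error code and can safely be ignored.

```python
def break_list_into_chunks(items, chunk_size):
    """Yield consecutive slices of at most chunk_size items."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


def create_partitions_in_batches(client, catalog_id, database, table, partition_inputs):
    """Register partitions 100 at a time and return the errors worth investigating.

    `client` is assumed to be a boto3 Glue client (boto3.client('glue')).
    """
    failures = []
    for chunk in break_list_into_chunks(partition_inputs, 100):
        response = client.batch_create_partition(
            CatalogId=catalog_id,
            DatabaseName=database,
            TableName=table,
            PartitionInputList=chunk,
        )
        # Partitions that could not be created are listed under 'Errors'.
        # Assumption: already-existing partitions are harmless, so skip them.
        for error in response.get('Errors', []):
            code = error.get('ErrorDetail', {}).get('ErrorCode')
            if code != 'AlreadyExistsException':
                failures.append(error)
    return failures
```

Returning the remaining errors instead of raising lets the caller decide whether a partial failure should stop the job.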

conetfun answered Oct 01 '22 10:10



  1. You can configure your Glue crawler to be triggered every 5 minutes.

  2. You can create a Lambda function that either runs on a schedule or is triggered by an event from your bucket (e.g. a putObject event); the function can call Athena to discover partitions:

     import boto3
    
     athena = boto3.client('athena')
    
     def lambda_handler(event, context):
         athena.start_query_execution(
             QueryString="MSCK REPAIR TABLE mytable",
             ResultConfiguration={
                 'OutputLocation': "s3://some-bucket/_athena_results"
             }
         )
    
  3. Use Athena to add partitions manually. You can also run SQL queries via the API, as in my Lambda example.

    Example from the Athena manual:

     ALTER TABLE orders ADD
       PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
       PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
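
To run an ALTER TABLE statement like the one above through the API, the query string can be assembled in Python and submitted with `start_query_execution`. The following is a minimal sketch: `build_add_partition_query` and `add_partitions` are hypothetical helper names, `athena` is assumed to be a boto3 Athena client, and partition values are assumed to be trusted strings (nothing is escaped). `IF NOT EXISTS` is added so that re-running the statement does not fail on partitions that are already registered.

```python
def build_add_partition_query(table, partitions):
    """Build one ALTER TABLE ... ADD statement covering several partitions.

    `partitions` is a list of (spec, location) pairs, where spec is a dict
    mapping partition column names to string values.
    """
    clauses = []
    for spec, location in partitions:
        columns = ", ".join("{} = '{}'".format(k, v) for k, v in spec.items())
        clauses.append("PARTITION ({}) LOCATION '{}'".format(columns, location))
    return "ALTER TABLE {} ADD IF NOT EXISTS\n  {}".format(table, "\n  ".join(clauses))


def add_partitions(athena, table, partitions, output_location):
    """Submit the statement; `athena` is assumed to be boto3.client('athena')."""
    response = athena.start_query_execution(
        QueryString=build_add_partition_query(table, partitions),
        ResultConfiguration={'OutputLocation': output_location},
    )
    return response['QueryExecutionId']
```

The call returns as soon as the query is queued, so a caller that needs to know whether the partitions were actually added would have to poll the query status using the returned execution ID.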
    
botchniaque answered Oct 01 '22 08:10
