Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Getting Http Response from boto3 table.batch_writer object

There is a list of data in a csv that I want to put into a dynamodb table on aws. See sample list below.

    Mary,F,7065
    Anna,F,2604
    Emma,F,2003
    Elizabeth,F,1939
    Minnie,F,1746
    Margaret,F,1578
    Ida,F,1472
    Alice,F,1414
    Bertha,F,1320
    Sarah,F,1288
    Annie,F,1258
    Clara,F,1226
    Ella,F,1156
    Florence,F,1063
    Cora,F,1045
    Martha,F,1040
    Laura,F,1012
    Nellie,F,995
    Grace,F,982
    Carrie,F,949
    Maude,F,858
    Mabel,F,808
    Bessie,F,796
    Jennie,F,793
    Gertrude,F,787
    Julia,F,783
    Hattie,F,769
    Edith,F,768
    Mattie,F,704
    Rose,F,700
    Catherine,F,688
    Lillian,F,672
    Ada,F,652
    Lillie,F,647
    Helen,F,636
    Jessie,F,635
    Louise,F,635
    Ethel,F,633
    Lula,F,621
    Myrtle,F,615
    Eva,F,614
    Frances,F,605
    Lena,F,603
    Lucy,F,590
    Edna,F,588
    Maggie,F,582
    Pearl,F,569
    Daisy,F,564
    Fannie,F,560
    Josephine,F,544

In order to write more than 25 items to a dynamodb table, the documents use a batch_writer object.

    resource = boto3.resource('dynamodb')
    table = resource.Table('Names')
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(item)

Is there a way to return an http response to indicate a successful completion of the batch_write? I know that it is asyncronous. Is there a wait or fetch or something to call?

like image 690
polka Avatar asked Mar 21 '19 17:03

polka


1 Answers

The documents for the BatchWriter object instantiated by batch_writer are located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method generates a response, it just doesn't store it anywhere.

class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table.  The class handles
            batch writes to a single table.
        :type client: ``botocore.client.Client``
        :param client: A botocore client.  Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol.  What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e
            ``boto3.resource('dynamodb').Table('foo').meta.client``.
        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.
        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']

        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()

How I solved it:

I built on the responses to this question about overwriting class methods. They all work, but the best for my use case was to overwrite the class instance with this version of _flush.

First I built a new version of _flush.

import logging
import types

## New Flush

def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']

    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))


Then I overwrote the instance method like this.

with batch_writer() as batch:
    batch._flush=types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)
print(batch._response)

And this generates an output like this.

{'UnprocessedItems': {},
 'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '23',
   'connection': 'keep-alive',
   'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
   'x-amz-crc32': '4185382645'},
  'RetryAttempts': 0}}
like image 153
polka Avatar answered Sep 23 '22 14:09

polka