I am trying to write an Azure Function in Python that creates a simple Azure Storage table and saves up to 10,000 rows.

First I tried creating the entities one by one:

from azure.data.tables import TableClient, TableTransactionError
...
table_client.create_entity({...})

This works, but it is slow.
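Roughly like this (simplified; the connection string, table name and dataToStore are placeholders):

from azure.data.tables import TableClient

table_client = TableClient.from_connection_string(
    conn_str='<connection_string>', table_name='mytable')
for x in dataToStore:
    # one HTTP round trip per entity -- this is why it is slow
    table_client.create_entity({
        'PartitionKey': 'PARTITION1',
        'RowKey': str(x['rowkey']),
        'someKey': x['someValue']
    })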
Then I tried

from concurrent.futures import ProcessPoolExecutor as PoolExecutor

which sped the whole process up quite a lot, but you CANNOT use it in an Azure Function, for obvious reasons.
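Roughly like this (simplified; entities is the prepared list of entity dicts, the connection string and chunk size are placeholders):

from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from azure.data.tables import TableClient

def insert_chunk(chunk):
    # each worker process builds its own client (clients don't pickle across processes)
    client = TableClient.from_connection_string(
        conn_str='<connection_string>', table_name='mytable')
    for entity in chunk:
        client.create_entity(entity)

if __name__ == '__main__':
    chunks = [entities[i:i + 1000] for i in range(0, len(entities), 1000)]
    with PoolExecutor() as executor:
        # runs fine locally, but a process pool is not usable inside the Function host
        list(executor.map(insert_chunk, chunks))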
I also tried

table_client.upsert_entity(i)
...
table_client.submit_transaction(operations)

but it was again slow.
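Roughly like this (simplified; entities is the prepared list of entity dicts, all sharing one PartitionKey):

# a transaction is limited to 100 operations, all in the same partition
for i in range(0, len(entities), 100):
    operations = [("upsert", e) for e in entities[i:i + 100]]
    table_client.submit_transaction(operations)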
Finally I tried

# Create a new batch
batch = TableBatch()
# Count how many items are stored in the batch
inBatch = 0
# Loop over all the data we want to insert
for x in dataToStore:
    # Insert the entity into the batch
    batch.insert_entity({
        'PartitionKey': 'PARTITION1',
        'RowKey': str(x['rowkey']),
        'someKey': x['someValue'],
        'someOtherKey': x['someOtherValue']
    })
    # Increment the batch item counter
    inBatch += 1
    # We can only send batches with up to 100 records
    if inBatch > 99:
        # Commit the batch (send to Azure)
        table_service.commit_batch('tablename', batch)
        # Reset the batch so it doesn't contain any old items
        batch = TableBatch()
        inBatch = 0
# Commit whatever is left over in the final partial batch
if inBatch > 0:
    table_service.commit_batch('tablename', batch)
But this is still quite slow and not durable at all.

Azure Table storage claims that you can save huge amounts of data quickly. Does anyone know how?
A few observations. You haven't mentioned how long your tests actually took; that would have been useful.
Usually when writing to Table storage you would choose a partition key that is well distributed. In your example, however, you are using a single partition key, 'PARTITION1'. In that case you can try the Cosmos Table storage SDK to batch up your entities and load them in one go. Note that batching in the Cosmos Table storage SDK only works if all entities in a batch share the same partition key; see the sketch below for how to handle data that spans multiple partitions.
The documentation states that a batch can contain up to 100 entities or 4 MB, whichever limit is reached first.
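Putting those two constraints together, here is a hedged sketch of how you might group entities by partition key and commit them in chunks of at most 100. The group_by_partition helper and the entities list are illustrative, not part of the SDK; service and table_name are assumed to be set up as in the full example below.

from collections import defaultdict

def group_by_partition(entities):
    # a batch may only contain one PartitionKey, so bucket entities first
    groups = defaultdict(list)
    for e in entities:
        groups[e['PartitionKey']].append(e)
    return groups

for pk, group in group_by_partition(entities).items():
    # stay under the 100-entity-per-batch limit
    for i in range(0, len(group), 100):
        batch = TableBatch()
        for e in group[i:i + 100]:
            batch.insert_or_merge_entity(e)
        service.commit_batch(table_name, batch)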
I ran a quick test using the Cosmos Table Python SDK, batching 50 entities at a time. I was able to load 1,000 entities in around 35 s, and a 10,000-entity load took 4:55. I'm not sure if that suits your requirements or if you were expecting something faster. Also note that my example entity is slightly larger than yours. Below is my code; it looks fairly similar to yours and should be fairly easy to put into an Azure Function.
#USING COSMOS TABLE STORAGE API
from azure.cosmosdb.table import TableService, TableBatch
from datetime import datetime, timedelta
from random import randrange
import random
import names

acc_name = '<storageacct_name>'
acc_key = 'xxxxxxxxxxxxxxxxx'
table_name = 'xxxxxxx'
Number_of_docs = 10000
d1 = datetime.strptime('1/1/2008 1:30 PM', '%m/%d/%Y %I:%M %p')
d2 = datetime.strptime('1/1/2009 4:50 AM', '%m/%d/%Y %I:%M %p')
service = TableService(account_name=acc_name, account_key=acc_key)

def random_date(start, end):
    """Return a random datetime between two datetime objects."""
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)

def create_entity(id):
    my_entity = {"PartitionKey": "PARTITION1",
                 "RowKey": str(id),
                 "employee_id": str(id),
                 "first_name": names.get_first_name(),
                 "last_name": names.get_last_name(),
                 "regn_no": "TEST00000000" + str(id),
                 "start_date": random_date(d1, d2),
                 "salary": random.randint(12000, 2000000),
                 "inserted_at": str(datetime.now())}
    return my_entity

starttime = datetime.utcnow()
print("Starting ingestion: ", starttime.strftime("%Y-%m-%d %H:%M:%S.%f"))
batch_no = 0
batch = TableBatch()
for i in range(1, Number_of_docs + 1):
    batch.insert_or_merge_entity(create_entity(i))
    batch_no += 1
    # Commit every 50 entities, then start a fresh batch
    if batch_no >= 50:
        service.commit_batch(table_name, batch)
        batch_no = 0
        batch = TableBatch()
# Commit any leftover entities in the final partial batch
if batch_no > 0:
    service.commit_batch(table_name, batch)
endtime = datetime.utcnow()
print("\nrun_sample done :" + endtime.strftime("%Y-%m-%d %H:%M:%S.%f"))
print("Time taken :" + str(endtime - starttime))
print("Number of messages :" + str(Number_of_docs))
EDIT: Apologies, I realized that the Cosmos Table SDK is being replaced by the azure-data-tables API, per an article released 3 days ago. So I have rewritten this code using the new Table storage SDK and tested it again. The results are in fact better: 3:55 for 10,000 entities. You can find more examples of how to use this new SDK HERE.
#USING THE NEW TABLE STORAGE API
from azure.data.tables import TableClient
from datetime import datetime, timedelta
from random import randrange
import random
import names

conn = 'xxxxxxxxxxxxxxxxxxxxx;EndpointSuffix=core.windows.net'
tablename = 'mytable'
table_client = TableClient.from_connection_string(conn_str=conn, table_name=tablename)
Number_of_docs = 10000
d1 = datetime.strptime('1/1/2008 1:30 PM', '%m/%d/%Y %I:%M %p')
d2 = datetime.strptime('1/1/2009 4:50 AM', '%m/%d/%Y %I:%M %p')

def random_date(start, end):
    """Return a random datetime between two datetime objects."""
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)

def create_entity(id):
    my_entity = {"PartitionKey": "PARTITION1",
                 "RowKey": str(id),
                 "employee_id": str(id),
                 "first_name": names.get_first_name(),
                 "last_name": names.get_last_name(),
                 "regn_no": "TEST00000000" + str(id),
                 "start_date": random_date(d1, d2),
                 "salary": random.randint(12000, 2000000),
                 "inserted_at": str(datetime.now())}
    return my_entity

starttime = datetime.utcnow()
print("Starting ingestion: ", starttime.strftime("%Y-%m-%d %H:%M:%S.%f"))
batch_no = 0
operations = []
for i in range(1, Number_of_docs + 1):
    operations.append(("upsert", create_entity(i)))
    batch_no += 1
    # Submit every 50 operations as one transaction, then start a fresh list
    if batch_no >= 50:
        table_client.submit_transaction(operations)
        batch_no = 0
        operations = []
# Submit any leftover operations in the final partial transaction
if operations:
    table_client.submit_transaction(operations)
endtime = datetime.utcnow()
print("\nrun_sample done :" + endtime.strftime("%Y-%m-%d %H:%M:%S.%f"))
print("Time taken :" + str(endtime - starttime))
print("Number of messages :" + str(Number_of_docs))