I have a Firestore function in Python where I loop over all the users of one collection, then go into another collection to gather some metrics, and update those metrics in the first collection.
I run the function, but at some point during execution it breaks with this error:
_Rendezvous Traceback (most recent call last)
~\Anaconda3\envs\work\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
78 try:
---> 79 return six.next(self._wrapped)
80 except grpc.RpcError as exc:
~\Anaconda3\envs\work\lib\site-packages\grpc\_channel.py in __next__(self)
363 def __next__(self):
--> 364 return self._next()
365
~\Anaconda3\envs\work\lib\site-packages\grpc\_channel.py in _next(self)
346 else:
--> 347 raise self
348 while True:
_Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1570660422.708000000","description":"Error received from peer ipv4:216.58.202.234:443","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
The above exception was the direct cause of the following exception:
DeadlineExceeded Traceback (most recent call last)
<ipython-input-20-05c9cefdafb4> in <module>
----> 1 update_collection__persons()
<ipython-input-19-6e2bdd597a6e> in update_collection__persons()
10 counter_secs = 0
11
---> 12 for person_doc in person_docs:
13 person_dict = person_doc.to_dict()
14 last_updated = person_dict['last_updated']
~\Anaconda3\envs\work\lib\site-packages\google\cloud\firestore_v1\query.py in stream(self, transaction)
766 )
767
--> 768 for response in response_iterator:
769 if self._all_descendants:
770 snapshot = _collection_group_query_response_to_snapshot(
~\Anaconda3\envs\work\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
79 return six.next(self._wrapped)
80 except grpc.RpcError as exc:
---> 81 six.raise_from(exceptions.from_grpc_error(exc), exc)
82
83 # Alias needed for Python 2/3 support.
~\Anaconda3\envs\work\lib\site-packages\six.py in raise_from(value, from_value)
DeadlineExceeded: 504 Deadline Exceeded
I have been looking for a solution and there is not much information. Here I found a similar problem: https://github.com/googleapis/google-cloud-python/issues/8933
So I tried to use that code, but it is not working. This is my function:
def update_collection__persons():
    persons = db.collection(u'collections__persons')
    person_docs = persons.stream()

    counter_secs = 0

    for person_doc in person_docs:
        person_dict = person_doc.to_dict()
        last_updated = person_dict['last_updated']
        last_processed = person_dict['last_processed']
        dt_last_updated = datetime(1, 1, 1) + timedelta(microseconds=last_updated/10)
        dt_last_processed = datetime(1, 1, 1) + timedelta(microseconds=last_processed/10)

        if dt_last_processed < dt_last_updated:
            orders = db.collection(u'collection__orders').where(u'email', u'==', person_dict['email'])
            orders_docs = orders.stream()
            sum_price = 0
            count = 0
            date_add_list = []

            for order_doc in orders_docs:
                order_dict = order_doc.to_dict()
                sum_price += order_dict['total_price']
                count += 1
                date_add_list.append(order_dict['dateAdded'])

            if count > 0:
                data = {'metrics': {'LTV': sum_price,
                                    'AOV': sum_price/count,
                                    'Quantity_orders': count,
                                    'first_order_date': min(date_add_list),
                                    'last_order_date': max(date_add_list)},
                        'last_processed': int((datetime.utcnow() - datetime(1, 1, 1)).total_seconds() * 10000000)}
                db.collection(u'collection__persons').document(person_dict['email']).set(data, merge=True)
I have created a counter_secs just to see whether the function always breaks on the same query, but it does not.
Also, after running the function, if I check random users, some of them have updated data, so it is working but breaking at some point.
There's a 60 second timeout for persons.stream(). Instead of processing each document as you stream, try fetching all the documents upfront:
person_docs = [snapshot for snapshot in persons.stream()]
If you have more documents than you can fetch in 60 seconds, try a recursive function like the one in this answer.
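Here is a minimal iterative sketch of that idea, paginating with a cursor instead of recursing. The name fetch_in_pages and the page size are illustrative, and it assumes the query already has an order_by (for example on the email field) so the cursor is well defined:

def fetch_in_pages(query, page_size=500):
    """Collect all snapshots from `query`, page_size at a time,
    so no single stream() call runs long enough to hit the
    60 second deadline. The query must have an order_by so
    start_after() has a well-defined cursor."""
    docs = []
    last_snapshot = None
    while True:
        page_query = query.limit(page_size)
        if last_snapshot is not None:
            # Resume after the last document of the previous page.
            page_query = page_query.start_after(last_snapshot)
        page = [snapshot for snapshot in page_query.stream()]
        if not page:
            break
        docs.extend(page)
        last_snapshot = page[-1]
        if len(page) < page_size:
            # Short page means we reached the end of the collection.
            break
    return docs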
The same upfront fetch applies to the per-person orders queries:
orders_docs = [snapshot for snapshot in orders.stream()]
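With a helper like the sketch above, the persons query in update_collection__persons could then be fetched in pages (the order_by field here is just an assumption; any stable, unique field works as a cursor):

persons_query = db.collection(u'collections__persons').order_by(u'email')
person_docs = fetch_in_pages(persons_query, page_size=500)

The per-person orders queries return far fewer documents, so materializing them with the list comprehension above is usually enough on its own.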