I am getting the following error when trying to run to_dataframe() method from Google BigQuery Storage Lib.
The full code is here:
import google.auth
import os
import time
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
import fastavro
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials
)
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_taxi_trips"
table.table_id = "tlc_yellow_trips_2018"
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.row_restriction = 'pickup_location_id = "48"'
read_options.selected_fields.append("vendor_id")
read_options.selected_fields.append("passenger_count")
read_options.selected_fields.append("trip_distance")
read_options.selected_fields.append("rate_code")
read_options.selected_fields.append("store_and_fwd_flag")
read_options.selected_fields.append("payment_type")
read_options.selected_fields.append("fare_amount")
read_options.selected_fields.append("extra")
read_options.selected_fields.append("mta_tax")
read_options.selected_fields.append("tip_amount")
read_options.selected_fields.append("tolls_amount")
read_options.selected_fields.append("imp_surcharge")
read_options.selected_fields.append("total_amount")
read_options.selected_fields.append("pickup_location_id")
read_options.selected_fields.append("dropoff_location_id")
parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
table, parent, read_options=read_options
)
now=time.time()
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)
dataframe = reader.to_dataframe(session)
I am using Conda on Windows 10. The versions of Google Libs are below:
google-api-core 1.14.2 py37h21ff451_0 conda-forge
google-api-core-grpc 1.14.2 h21ff451_0 conda-forge
google-api-python-client 1.7.11 py_0 conda-forge
google-auth 1.6.3 py_0 conda-forge
google-auth-httplib2 0.0.3 py_2 conda-forge
google-cloud-bigquery 1.19.0 py37_0 conda-forge
google-cloud-bigquery-storage 0.7.0 0 conda-forge
google-cloud-bigquery-storage-core 0.7.0 py37h21ff451_0 conda-forge
google-cloud-core 1.0.3 py_0 conda-forge
google-resumable-media 0.3.3 py_0 conda-forge
googleapis-common-protos 1.6.0 py37h21ff451_0
grpcio 1.16.1 py37h351948d_1
Here is the error I get:
---------------------------------------------------------------------------
_Rendezvous Traceback (most recent call last)
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
78 try:
---> 79 return six.next(self._wrapped)
80 except grpc.RpcError as exc:
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\grpc\_channel.py in __next__(self)
363 def __next__(self):
--> 364 return self._next()
365
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\grpc\_channel.py in _next(self)
346 else:
--> 347 raise self
348 while True:
_Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1568284475.885000000","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1017,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
The above exception was the direct cause of the following exception:
DeadlineExceeded Traceback (most recent call last)
<ipython-input-9-8e16e005ecd6> in <module>
48 position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
49 reader = bqstorageclient.read_rows(position)
---> 50 dataframe = reader.to_dataframe(session)
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in to_dataframe(self, read_session, dtypes)
220 raise ImportError(_PANDAS_REQUIRED)
221
--> 222 return self.rows(read_session).to_dataframe(dtypes=dtypes)
223
224
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in to_dataframe(self, dtypes)
313
314 frames = []
--> 315 for page in self.pages:
316 frames.append(page.to_dataframe(dtypes=dtypes))
317 return pandas.concat(frames)
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in pages(self)
261 # Each page is an iterator of rows. But also has num_items, remaining,
262 # and to_dataframe.
--> 263 for message in self._reader:
264 self._status = message.status
265 yield ReadRowsPage(self._stream_parser, message)
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in __iter__(self)
126 while True:
127 try:
--> 128 for message in self._wrapped:
129 rowcount = message.row_count
130 self._position.offset += rowcount
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
79 return six.next(self._wrapped)
80 except grpc.RpcError as exc:
---> 81 six.raise_from(exceptions.from_grpc_error(exc), exc)
82
83 # Alias needed for Python 2/3 support.
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\six.py in raise_from(value, from_value)
DeadlineExceeded: 504 Deadline Exceeded
The code fails immediately with Deadline Exceeded error after I run this line dataframe = reader.to_dataframe(session). The rest of code executes without errors.
I've also created issue on GitHub but didn't get any useful reply there: https://github.com/googleapis/google-cloud-python/issues/9135
I tried older versions of Google Libs. I have exactly the same issue on my another Windows PC (laptop with Windows 10). Everything works fine on Linux machines.
Any help would be appreciated.
I believe (not sure) that this error means the request is surpassing the default timeout limit. If you look at test_reader.py the comments seem to suggest that a retry won't occur if the Deadline Exception is raised.
'# Don't reconnect on DeadlineException. This allows user-specified timeouts'
You can override the default timeout by passing read_rows a manual timeout parameter in seconds:
reader = bqstorageclient.read_rows(position)
So I think it would look like this, although I don't know how large to make the timeout limit:
reader = bqstorageclient.read_rows(position, timeout=100)
See also line 99 of client.py for discussion of the read_rows timeout parameter.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With