Using Airflow, I want to get the result of a SQL query formatted as a pandas DataFrame.
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

def get_my_query(*args, **kwargs):
    bq_hook = BigQueryHook(bigquery_conn_id='my_connection_id', delegate_to=None)
    my_query = """
    SELECT col1, col2
    FROM `my_bq_project.my_bq_dataset.my_table`
    """
    df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
    logging.info('df.head()\n{}'.format(df.head()))
Above is the Python function that I want to execute in a PythonOperator. Here is the DAG:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

my_dag = DAG('my_dag', start_date=datetime.today())
start = DummyOperator(task_id='start', dag=my_dag)
end = DummyOperator(task_id='end', dag=my_dag)
work = PythonOperator(task_id='work', python_callable=get_my_query, dag=my_dag)
start >> work >> end
But the work step throws an exception. Here is the log:
[2018-04-02 20:25:50,506] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:50,506] {gcp_api_base_hook.py:82} INFO - Getting connection using a JSON key file.
[2018-04-02 20:25:51,035] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:51,035] {slack_operator.py:70} ERROR - Slack API call failed (%s)
[2018-04-02 20:25:51,070] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-04-02 20:25:51,071] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/bin/airflow", line 28, in <module>
[2018-04-02 20:25:51,072] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-04-02 20:25:51,072] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-04-02 20:25:51,073] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-04-02 20:25:51,074] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-04-02 20:25:51,075] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-04-02 20:25:51,075] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-04-02 20:25:51,076] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2018-04-02 20:25:51,077] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-04-02 20:25:51,077] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2018-04-02 20:25:51,078] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-04-02 20:25:51,079] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-04-02 20:25:51,080] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/processing/dags/my_dag.py", line 37, in get_my_query
[2018-04-02 20:25:51,080] {base_task_runner.py:98} INFO - Subtask: df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
[2018-04-02 20:25:51,081] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 94, in get_pandas_df
[2018-04-02 20:25:51,081] {base_task_runner.py:98} INFO - Subtask: schema, pages = connector.run_query(bql)
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/.local/lib/python2.7/site-packages/pandas_gbq/gbq.py", line 502, in run_query
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask: except self.http_error as ex:
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'BigQueryPandasConnector' object has no attribute 'http_error'
This exception is due to this issue, which, according to its description ("When BigQueryPandasConnector (in bigquery_hook.py) encounters a BQ job insertion error, the exception will be assigned to connector.http_error"), hides another exception. That is still strange, because I'm not doing any insertion.
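For what it's worth, the masking mechanism is easy to reproduce. Here is a minimal, hypothetical sketch (not the actual library code) of the pattern: a subclass skips the parent's __init__, and the missing attribute is only looked up inside an except clause, so an AttributeError replaces the real error:

class Parent(object):
    def __init__(self):
        # exception type that is only consulted inside an except clause
        self.http_error = ValueError

    def run_query(self):
        try:
            raise RuntimeError('the real error, e.g. a failed BQ job')
        except self.http_error:  # evaluating self.http_error fails here
            pass

class Child(Parent):
    def __init__(self):
        pass  # Parent.__init__ is never called, so http_error is never set

Child().run_query()
# AttributeError: 'Child' object has no attribute 'http_error'
# The original RuntimeError is hidden, just like in the log above.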
What am I doing wrong? Maybe there is a problem with the bigquery_conn_id used in the BigQueryHook, or maybe a DataFrame is not the way to go to handle query results.
PS: here is the result of pip freeze:
alembic==0.8.10
amqp==2.2.2
apache-airflow==1.9.0
apache-beam==2.3.0
asn1crypto==0.24.0
avro==1.8.2
Babel==2.5.3
backports-abc==0.5
bcrypt==3.1.4
billiard==3.5.0.3
bleach==2.1.2
cachetools==2.0.1
celery==4.1.0
certifi==2018.1.18
cffi==1.11.4
chardet==3.0.4
click==6.7
configparser==3.5.0
crcmod==1.7
croniter==0.3.20
cryptography==2.1.4
dill==0.2.7.1
docutils==0.14
elasticsearch==1.4.0
enum34==1.1.6
fasteners==0.14.1
Flask==0.11.1
Flask-Admin==1.4.1
Flask-Bcrypt==0.7.1
Flask-Cache==0.13.1
Flask-Login==0.2.11
flask-swagger==0.2.13
Flask-WTF==0.14
flower==0.9.2
funcsigs==1.0.0
future==0.16.0
futures==3.2.0
gapic-google-cloud-datastore-v1==0.15.3
gapic-google-cloud-error-reporting-v1beta1==0.15.3
gapic-google-cloud-logging-v2==0.91.3
gapic-google-cloud-pubsub-v1==0.15.4
gapic-google-cloud-spanner-admin-database-v1==0.15.3
gapic-google-cloud-spanner-admin-instance-v1==0.15.3
gapic-google-cloud-spanner-v1==0.15.3
gitdb2==2.0.3
GitPython==2.1.8
google-api-core==1.1.0
google-api-python-client==1.6.5
google-apitools==0.5.20
google-auth==1.4.1
google-auth-oauthlib==0.2.0
google-cloud==0.27.0
google-cloud-bigquery==0.31.0
google-cloud-bigtable==0.26.0
google-cloud-core==0.28.1
google-cloud-dataflow==2.3.0
google-cloud-datastore==1.2.0
google-cloud-dns==0.26.0
google-cloud-error-reporting==0.26.0
google-cloud-language==0.27.0
google-cloud-logging==1.2.0
google-cloud-monitoring==0.26.0
google-cloud-pubsub==0.27.0
google-cloud-resource-manager==0.26.0
google-cloud-runtimeconfig==0.26.0
google-cloud-spanner==0.26.0
google-cloud-speech==0.28.0
google-cloud-storage==1.3.2
google-cloud-translate==1.1.0
google-cloud-videointelligence==0.25.0
google-cloud-vision==0.26.0
google-gax==0.15.16
google-resumable-media==0.3.1
googleads==4.5.1
googleapis-common-protos==1.5.3
googledatastore==7.0.1
grpc-google-iam-v1==0.11.4
grpcio==1.10.0
gunicorn==19.7.1
hdfs3==0.3.0
html5lib==1.0.1
httplib2==0.10.3
idna==2.6
ipaddress==1.0.19
itsdangerous==0.24
Jinja2==2.8.1
kombu==4.1.0
ldap3==2.4.1
lockfile==0.12.2
lxml==3.8.0
Mako==1.0.7
Markdown==2.6.11
MarkupSafe==1.0
mock==2.0.0
monotonic==1.4
mysqlclient==1.3.10
numpy==1.13.0
oauth2client==2.0.2
oauthlib==2.0.7
ordereddict==1.1
pandas==0.19.2
pandas-gbq==0.3.1
pbr==3.1.1
ply==3.8
proto-google-cloud-datastore-v1==0.90.4
proto-google-cloud-error-reporting-v1beta1==0.15.3
proto-google-cloud-logging-v2==0.91.3
proto-google-cloud-pubsub-v1==0.15.4
proto-google-cloud-spanner-admin-database-v1==0.15.3
proto-google-cloud-spanner-admin-instance-v1==0.15.3
proto-google-cloud-spanner-v1==0.15.3
protobuf==3.5.2
psutil==4.4.2
pyasn1==0.4.2
pyasn1-modules==0.2.1
pycosat==0.6.3
pycparser==2.18
Pygments==2.2.0
pyOpenSSL==17.5.0
PySocks==1.6.8
python-daemon==2.1.2
python-dateutil==2.7.0
python-editor==1.0.3
python-nvd3==0.14.2
python-slugify==1.1.4
pytz==2018.3
PyVCF==0.6.8
PyYAML==3.12
redis==2.10.6
requests==2.18.4
requests-oauthlib==0.8.0
rsa==3.4.2
setproctitle==1.1.10
setuptools-scm==1.15.0
singledispatch==3.4.0.3
six==1.11.0
slackclient==1.1.3
smmap2==2.0.3
SQLAlchemy==1.2.5
statsd==3.2.2
suds-jurko==0.6
tableauserverclient==0.5.1
tabulate==0.7.7
tenacity==4.9.0
thrift==0.11.0
tornado==5.0.1
typing==3.6.4
Unidecode==1.0.22
uritemplate==3.0.0
urllib3==1.22
vine==1.1.4
webencodings==0.5.1
websocket-client==0.47.0
Werkzeug==0.14.1
WTForms==2.1
xmltodict==0.11.0
zope.deprecation==4.3.0
Another possible way would be to use the pandas BigQuery connector, pd.read_gbq and pd.to_gbq. Looking at the stack trace, the BigQueryHook is using that connector itself. It might be a good idea to:

1) try the connection with the pandas connector in a PythonOperator directly (see the sketch below);

2) then, once the connection works, either switch to the pandas connector or try to debug the BigQueryHook.
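For step 1), something along these lines could work. This is a minimal, untested sketch: the project id and the key-file path are placeholders you have to replace, and it assumes the service-account key is passed via read_gbq's private_key parameter:

import logging

import pandas as pd

def get_my_query_via_pandas(*args, **kwargs):
    my_query = """
    SELECT col1, col2
    FROM `my_bq_project.my_bq_dataset.my_table`
    """
    df = pd.read_gbq(
        my_query,
        project_id='my_bq_project',               # placeholder: your GCP project id
        private_key='/path/to/service_key.json',  # placeholder: service-account key file
        dialect='standard',
    )
    logging.info('df.head()\n{}'.format(df.head()))
    return df

work = PythonOperator(task_id='work', python_callable=get_my_query_via_pandas, dag=my_dag)

If this succeeds, the connection and credentials are fine and the problem sits in the hook; if it fails, the real error should at least surface instead of being masked by the missing http_error attribute.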