
How to get the result of an SQL query from Big Query in Airflow?

Using Airflow, I want to get the result of an SQL query formatted as a pandas DataFrame:

import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

def get_my_query(*args, **kwargs):
    bq_hook = BigQueryHook(bigquery_conn_id='my_connection_id', delegate_to=None)
    my_query = """
                 SELECT col1, col2
                 FROM `my_bq_project.my_bq_dataset.my_table`
               """
    df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
    logging.info('df.head()\n{}'.format(df.head()))

Above is the Python function that I want to execute in a PythonOperator. Here is the DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

my_dag = DAG('my_dag', start_date=datetime.today())
start = DummyOperator(task_id='start', dag=my_dag)
end = DummyOperator(task_id='end', dag=my_dag)
work = PythonOperator(task_id='work', python_callable=get_my_query, dag=my_dag)
start >> work >> end

But the work task throws an exception. Here is the log:

[2018-04-02 20:25:50,506] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:50,506] {gcp_api_base_hook.py:82} INFO - Getting connection using a JSON key file.
[2018-04-02 20:25:51,035] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:51,035] {slack_operator.py:70} ERROR - Slack API call failed (%s)
[2018-04-02 20:25:51,070] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-04-02 20:25:51,071] {base_task_runner.py:98} INFO - Subtask:   File "/opt/conda/bin/airflow", line 28, in <module>
[2018-04-02 20:25:51,072] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-04-02 20:25:51,072] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-04-02 20:25:51,073] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-04-02 20:25:51,074] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-04-02 20:25:51,075] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-04-02 20:25:51,075] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-04-02 20:25:51,076] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-04-02 20:25:51,077] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-04-02 20:25:51,077] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-04-02 20:25:51,078] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-04-02 20:25:51,079] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-04-02 20:25:51,080] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/processing/dags/my_dag.py", line 37, in get_my_query
[2018-04-02 20:25:51,080] {base_task_runner.py:98} INFO - Subtask:     df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
[2018-04-02 20:25:51,081] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 94, in get_pandas_df
[2018-04-02 20:25:51,081] {base_task_runner.py:98} INFO - Subtask:     schema, pages = connector.run_query(bql)
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/pandas_gbq/gbq.py", line 502, in run_query
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask:     except self.http_error as ex:
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'BigQueryPandasConnector' object has no attribute 'http_error'

This exception is due to this issue, whose description says:

When BigQueryPandasConnector (in bigquery_hook.py) encounters a BQ job insertion error, the exception will be assigned to connector.http_error

so the AttributeError hides another exception. This is still strange, because I'm not doing any insertion.

What am I doing wrong? Maybe there is a problem with the bigquery_conn_id used in the BigQueryHook, or maybe a DataFrame is not the right way to handle query results.

PS: output of pip freeze:

alembic==0.8.10
amqp==2.2.2
apache-airflow==1.9.0
apache-beam==2.3.0
asn1crypto==0.24.0
avro==1.8.2
Babel==2.5.3
backports-abc==0.5
bcrypt==3.1.4
billiard==3.5.0.3
bleach==2.1.2
cachetools==2.0.1
celery==4.1.0
certifi==2018.1.18
cffi==1.11.4
chardet==3.0.4
click==6.7
configparser==3.5.0
crcmod==1.7
croniter==0.3.20
cryptography==2.1.4
dill==0.2.7.1
docutils==0.14
elasticsearch==1.4.0
enum34==1.1.6
fasteners==0.14.1
Flask==0.11.1
Flask-Admin==1.4.1
Flask-Bcrypt==0.7.1
Flask-Cache==0.13.1
Flask-Login==0.2.11
flask-swagger==0.2.13
Flask-WTF==0.14
flower==0.9.2
funcsigs==1.0.0
future==0.16.0
futures==3.2.0
gapic-google-cloud-datastore-v1==0.15.3
gapic-google-cloud-error-reporting-v1beta1==0.15.3
gapic-google-cloud-logging-v2==0.91.3
gapic-google-cloud-pubsub-v1==0.15.4
gapic-google-cloud-spanner-admin-database-v1==0.15.3
gapic-google-cloud-spanner-admin-instance-v1==0.15.3
gapic-google-cloud-spanner-v1==0.15.3
gitdb2==2.0.3
GitPython==2.1.8
google-api-core==1.1.0
google-api-python-client==1.6.5
google-apitools==0.5.20
google-auth==1.4.1
google-auth-oauthlib==0.2.0
google-cloud==0.27.0
google-cloud-bigquery==0.31.0
google-cloud-bigtable==0.26.0
google-cloud-core==0.28.1
google-cloud-dataflow==2.3.0
google-cloud-datastore==1.2.0
google-cloud-dns==0.26.0
google-cloud-error-reporting==0.26.0
google-cloud-language==0.27.0
google-cloud-logging==1.2.0
google-cloud-monitoring==0.26.0
google-cloud-pubsub==0.27.0
google-cloud-resource-manager==0.26.0
google-cloud-runtimeconfig==0.26.0
google-cloud-spanner==0.26.0
google-cloud-speech==0.28.0
google-cloud-storage==1.3.2
google-cloud-translate==1.1.0
google-cloud-videointelligence==0.25.0
google-cloud-vision==0.26.0
google-gax==0.15.16
google-resumable-media==0.3.1
googleads==4.5.1
googleapis-common-protos==1.5.3
googledatastore==7.0.1
grpc-google-iam-v1==0.11.4
grpcio==1.10.0
gunicorn==19.7.1
hdfs3==0.3.0
html5lib==1.0.1
httplib2==0.10.3
idna==2.6
ipaddress==1.0.19
itsdangerous==0.24
Jinja2==2.8.1
kombu==4.1.0
ldap3==2.4.1
lockfile==0.12.2
lxml==3.8.0
Mako==1.0.7
Markdown==2.6.11
MarkupSafe==1.0
mock==2.0.0
monotonic==1.4
mysqlclient==1.3.10
numpy==1.13.0
oauth2client==2.0.2
oauthlib==2.0.7
ordereddict==1.1
pandas==0.19.2
pandas-gbq==0.3.1
pbr==3.1.1
ply==3.8
proto-google-cloud-datastore-v1==0.90.4
proto-google-cloud-error-reporting-v1beta1==0.15.3
proto-google-cloud-logging-v2==0.91.3
proto-google-cloud-pubsub-v1==0.15.4
proto-google-cloud-spanner-admin-database-v1==0.15.3
proto-google-cloud-spanner-admin-instance-v1==0.15.3
proto-google-cloud-spanner-v1==0.15.3
protobuf==3.5.2
psutil==4.4.2
pyasn1==0.4.2
pyasn1-modules==0.2.1
pycosat==0.6.3
pycparser==2.18
Pygments==2.2.0
pyOpenSSL==17.5.0
PySocks==1.6.8
python-daemon==2.1.2
python-dateutil==2.7.0
python-editor==1.0.3
python-nvd3==0.14.2
python-slugify==1.1.4
pytz==2018.3
PyVCF==0.6.8
PyYAML==3.12
redis==2.10.6
requests==2.18.4
requests-oauthlib==0.8.0
rsa==3.4.2
setproctitle==1.1.10
setuptools-scm==1.15.0
singledispatch==3.4.0.3
six==1.11.0
slackclient==1.1.3
smmap2==2.0.3
SQLAlchemy==1.2.5
statsd==3.2.2
suds-jurko==0.6
tableauserverclient==0.5.1
tabulate==0.7.7
tenacity==4.9.0
thrift==0.11.0
tornado==5.0.1
typing==3.6.4
Unidecode==1.0.22
uritemplate==3.0.0
urllib3==1.22
vine==1.1.4
webencodings==0.5.1
websocket-client==0.47.0
Werkzeug==0.14.1
WTForms==2.1
xmltodict==0.11.0
zope.deprecation==4.3.0
asked Apr 02 '18 by MassyB



1 Answer

Another possible approach is to use the pandas BigQuery connector directly:

pd.read_gbq

and

pd.to_gbq

Looking at the stack trace, the BigQueryHook uses that connector itself. It might be a good idea to:

1) try the connection with the pandas connector in a PythonOperator directly

2) if that works, either switch to the pandas connector or debug the BigQueryHook
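As a minimal sketch of step 1), the callable below queries BigQuery through pd.read_gbq instead of the hook. The project id and table name are the placeholders from the question, and it assumes Google Cloud authentication is already configured for the worker (e.g. application-default credentials); in the pandas version from the pip freeze (0.19.2), read_gbq accepts a dialect parameter:

```python
import logging

import pandas as pd


def get_my_query(*args, **kwargs):
    """Fetch query results as a DataFrame via the pandas BigQuery connector."""
    my_query = """
        SELECT col1, col2
        FROM `my_bq_project.my_bq_dataset.my_table`
    """
    # pd.read_gbq delegates to the pandas BigQuery connector;
    # dialect='standard' selects standard SQL instead of legacy SQL.
    df = pd.read_gbq(my_query, project_id='my_bq_project', dialect='standard')
    logging.info('df.head()\n%s', df.head())
    return df  # the PythonOperator pushes the return value to XCom
```

This function can be wired into the same PythonOperator as in the question; if it succeeds where the hook fails, the problem is in the hook or the connection id rather than in the query itself.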

answered Nov 14 '22 by tobi6