Setting up Airflow with the BigQuery operator

I am experimenting with Airflow for data pipelines, but so far I cannot get it to work with the BigQuery operator. I have searched for a solution as best I can, but I am still stuck. I am using the SequentialExecutor, running locally.

Here is my code:

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    dag_id='bigQueryPipeline', 
    default_args=default_args, 
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)

The error message:

[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']
asked Aug 27 '16 at 07:08 by Jean-Christophe Rodrigue



People also ask

How do I run a BigQuery query in Airflow?

Set up an Airflow connection to BigQuery: start Airflow and add a new connection. Paste the contents of your service account's JSON key file into the "Keyfile JSON" field, then save it. The connection ID you choose (for example, my_bigquery_connection) is the ID you then pass to the BigQuery operators.
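To make the "Keyfile JSON" step concrete, here is a minimal sketch of the Extra JSON such a connection ends up holding. It assumes the `extra__google_cloud_platform__*` key naming used by newer Google Cloud connections, and it assumes the "Keyfile JSON" field is stored under the `keyfile_dict` suffix as a JSON string; check both against your Airflow version. `build_gcp_extra` and `my-project` are hypothetical names for illustration.

```python
import json


def build_gcp_extra(project_id, keyfile_json=None):
    """Build the JSON string for a Google Cloud connection's Extra field.

    Assumption: keys use the extra__google_cloud_platform__ prefix, and the
    UI's "Keyfile JSON" field maps to the "keyfile_dict" suffix.
    """
    extra = {"extra__google_cloud_platform__project": project_id}
    if keyfile_json is not None:
        # Parse first so a malformed key file fails here, not at task run time
        key = json.loads(keyfile_json)
        # Store the key as a JSON string, the way the UI text field holds it
        extra["extra__google_cloud_platform__keyfile_dict"] = json.dumps(key)
    return json.dumps(extra)


# Hypothetical project ID and a truncated, fake service-account key
fake_key = '{"type": "service_account", "project_id": "my-project"}'
extra = build_gcp_extra("my-project", fake_key)
print(extra)
```

Leaving out the key file produces extras with only the project set, which is the case where credentials come from the machine's own `gcloud auth` login.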

What is Airflow BigQuery?

BigQuery is Google Cloud's data warehouse; it lets users focus on analyzing data to find meaningful insights using familiar SQL. Airflow provides operators to manage BigQuery datasets and tables, run queries, and validate data.

How do I install the Google provider for Airflow?

The Google provider is distributed as the apache-airflow-providers-google package. To install Airflow with pip, you need to either downgrade pip to version 20.2.4 (pip install --upgrade pip==20.2.4) or, if you use pip 20.3, add the option --use-deprecated legacy-resolver to your pip install command.


2 Answers

It took me a while to find this, as it's not documented very clearly. In the Airflow UI, go to Admin -> Connections. The connection ID listed there is what the operator's bigquery_conn_id parameter refers to (bigquery_default in your code). In that connection's "Extra" field you must add a JSON object with a "project" key set to your project ID, e.g. {"project": "<your-project-id>"}.

You must also add "service_account" and "key_path" keys if you have not explicitly authorized an account (with gcloud auth) on the machine running Airflow.
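The traceback ends at `project = connection_extras['project']`, which is why an empty Extra field fails. The sketch below is an illustrative re-creation of that lookup, not Airflow's actual code: it assumes, as the answer describes, that "project" is required while "service_account" and "key_path" are optional. `read_bigquery_extras` and `my-project` are hypothetical names.

```python
import json


def read_bigquery_extras(extra_json):
    """Illustrative re-creation of how the old BigQuery hook reads extras.

    Not Airflow's code: "project" is looked up with [] (required), while
    "service_account" and "key_path" fall back to False (optional).
    """
    extras = json.loads(extra_json) if extra_json else {}
    service_account = extras.get("service_account", False)
    key_path = extras.get("key_path", False)
    # A missing "project" key raises KeyError: 'project', as in the traceback
    project = extras["project"]
    return project, service_account, key_path


# An empty Extra field reproduces the error from the question
try:
    read_bigquery_extras("")
except KeyError as err:
    print("KeyError:", err)  # prints: KeyError: 'project'

# With a (hypothetical) project ID the lookup succeeds; service_account and
# key_path stay False, which is the case where gcloud auth supplies credentials
print(read_bigquery_extras('{"project": "my-project"}'))
```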

answered Oct 26 '22 by J.Fratzke



If you need to do this programmatically, I use this as an entrypoint in our stack to create the connection if it doesn't already exist:

from airflow.models import Connection
from airflow.settings import Session

session = Session()
# conn_id must match the bigquery_conn_id passed to the operator
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')
# Only create the connection if one with this conn_id does not exist yet
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()
session.close()
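As an alternative to writing to the metadata database, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` that holds a URI, with extras passed as query parameters. This is a different technique from the session-based code above, not what the answer uses. The sketch assumes the `google-cloud-platform://` scheme (hyphens standing in for the underscores of `google_cloud_platform`), which is how Airflow 1.10 and later parse it as far as I know; `gcp_conn_env_var` is a hypothetical helper and `my-project` a placeholder.

```python
from urllib.parse import urlencode


def gcp_conn_env_var(conn_id, project_id):
    """Return (name, value) for defining a Google Cloud connection via env var.

    Assumption: Airflow reads AIRFLOW_CONN_<CONN_ID> as a connection URI and
    turns query parameters into the connection's extras.
    """
    name = "AIRFLOW_CONN_" + conn_id.upper()
    query = urlencode({"extra__google_cloud_platform__project": project_id})
    # conn_type google_cloud_platform is written with hyphens in the URI scheme
    value = "google-cloud-platform://?" + query
    return name, value


name, value = gcp_conn_env_var("bigquery", "my-project")
print(f"export {name}='{value}'")
# prints: export AIRFLOW_CONN_BIGQUERY='google-cloud-platform://?extra__google_cloud_platform__project=my-project'
```

Connections defined this way are not stored in the database, so the "create only if missing" check is unnecessary; the trade-off is that they do not show up for editing under Admin -> Connections.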
answered Oct 26 '22 by clifton
