I'm trying out Airflow with the BigQueryOperator. I plan to move to Google Cloud Composer later on, but I want it running locally first. Airflow is up and running, and a BashOperator works fine. I can also run airflow test <dag> <task>, where task is the BigQuery task I want to run, but when I trigger the DAG from the UI the BigQuery task is never queued. Instead it ends up in the REMOVED state and nothing happens.
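For reference, the test invocation looks roughly like this (the CLI requires an execution date; the date here is just a placeholder):

airflow test tutorial bq_simple_sql1 2018-01-01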
My DAG definition looks like this:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
    datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'tutorial', default_args=default_args) as dag:

    # operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date')

    template_sql = '''
    SELECT 'TOMAS' name, '{{ params.my_value }}' value, "{{ params.my_value2 }}" tables
    '''

    sampleBigQuery1 = BigQueryOperator(
        task_id='bq_simple_sql1',
        bql=template_sql,
        use_legacy_sql=False,
        destination_dataset_table='temp_tomas.airflow_1',
        allow_large_results=True,
        params={'my_value': (datetime.now()).strftime("%D %T"),
                'my_value2': "yolo"},  # getTables()},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE'
    )

    t1 >> sampleBigQuery1
So how do I debug a case where airflow test ... works but triggering the DAG from the scheduler or UI does not? Is there something obviously wrong with what I have here? Locally I'm using a standard install of Airflow with SQLite, but that shouldn't have any impact, I think. I am running everything in one Python env, so it should be pretty contained.
If you want to re-run a task in Airflow, the best way to do so is to press Clear or Delete (the wording depends on the Airflow version you're running), not Run. This clears the state of your failed task and allows the scheduler to pick it back up and re-run it.
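The same can be done from the command line; a minimal sketch, assuming the Airflow 1.x CLI and the DAG/task ids from the question (the dates are just placeholders):

airflow clear tutorial -t bq_simple_sql1 -s 2018-01-01 -e 2018-01-02

This clears the matching task instances in that date range so the scheduler will pick them up again.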
queue is an attribute of BaseOperator, so any task can be assigned to any queue. The default queue for the environment is defined in airflow.cfg's celery -> default_queue. This defines the queue that tasks get assigned to when not specified, as well as which queue Airflow workers listen to when started.
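As an illustration, here is a minimal sketch of pinning a task to a queue; the queue name is made up, and the setting only has an effect when running the CeleryExecutor:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Any BaseOperator subclass accepts a `queue` argument. With the
# CeleryExecutor, only workers started with `airflow worker -q bq_queue`
# will pick this task up; other executors ignore the setting.
with DAG('queue_example',
         start_date=datetime(2018, 1, 1),
         schedule_interval=None) as dag:
    t = BashOperator(
        task_id='print_date',
        bash_command='date',
        queue='bq_queue')  # hypothetical queue name, for illustration only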
Now, in the "Actions" menu, select "Clear" and apply it to all of the queued tasks. Confirm your choice to Clear the queued tasks. Airflow should immediately prepare to run the queued tasks.
If this is your first Airflow setup, you might want to check the points in this question first: Airflow 1.9.0 is queuing but not launching tasks. I'd especially recommend the last step there; it might give you more of an idea of why the task is not being scheduled.
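A couple of things worth trying, assuming the stock Airflow 1.x CLI (the execution date below is just a placeholder): run the scheduler in the foreground while you trigger the DAG and watch its log output, and ask the metadata database directly what it thinks of the task:

airflow scheduler                                        # watch how the DAG run and its tasks are handled
airflow list_tasks tutorial                              # confirm which task ids the scheduler actually sees
airflow task_state tutorial bq_simple_sql1 2018-01-01    # state recorded for a given execution date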