 

Using Airflow macros outside of operators

Is there any way to use an Airflow macro outside of an operator?

For example, in the DAG I have the following:

datestamp = '{{ ds }}'

print(datestamp)  # prints the literal string '{{ ds }}', not the execution date, for every run

scanner = S3KeySensor(
        task_id='scanner',
        poke_interval=60,
        timeout=24 * 60 * 60,
        soft_fail=False,
        wildcard_match=True,
        bucket_key=getPath() + datestamp,  # datestamp is correctly replaced with the execution date
        bucket_name=bucketName,
        dag=dag)

When scanner runs, "{{ ds }}" in bucket_key is replaced with the execution date, as expected. But I also want to use the ds value in other places, and there it is not replaced: the variable just holds the literal string "{{ ds }}". In the example above, the print statement prints "{{ ds }}" rather than the execution date.

dreamer asked Jul 12 '17 18:07


1 Answer

Luckily for you, bucket_key is a templated field, so just put the Jinja template directly inside it:

…
bucket_key=getPath() + '{{ ds }}',
…

You cannot use these macros completely outside an operator, because the DAG file is parsed regularly by the scheduler, not just during a DAG run. What would the value of ds be when the DAG isn't running?
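A stdlib-only sketch of the difference between parse time and run time may help. It is a toy, not Airflow code: the regex-based render function is a hypothetical stand-in for Jinja2, and the date and the "some/prefix/" path (standing in for getPath()) are made up for illustration.

```python
import re

# Hypothetical stand-in for Jinja: replaces "{{ name }}" with context[name].
# Airflow really uses Jinja2; this regex version only mimics simple lookups.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Substitute every {{ key }} placeholder using values from context."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), template)


# "Parse time": the scheduler imports the DAG file. No run is in progress,
# so there is no execution date and the string is just a string.
datestamp = "{{ ds }}"
parse_time_value = datestamp

# "Run time": a specific run supplies a context, and only then can the
# template be rendered against it.
run_context = {"ds": "2017-07-12"}
run_time_value = render(datestamp, run_context)
bucket_key = render("some/prefix/" + datestamp, run_context)

print(parse_time_value)  # {{ ds }}
print(run_time_value)    # 2017-07-12
print(bucket_key)        # some/prefix/2017-07-12
```

The point is that nothing at module level can know which run it belongs to; only code that receives a run's context can resolve ds.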

However, since you're unlikely to need the value anywhere outside the tasks, you can put it into a templated field. You can also subclass the operator to make another field templated:

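import logging

# Airflow 1.x import path; newer versions ship S3KeySensor in the Amazon provider package
from airflow.operators.sensors import S3KeySensor


class MySensor(S3KeySensor):
    # Add 'my_thing' to the fields Airflow renders with Jinja before the task runs
    template_fields = ('bucket_key', 'bucket_name', 'my_thing')

    def __init__(self, my_thing=None, *args, **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)
        self.my_thing = my_thing

    def post_execute(self, context, *args, **kwargs):
        # Accept extra arguments: later Airflow versions also pass the task result
        logging.info(
            "I probably wanted to override poke to use {}".format(self.my_thing))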

scanner = MySensor(
    my_thing='{{ ds }}',
    task_id='scanner',
    poke_interval=60,
    timeout=24 * 60 * 60,
    soft_fail=False,
    wildcard_match=True,
    bucket_key=getPath() + '{{ ds }}',
    bucket_name=bucketName,
    dag=dag)
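Here is a toy model of what template_fields does, using only the standard library. ToySensor, MyToySensor, render and render_template_fields are hypothetical names that mimic the idea; they are not Airflow's API, and Airflow itself renders with Jinja2 inside the task instance.

```python
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template, context):
    """Toy renderer: replace {{ key }} with context[key] (not real Jinja)."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), template)


class ToySensor:
    """Hypothetical stand-in for S3KeySensor with its two templated fields."""
    template_fields = ("bucket_key", "bucket_name")

    def __init__(self, bucket_key, bucket_name):
        self.bucket_key = bucket_key
        self.bucket_name = bucket_name


class MyToySensor(ToySensor):
    # Extend the parent's tuple so inherited templated fields are kept.
    template_fields = ToySensor.template_fields + ("my_thing",)

    def __init__(self, my_thing=None, **kwargs):
        super().__init__(**kwargs)
        self.my_thing = my_thing
        # Not listed in template_fields, so it is never rendered.
        self.untemplated = "{{ ds }}"


def render_template_fields(task, context):
    """Render each attribute named in template_fields and write it back."""
    for name in task.template_fields:
        value = getattr(task, name)
        if isinstance(value, str):
            setattr(task, name, render(value, context))


sensor = MyToySensor(my_thing="{{ ds }}",
                     bucket_key="some/prefix/{{ ds }}",
                     bucket_name="my-bucket")
render_template_fields(sensor, {"ds": "2017-07-12"})
print(sensor.my_thing)     # 2017-07-12
print(sensor.bucket_key)   # some/prefix/2017-07-12
print(sensor.untemplated)  # {{ ds }}
```

Building the subclass's tuple from the parent's rather than retyping it keeps an inherited templated field from being silently dropped; with the real class the equivalent would be something like tuple(S3KeySensor.template_fields) + ('my_thing',).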

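Edit: self.my_thing still holds the raw '{{ ds }}' string right after __init__ (that is, when the DAG file is parsed). Airflow renders every attribute named in template_fields and writes the result back onto the task just before pre_execute and execute are called, so inside those methods (and in post_execute) self.my_thing holds the execution date.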
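A toy model of that ordering, again stdlib only. ToyTask and run_task are hypothetical; rendering onto a shallow copy of the task is assumed here to mirror how the runner leaves the object defined in the DAG file untouched, so check that detail against your Airflow version.

```python
import copy
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template, context):
    """Toy renderer: replace {{ key }} with context[key] (not real Jinja)."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), template)


class ToyTask:
    """Hypothetical task with one templated field; records what each hook saw."""
    template_fields = ("my_thing",)

    def __init__(self, my_thing):
        self.my_thing = my_thing
        self.seen = {}

    def pre_execute(self, context):
        self.seen["pre_execute"] = self.my_thing

    def execute(self, context):
        self.seen["execute"] = self.my_thing

    def post_execute(self, context):
        self.seen["post_execute"] = self.my_thing


def run_task(task, context):
    """Toy runner: render templated fields on a copy, then run the hooks."""
    task_copy = copy.copy(task)
    task_copy.seen = {}  # fresh dict so the shallow copy doesn't share it
    for name in task_copy.template_fields:
        setattr(task_copy, name, render(getattr(task_copy, name), context))
    task_copy.pre_execute(context)
    task_copy.execute(context)
    task_copy.post_execute(context)
    return task_copy


task = ToyTask(my_thing="{{ ds }}")
print(task.my_thing)  # {{ ds }}  (right after __init__, at parse time)
ran = run_task(task, {"ds": "2017-07-12"})
print(ran.seen)       # every hook saw 2017-07-12
print(task.my_thing)  # still {{ ds }} on the original object
```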

dlamblin answered Nov 03 '22 00:11