
Broken DAG: [/airflow/dags/a.py] Can't decrypt `extra` params for login=None, FERNET_KEY configuration is missing

Tags: python, airflow

I started Airflow with no FERNET_KEY. Once I realised it, I did the following: https://airflow.apache.org/configuration.html#connections

pip install apache-airflow[crypto]

from cryptography.fernet import Fernet
fernet_key = Fernet.generate_key()
print(fernet_key.decode())  # decode so the key prints as plain text, not b'...'

I took the key, placed it in airflow.cfg, and then ran airflow initdb, but the error still appears.

What am I doing wrong?

When I do:

airflow webserver -D

I get:

  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 713, in extra_dejson
    if self.extra:
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/attributes.py", line 293, in __get__
    return self.descriptor.__get__(instance, owner)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 632, in get_extra
    return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
  File "/usr/lib/python2.7/dist-packages/cryptography/fernet.py", line 101, in decrypt
    raise InvalidToken

The log indicates that there is an issue with this code:

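from airflow.models import Connection
from airflow.utils.db import provide_session  # Airflow 1.x import path


@provide_session  # supplies `session` when the caller does not pass one
def get_conn(conn_id, session=None):
    # Look up the stored connection by its conn_id
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .first())
    return conn


def my_python_function():
    conn = get_conn('s3connection')
    # extra_dejson decrypts `extra` first; this is where InvalidToken is raised
    key_id = conn.extra_dejson.get('aws_access_key_id')
    secret_key = conn.extra_dejson.get('aws_secret_access_key')
    default_region = conn.extra_dejson.get('region_name')
    return key_id, secret_key, default_region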

asked Jun 13 '18 13:06 by jack



2 Answers

Airflow uses Fernet to encrypt the passwords of its connections in the backend database.

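In your case, the Airflow backend still holds values stored under the previous Fernet configuration, while your new connection was created with the key you have just generated. Values written under one key cannot be decrypted with another.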
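The mismatch can be reproduced outside Airflow. The following is a minimal sketch using only the `cryptography` package; `old_key`, `new_key`, `can_decrypt` and the sample JSON payload are made up for illustration and are not taken from Airflow's code.

```python
from cryptography.fernet import Fernet, InvalidToken

# Two independent keys: old_key stands in for whatever key the stored value
# was encrypted with, new_key for the key now in airflow.cfg (both hypothetical).
old_key = Fernet.generate_key()
new_key = Fernet.generate_key()

# Roughly what the database would hold for an encrypted `extra` field.
# The JSON payload is an illustrative placeholder.
stored_token = Fernet(old_key).encrypt(b'{"region_name": "us-east-1"}')


def can_decrypt(key, token):
    """Return True if `token` decrypts under `key`, False on InvalidToken."""
    try:
        Fernet(key).decrypt(token)
        return True
    except InvalidToken:
        return False


print(can_decrypt(old_key, stored_token))
print(can_decrypt(new_key, stored_token))
```

The second call fails in the same way as the `raise InvalidToken` at the bottom of the traceback in the question.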

My recommendation is to do the following. First,

airflow resetdb

This deletes all the existing records in your backend database.

Then,

airflow initdb

This initializes the backend from scratch.

Then start the Airflow webserver and scheduler:

airflow webserver -p {port}
airflow scheduler

Then create a new S3 connection in the UI, with the following in the Extra field: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
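The Extra field is a JSON string, and the function in the question reads it back through `extra_dejson.get(...)`. Below is a stdlib-only sketch of that round trip; `read_s3_credentials` is a hypothetical helper that imitates the lookups and is not Airflow's implementation.

```python
import json

# Placeholder values copied from the answer; substitute real credentials.
extra = json.dumps({
    "aws_access_key_id": "_your_aws_access_key_id_",
    "aws_secret_access_key": "_your_aws_secret_access_key_",
})


def read_s3_credentials(extra_json):
    """Parse an Extra JSON string and return (key_id, secret_key, region)."""
    parsed = json.loads(extra_json) if extra_json else {}
    return (
        parsed.get("aws_access_key_id"),
        parsed.get("aws_secret_access_key"),
        parsed.get("region_name"),  # None unless the Extra JSON includes it
    )


key_id, secret_key, default_region = read_s3_credentials(extra)
print(key_id, secret_key, default_region)
```

With the Extra value shown above, the region comes back as `None`, so add a `region_name` entry to the JSON if the DAG relies on it.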

Now you should be able to test the S3 file watcher by following the solution from "Airflow s3 connection using UI".

answered Oct 20 '22 16:10 by shyam


Airflow usually generates one for you.

Here's an example:

$ python
>>> from cryptography.fernet import Fernet
>>> k=Fernet.generate_key()
>>> print(k)
Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0=
>>> ^D
$ $EDITOR $AIRFLOW_HOME/airflow.cfg

There, change:

# Secret key to save connection passwords in the db
fernet_key = cryptography_not_found_storing_passwords_in_plain_text

to:

# Secret key to save connection passwords in the db
fernet_key = Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0=

Check that it's set as expected (otherwise it'll generate a random one each time):

$ python
Python 2.7.13 (default, Jul 18 2017, 09:17:00)
[GCC 4.2.1 Compatible Apple LLVM 8.1.0 (clang-802.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow import configuration as conf
[2018-06-14 17:53:36,200] {__init__.py:57} INFO - Using executor SequentialExecutor
>>> conf.get('core','fernet_key')
'Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0='
>>>
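Beyond checking that the value is set, it may help to check that it has the shape of a Fernet key, which is url-safe base64 that decodes to 32 bytes. Here is a rough stdlib-only sketch, assuming Python 3; `looks_like_fernet_key` is a hypothetical helper, and it checks only the shape, not whether the key matches what is stored in the database.

```python
import base64
import binascii


def looks_like_fernet_key(value):
    """Return True if `value` is url-safe base64 that decodes to 32 bytes."""
    try:
        if isinstance(value, str):
            value = value.encode("ascii")
        return len(base64.urlsafe_b64decode(value)) == 32
    except (binascii.Error, ValueError):
        # Bad padding, non-ASCII text, or otherwise undecodable input.
        return False


# The example key from this answer, and the placeholder from airflow.cfg.
print(looks_like_fernet_key("Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0="))
print(looks_like_fernet_key("cryptography_not_found_storing_passwords_in_plain_text"))
```

The value returned by `conf.get('core','fernet_key')` can be passed straight to this function.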

The above should be the syntax for v1.9.0 and v1.8.2; I've double-checked it with the latter.

Whenever you change your fernet key, you need to delete all the connections and variables that are using encryption, as they won't decrypt anymore.
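To see which stored values are affected before deleting anything, you can try each one against the current key. This is a sketch using plain `cryptography` calls on an in-memory dict. The names `find_undecryptable`, `rows` and `other_conn` are illustrative assumptions, and reading the encrypted values out of the Airflow database is left out.

```python
from cryptography.fernet import Fernet, InvalidToken


def find_undecryptable(key, tokens_by_name):
    """Return the sorted names whose token does not decrypt under `key`."""
    fernet = Fernet(key)
    broken = []
    for name, token in tokens_by_name.items():
        try:
            fernet.decrypt(token)
        except InvalidToken:
            broken.append(name)
    return sorted(broken)


# Illustrative data: one value written under an old key, one under the current key.
old_key = Fernet.generate_key()
current_key = Fernet.generate_key()
rows = {
    "s3connection": Fernet(old_key).encrypt(b"{}"),
    "other_conn": Fernet(current_key).encrypt(b"{}"),
}

print(find_undecryptable(current_key, rows))
```

Only the names it returns would need to be deleted and re-created.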

You could reset your db, but that's probably overdoing it.

answered Oct 20 '22 18:10 by dlamblin