I started Airflow with no FERNET_KEY. Once I realised it, I did the following: https://airflow.apache.org/configuration.html#connections
pip install apache-airflow[crypto]
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key)
took the key and placed it in airflow.cfg
and then called airflow initdb
, but the error still appears.
What am I doing wrong?
When I do:
airflow webserver -D
I get:
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 713, in extra_dejson
if self.extra:
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/attributes.py", line 293, in __get__
return self.descriptor.__get__(instance, owner)
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 632, in get_extra
return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
File "/usr/lib/python2.7/dist-packages/cryptography/fernet.py", line 101, in decrypt
raise InvalidToken
The log indicates that there is issue with this code:
def get_conn(conn_id, session=None):
conn = (session.query(Connection)
.filter(Connection.conn_id == conn_id)
.first())
return conn
def my_python_function():
conn = get_conn('s3connection')
key_id = conn.extra_dejson.get('aws_access_key_id')
secret_key = conn.extra_dejson.get('aws_secret_access_key')
default_region = conn.extra_dejson.get('region_name')
return key_id,secret_key,default_region
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use it in the DAG file as "{{ dag_run. conf["key"] }}" in templated field.
If you need to generate a new fernet key you can use the following code snippet. from cryptography. fernet import Fernet fernet_key = Fernet. generate_key() print(fernet_key.
with DAG( "the_dag", params={"x": Param(5, type="integer", minimum=3)}, render_template_as_native_obj=True ) as the_dag: This way, the Param's type is respected when its provided to your task. Another way to access your param is via a task's context kwarg.
This is no longer required. Airflow will now auto align the start_date and the schedule , by using the start_date as the moment to start looking.
Using Fernet, Airflow encrypt all the passwords for its connections in the backend database.
In your case, Airflow backend is using previous fernet key and you have generated a key using which you have created new connection.
My recommendation is to do the following First,
airflow resetdb
this will help in deleting all the existing records in your backend db.
Then,
airflow initdb
this will initialize backend like fresh.
Then start airflow web server and scheduler
airflow web server -p {port}
airflow scheduler
Then create new connection of s3 in UI (in extra - {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"})
Now you should be able to test s3 file watcher by following solution from - Airflow s3 connection using UI
Airflow usually generates one for you.
Here's an example:
$ python
>>> from cryptography.fernet import Fernet
>>> k=Fernet.generate_key()
>>> print(k)
Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0=
>>> ^D
$ $EDITOR $AIRFLOW_HOME/airflow.cfg
There change:
# Secret key to save connection passwords in the db
fernet_key = cryptography_not_found_storing_passwords_in_plain_text
to:
# Secret key to save connection passwords in the db
fernet_key = Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0=
Check if it's set as expected (or it'll generate a random one each time)
$ python
Python 2.7.13 (default, Jul 18 2017, 09:17:00)
[GCC 4.2.1 Compatible Apple LLVM 8.1.0 (clang-802.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow import configuration as conf
[2018-06-14 17:53:36,200] {__init__.py:57} INFO - Using executor SequentialExecutor
>>> conf.get('core','fernet_key')
'Z6BkzaWcF7r5cC-VMAumjpBpudSyjGskQ0ObquGJhG0='
>>>
The above should be v1.9.0 & v1.8.2 syntax [fixed], I've double checked this with the latter.
Whenever you change your fernet key, you need to delete all the connections and variables that are using encryption, as they won't decrypt anymore.
You could reset your db, but that's probably overdoing it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With