I want to use MySQL as the backend database for Apache Airflow. After installing the dependencies, I run
airflow initdb
Airflow starts setting up the database, but then fails with the following log:
shahbaz@OpenSource:~$ airflow initdb
[2019-07-11 12:01:13,726] {settings.py:182} INFO -
settings.configure_orm(): Using pool settings. pool_size=5,
pool_recycle=1800, pid=17492
[2019-07-11 12:01:13,917] {__init__.py:51} INFO - Using executor
LocalExecutor
DB: mysql+mysqldb://airflow:***@localhost:3306/airflow
[2019-07-11 12:01:14,276] {db.py:350} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1,
current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 ->
1507a7289a2f, create is_encrypted
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f ->
13eb55f81627, maintain history for compatibility with earlier
migrations
INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 ->
338e90f54d61, More logging into task_instance
INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 ->
52d714495f0, job_id indices
INFO [alembic.runtime.migration] Running upgrade 52d714495f0 ->
502898887f84, Adding extra to Log
INFO [alembic.runtime.migration] Running upgrade 502898887f84 ->
1b38cef5b76e, add dagrun
INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e ->
2e541a1dcfed, task_duration
INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed ->
40e67319e3a9, dagrun_config
INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 ->
561833c1c74b, add password column to user
INFO [alembic.runtime.migration] Running upgrade 561833c1c74b ->
4446e08588, dagrun start end
INFO [alembic.runtime.migration] Running upgrade 4446e08588 ->
bbc73705a13e, Add notification_sent column to sla_miss
INFO [alembic.runtime.migration] Running upgrade bbc73705a13e ->
bba5a7cfc896, Add a column to track the encryption state of the
'Extra' field in connection
INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 ->
1968acfc09e3, add is_encrypted column to variable table
INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 ->
2e82aab8ef20, rename user table
INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 ->
211e584da130, add TI state index
INFO [alembic.runtime.migration] Running upgrade 211e584da130 ->
64de9cddf6c9, add task fails journal table
INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 ->
f2ca10b85618, add dag_stats table
INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 ->
4addfa1236f1, Add fractional seconds to mysql tables
INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 ->
8504051e801b, xcom dag task indices
INFO [alembic.runtime.migration] Running upgrade 8504051e801b ->
5e7d17757c7a, add pid field to TaskInstance
INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a ->
127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 ->
cc1e65623dc7, add max tries column to task instance
INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 ->
bdaa763e6c56, Make xcom value column a large binary
INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 ->
947454bf1dff, add ti job_id index
INFO [alembic.runtime.migration] Running upgrade 947454bf1dff ->
d2ae31099d61, Increase text size for MySQL (not relevant for other
DBs' text types)
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 ->
0e2a74e0fc9f, Add time zone awareness
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 ->
33ae817a1ff4, kubernetes_resource_checkpointing
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.")
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/shahbaz/.local/bin/airflow", line 32, in <module>
args.func(args)
File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1096, in initdb
db.initdb(settings.RBAC)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 91, in initdb
upgradedb()
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 358, in upgradedb
command.upgrade(config, 'heads')
File "/usr/local/lib/python3.6/dist-packages/alembic/command.py", line 254, in upgrade
script.run_env()
File "/usr/local/lib/python3.6/dist-packages/alembic/script/base.py", line 427, in run_env
util.load_python_file(self.dir, 'env.py')
File "/usr/local/lib/python3.6/dist-packages/alembic/util/pyfiles.py", line 81, in load_python_file
module = load_module_py(module_id, path)
File "/usr/local/lib/python3.6/dist-packages/alembic/util/compat.py", line 83, in load_module_py
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 92, in <module>
run_migrations_online()
File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 86, in run_migrations_online
context.run_migrations()
File "<string>", line 8, in run_migrations
File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/environment.py", line 836, in run_migrations
self.get_context().run_migrations(**kw)
File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/migration.py", line 330, in run_migrations
step.migration_fn(**kw)
File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py", line 55, in upgrade
*columns_and_constraints
File "<string>", line 8, in create_table
File "<string>", line 3, in create_table
File "/usr/local/lib/python3.6/dist-packages/alembic/operations/ops.py", line 1120, in create_table
return operations.invoke(op)
File "/usr/local/lib/python3.6/dist-packages/alembic/operations/base.py", line 319, in invoke
return fn(self, operation)
File "/usr/local/lib/python3.6/dist-packages/alembic/operations/toimpl.py", line 101, in create_table
operations.impl.create_table(table)
File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 194, in create_table
self._exec(schema.CreateTable(table))
File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 118, in _exec
return conn.execute(construct, *multiparams, **params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 980, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
return connection._execute_ddl(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1042, in _execute_ddl
compiled,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 276, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, \n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), \n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), \n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error at: http://sqlalche.me/e/e3q8)
As you can see, the initdb command fails at the kubernetes_resource_checkpointing migration,
and the last line of the log shows that the cause is an OperationalError raised through sqlalchemy.
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError)
(3812, "An expression of non-boolean type specified to a check
constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE
kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true,
\n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id),
\n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
\n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error
at: http://sqlalche.me/e/e3q8)
I should mention that I can run apache-airflow with a Postgres database, and I am only using Postgres because Airflow is acting weird with MySQL.
I am using
apache-airflow version 1.10.3
mysql version 8.0.16 (MySQL Community Server - GPL)
I have also tried setting the MySQL SQL_MODE to 'ANSI', as the Airflow docs suggest, but it made no difference.
Any help will be appreciated.
[EDIT]
Thanks to 'skadya' for pointing out the issue link. Let me share what I found. I checked the code files pointed out by 'Shi Chen', and two files are responsible for this behaviour:
33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py
Both files are migration files that use the alembic and sqlalchemy libraries. I found that the following sqlalchemy code in 33ae817a1ff4_add_kubernetes_resource_checkpointing.py
def upgrade():
    columns_and_constraints = [
        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
        sa.Column("resource_version", sa.String(255))
    ]
    conn = op.get_bind()
    # alembic creates an invalid SQL for mssql dialect
    if conn.dialect.name not in ('mssql'):
        columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))
    table = op.create_table(
        RESOURCE_TABLE,
        *columns_and_constraints
    )
    op.bulk_insert(table, [
        {"resource_version": ""}
    ])
is translated into the following SQL query, which is not correct:
CREATE TABLE kube_resource_version (
    one_row_id BOOL NOT NULL DEFAULT true,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
    CHECK (one_row_id IN (0, 1))
)
Instead, the SQL query should be something like this:
CREATE TABLE kube_resource_version (
    one_row_id BOOL NOT NULL DEFAULT true,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id IN (0, 1))
)
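For context, here is a minimal sketch of what the named constraint appears intended to enforce: a table that can only ever hold one row. It runs against an in-memory SQLite database from the Python standard library, not MySQL, purely because SQLite accepts the bare CHECK (one_row_id) expression. The helper try_insert is a hypothetical name used only for this illustration, and DEFAULT 1 stands in for DEFAULT true.

```python
import sqlite3

# In-memory SQLite database (an assumption for illustration; the question is about MySQL).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE kube_resource_version (
        one_row_id BOOL NOT NULL DEFAULT 1,
        resource_version VARCHAR(255),
        PRIMARY KEY (one_row_id),
        CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id)
    )
    """
)

# Same seed row as the migration's bulk_insert: one_row_id takes its default.
conn.execute("INSERT INTO kube_resource_version (resource_version) VALUES ('')")


def try_insert(one_row_id):
    """Hypothetical helper: try to add another row and report the outcome."""
    try:
        conn.execute(
            "INSERT INTO kube_resource_version (one_row_id, resource_version) "
            "VALUES (?, '')",
            (one_row_id,),
        )
        return "accepted"
    except sqlite3.IntegrityError:
        return "rejected"


# 1 collides with the primary key; 0 violates the check constraint.
results = {value: try_insert(value) for value in (1, 0)}
row_count = conn.execute("SELECT COUNT(*) FROM kube_resource_version").fetchone()[0]
print(results, row_count)
```

Without the named constraint, only the primary key and the IN (0, 1) check would remain, so a second row with one_row_id = 0 would presumably be accepted.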
The link provided by 'skadya' was helpful. I got the system to work after changing the code in the two files mentioned above.
You simply need to change the following code from
if conn.dialect.name not in ('mssql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )
to
if conn.dialect.name not in ('mssql', 'mysql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )
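One side note on the condition itself, as a small standalone sketch rather than Airflow code: in Python, ('mssql') is just the string 'mssql', so the original "not in" test is a substring check, while ('mssql', 'mysql') is a real tuple and is checked by membership. The function skips_constraint is a hypothetical name used only here.

```python
# Parentheses alone do not make a tuple; the trailing comma does.
single = ('mssql')       # this is the str 'mssql'
as_tuple = ('mssql',)    # this is a one-element tuple


def skips_constraint(dialect_name, excluded):
    """Hypothetical helper: True when the dialect would skip the check constraint."""
    return dialect_name in excluded


print(type(single).__name__)                           # str
print(type(as_tuple).__name__)                         # tuple
print(skips_constraint('sql', single))                 # True, substring match
print(skips_constraint('sql', as_tuple))               # False, tuple membership
print(skips_constraint('mysql', single))               # False, so MySQL got the constraint
print(skips_constraint('mysql', ('mssql', 'mysql')))   # True, matches the workaround
```

For the dialect names involved here the original test happens to behave as intended, but the two-element tuple in the workaround is the less surprising form.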
There is an open bug in the Airflow bug tracker:
https://issues.apache.org/jira/browse/AIRFLOW-4995
As a workaround, you may manually apply the changes proposed in the pull request.
Update: This bug is fixed in Airflow version 1.10.4.
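If you want to check whether an installed version predates that fix, a rough sketch follows. The names parse_version and predates_fix are hypothetical, and the parser assumes plain dotted integers such as "1.10.3" (it does not handle suffixes like release candidates). Comparing tuples of integers avoids the trap of comparing version strings character by character.

```python
def parse_version(text):
    """Turn '1.10.3' into (1, 10, 3); assumes plain dotted integers only."""
    return tuple(int(part) for part in text.split("."))


# Version named in the answer above as containing the fix.
FIXED_IN = parse_version("1.10.4")


def predates_fix(airflow_version):
    """Hypothetical helper: True if the given version is older than 1.10.4."""
    return parse_version(airflow_version) < FIXED_IN


print(predates_fix("1.10.3"))   # True
print(predates_fix("1.10.4"))   # False
print("1.10.3" < "1.9.0")       # True, which is why plain string comparison misleads
```

Whether the workaround is actually needed also depends on the database backend and its version, which this sketch does not check.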
I ran into exactly the same issue. Does anyone know what to do?
BTW, I ran into another issue: a complaint that the dag_stats table already exists while resetting the db. I had to manually drop dag_stats to get the reset past that step, but I am still blocked on this constraint.
CREATE TABLE kube_resource_version (
one_row_id BOOL NOT NULL DEFAULT true,
resource_version VARCHAR(255),
PRIMARY KEY (one_row_id),
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
CHECK (one_row_id IN (0, 1))
)
You simply need to change the following code in these files
33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py
from
if conn.dialect.name not in ('mssql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id",
                           name="kube_resource_version_one_row_id")
    )
to
if conn.dialect.name not in ('mssql', 'mysql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id",
                           name="kube_resource_version_one_row_id")
    )