
Apache Airflow initdb fails at kubernetes_resource_checkpointing for MySQL

I want to use MySQL as the backend database for Apache Airflow. After installing the dependencies, when I run

airflow initdb

Airflow starts setting up the database, but then it fails with the following log

shahbaz@OpenSource:~$ airflow initdb
[2019-07-11 12:01:13,726] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, 
pool_recycle=1800, pid=17492
[2019-07-11 12:01:13,917] {__init__.py:51} INFO - Using executor 
LocalExecutor
DB: mysql+mysqldb://airflow:***@localhost:3306/airflow
[2019-07-11 12:01:14,276] {db.py:350} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, 
current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 
1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 
13eb55f81627, maintain history for compatibility with earlier 
migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 
338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 
52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 
502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 
1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 
2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 
40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 
561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 
4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> 
bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> 
bba5a7cfc896, Add a column to track the encryption state of the 
'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 
1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 
2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 
211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 
64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> 
f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 
4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 
8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 
5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 
127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> 
cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> 
bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 
947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> 
d2ae31099d61, Increase text size for MySQL (not relevant for other 
DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
33ae817a1ff4, kubernetes_resource_checkpointing
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/shahbaz/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1096, in initdb
    db.initdb(settings.RBAC)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 91, in initdb
    upgradedb()
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 358, in upgradedb
    command.upgrade(config, 'heads')
  File "/usr/local/lib/python3.6/dist-packages/alembic/command.py", line 254, in upgrade
    script.run_env()
  File "/usr/local/lib/python3.6/dist-packages/alembic/script/base.py", line 427, in run_env
    util.load_python_file(self.dir, 'env.py')
  File "/usr/local/lib/python3.6/dist-packages/alembic/util/pyfiles.py", line 81, in load_python_file
    module = load_module_py(module_id, path)
  File "/usr/local/lib/python3.6/dist-packages/alembic/util/compat.py", line 83, in load_module_py
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 92, in <module>
    run_migrations_online()
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 86, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/environment.py", line 836, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/migration.py", line 330, in run_migrations
    step.migration_fn(**kw)
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py", line 55, in upgrade
    *columns_and_constraints
  File "<string>", line 8, in create_table
  File "<string>", line 3, in create_table
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/ops.py", line 1120, in create_table
    return operations.invoke(op)
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/base.py", line 319, in invoke
    return fn(self, operation)
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/toimpl.py", line 101, in create_table
    operations.impl.create_table(table)
  File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 194, in create_table
    self._exec(schema.CreateTable(table))
  File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 118, in _exec
    return conn.execute(construct, *multiparams, **params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 980, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
    return connection._execute_ddl(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1042, in _execute_ddl
    compiled,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 276, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, \n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), \n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), \n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error at: http://sqlalche.me/e/e3q8)

As you can see, initdb fails at the kubernetes_resource_checkpointing migration, and the last line of the traceback shows that the cause is an OperationalError raised through SQLAlchemy:

sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) 
(3812, "An expression of non-boolean type specified to a check 
constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE 
kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, 
\n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), 
\n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), 
\n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error 
at: http://sqlalche.me/e/e3q8)

For the record, Airflow runs fine for me with a Postgres backend, and I am currently using Postgres only because initdb fails like this on MySQL.

I am using

apache-airflow version 1.10.3

mysql version 8.0.16 (MySQL Community Server - GPL)

I have also tried setting the MySQL SQL_MODE to 'ANSI', as the Airflow docs suggest, but it made no difference.

Any help will be appreciated.

[EDIT]

Thanks to 'skadya' for pointing out the issue link. Here is what I found after checking the code files pointed out by 'Shi Chen'. Two files are responsible for this behaviour:

33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py

Both files are Alembic migrations that use SQLAlchemy. I found that the following SQLAlchemy code in 33ae817a1ff4_add_kubernetes_resource_checkpointing.py

def upgrade():

    columns_and_constraints = [
        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
        sa.Column("resource_version", sa.String(255))
    ]

    conn = op.get_bind()

    # alembic creates an invalid SQL for mssql dialect
    if conn.dialect.name not in ('mssql'):
        columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))

    table = op.create_table(
        RESOURCE_TABLE,
        *columns_and_constraints
    )

    op.bulk_insert(table, [
        {"resource_version": ""}
    ])

is translated into the following SQL query, which is not valid for MySQL:

CREATE TABLE kube_resource_version (
    one_row_id BOOL NOT NULL DEFAULT true,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
    CHECK (one_row_id IN (0, 1))
)

Instead, the SQL query should look something like this:

CREATE TABLE kube_resource_version (
    one_row_id BOOL NOT NULL DEFAULT true,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id IN (0, 1))
)
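
To make the role of the named constraint concrete, here is a minimal sketch using Python's built-in sqlite3 module. SQLite is only a stand-in (it accepts CHECK (one_row_id), whereas MySQL 8.0.16 rejects it with error 3812), and the reading that the constraint exists to keep the table to a single row is my assumption based on the column name; the migration itself does not say so.

```python
import sqlite3

# SQLite stands in for MySQL purely to illustrate what the constraint does.
# MySQL 8.0.16 rejects this DDL (error 3812) before any row can be inserted.
# DEFAULT 1 is used instead of DEFAULT true so older SQLite builds accept it.
DDL = """
CREATE TABLE kube_resource_version (
    one_row_id BOOLEAN NOT NULL DEFAULT 1,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id)
)
"""


def try_insert(conn, sql):
    """Run one INSERT and report whether the database accepted it."""
    try:
        conn.execute(sql)
        return "accepted"
    except sqlite3.IntegrityError as exc:
        return "rejected: {}".format(exc)


conn = sqlite3.connect(":memory:")
conn.execute(DDL)

results = [
    # First row takes the default one_row_id = 1.
    try_insert(conn, "INSERT INTO kube_resource_version (resource_version) VALUES ('')"),
    # A second row with the same default collides with the primary key.
    try_insert(conn, "INSERT INTO kube_resource_version (resource_version) VALUES ('x')"),
    # A row with one_row_id = 0 is stopped by the named CHECK constraint.
    try_insert(
        conn,
        "INSERT INTO kube_resource_version (one_row_id, resource_version) VALUES (0, 'y')",
    ),
]
for line in results:
    print(line)
```

If that reading is right, a table created without the named constraint would still accept a second row with one_row_id = 0, which is worth keeping in mind for any workaround that drops the constraint on MySQL.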

The link provided by 'skadya' was helpful. I got the system to work after changing the code in the two files mentioned above.

You simply need to change the following code from

if conn.dialect.name not in ('mssql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )

to

if conn.dialect.name not in ('mssql', 'mysql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )
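
A side note on the two conditions (a general Python fact, not something the issue or the migration states): ('mssql') without a trailing comma is a plain string, so the original check is a substring test, while ('mssql', 'mysql') is a real tuple and tests membership. A small sketch with illustrative variable names:

```python
# ('mssql') is just the string 'mssql'; the parentheses do not make a tuple.
old_target = ('mssql')
new_target = ('mssql', 'mysql')

print(type(old_target).__name__)   # str
print(type(new_target).__name__)   # tuple

# With a string on the right-hand side, `in` is a substring test.
print('sql' in old_target)         # True
print('mysql' in old_target)       # False, so on MySQL the constraint was still added

# With a tuple, `in` compares against each element exactly.
print('sql' in new_target)         # False
print('mysql' in new_target)       # True
```

The original check happens to behave as intended for the dialect names involved here, but the tuple form is the one that says what it means.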

Shahbaz Ali asked Jul 11 '19 11:07


3 Answers

There is an open bug in the Airflow bug tracker:

https://issues.apache.org/jira/browse/AIRFLOW-4995

As a workaround, you may manually apply the changes proposed in the pull request.

Update: This bug is fixed in Airflow 1.10.4.
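
If you want to check whether a given Airflow version string already includes the fix, a rough sketch follows. The helper names are made up for illustration, and the parsing simply ignores suffixes such as "rc1", which is an assumption that may not suit every release naming scheme.

```python
import re

FIXED_IN = (1, 10, 4)  # release named in the update above


def version_tuple(version):
    """Turn '1.10.3' into (1, 10, 3); anything after the third number is ignored."""
    parts = re.findall(r"\d+", version)[:3]
    return tuple(int(p) for p in parts)


def has_fix(version):
    """Return True if `version` is at or after the release that contains the fix."""
    return version_tuple(version) >= FIXED_IN


print(has_fix("1.10.3"))   # False
print(has_fix("1.10.4"))   # True
print(has_fix("1.10.10"))  # True
```

Comparing tuples of integers rather than raw strings avoids ranking "1.10.10" below "1.10.4".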

skadya answered Oct 24 '22 02:10


I ran into exactly the same issue. Does anyone know what to do?

BTW, I ran into another issue: resetting the DB complained that the dag_stats table already exists. I had to drop dag_stats manually to get the reset past that step, but I am still blocked on this constraint.

CREATE TABLE kube_resource_version (
    one_row_id BOOL NOT NULL DEFAULT true,
    resource_version VARCHAR(255),
    PRIMARY KEY (one_row_id),
    CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
    CHECK (one_row_id IN (0, 1))
)

Shi Chen answered Oct 24 '22 02:10


You simply need to change the following code in these files:

33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py

from

if conn.dialect.name not in ('mssql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )

to

if conn.dialect.name not in ('mssql', 'mysql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )
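
If you would rather script the edit than make it by hand, the change could be sketched like this. Both function names are hypothetical helpers, not part of Airflow or Alembic, and the sketch assumes the `if` line appears in the file exactly as quoted above.

```python
OLD = "if conn.dialect.name not in ('mssql'):"
NEW = "if conn.dialect.name not in ('mssql', 'mysql'):"


def patch_source(source):
    """Return (patched_text, changed) for the text of one migration file."""
    if NEW in source:
        return source, False  # already patched, nothing to do
    if OLD not in source:
        raise ValueError("expected dialect check not found; edit the file by hand")
    return source.replace(OLD, NEW), True


def patch_file(path):
    """Hypothetical wrapper: write a .bak copy of `path`, then patch it in place."""
    with open(path) as fh:
        original = fh.read()
    patched, changed = patch_source(original)
    if changed:
        with open(path + ".bak", "w") as fh:
            fh.write(original)
        with open(path, "w") as fh:
            fh.write(patched)
    return changed


# Dry run on a sample snippet instead of a real file.
sample = (
    "    if conn.dialect.name not in ('mssql'):\n"
    "        columns_and_constraints.append(...)\n"
)
patched_sample, changed = patch_source(sample)
print(patched_sample)
```

In the traceback from the question the migration files live under /usr/local/lib/python3.6/dist-packages/airflow/migrations/versions/; the location on your machine may differ, and editing files there may need elevated permissions.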

user11876714 answered Oct 24 '22 02:10