How to work with Django models inside Airflow tasks?
According to official Airflow documentation, Airflow provides hooks for interaction with databases (like MySqlHook / PostgresHook / etc) that can be later used in Operators for row query execution. Attaching the core code fragments:
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit
self.parameters = parameters
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
hook.run(
self.sql,
autocommit=self.autocommit,
parameters=self.parameters)
As we can see Hook incapsulates the connection configuration while Operator provides ability to execute custom queries.
It's very convenient to use different ORM for fetching and processing database objects instead of raw SQL for the following reasons:
For some reason, there are no examples of working with ORM in Airflow tasks in terms of hooks and operators. According to Using Django database layer outside of Django? question, it's needed to set up a connection configuration to the database, and then straight-forwardly execute queires in ORM, but doing that outside appropriate hooks / operators breaks Airflow principles. It's like calling BashOperator with "python work_with_django_models.py" command.
So what are the best practisies in this case? Do we share any hooks / operators for Django ORM / other ORMs? In order to have the following code real (treat as pseudo-code!):
import os
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
"myapp.settings"
)
django.setup()
from your_app import models
def get_and_modify_models(ds, **kwargs):
all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
all_objects[15].my_int_field = 25
all_objects[15].save()
return list(all_objects)
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
instead of implementing this functionality in raw SQL.
I think it's pretty important topic, as the whole banch of ORM-based frameworks and processes are not able to dive into Airflow in this case.
Thanks in advance!
Django doesn't have a separate package for it's ORM, but it's still possible to use it on its own.
One of the most powerful features of Django is its Object-Relational Mapper (ORM), which enables you to interact with your database, like you would with SQL. In fact, Django's ORM is just a pythonical way to create SQL to query and manipulate your database and get results in a pythonic fashion.
The Django ORM is a very powerful tool, and one of the great attractions of Django. It makes writing simple queries trivial, and does a great job of abstracting away the database layer in your application. And sometimes, you shouldn't use it.
The main postulate that characterizes ORM is that it encapsulates database interaction inside an object. One part of the object keeps the data and the second one knows how to deal with the data and transfers it to the relational database (this functionality is implemented inside the ORM engine).
I agree we should continue to have this discussion as having access Django ORM can significantly reduce complexity of solutions.
My approach has been to 1) create a DjangoOperator
import os, sys
from airflow.models import BaseOperator
def setup_django_for_airflow():
# Add Django project root to path
sys.path.append('./project_root/')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
import django
django.setup()
class DjangoOperator(BaseOperator):
def pre_execute(self, *args, **kwargs):
setup_django_for_airflow()
and 2) Extend that DjangoOperator for logic / operators what would benefit from having access to ORM
from .base import DjangoOperator
class DjangoExampleOperator(DjangoOperator):
def execute(self, context):
from myApp.models import model
model.objects.get_or_create()
With this strategy, you can then distinguish between operators that use Raw SQL / ORM. Also note, that for the Django operator, all django model imports need to be within the execution context, demonstrated above.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With