We are using Apache Airflow 1.9.0. I have written a Snowflake hook plugin and placed it in the $AIRFLOW_HOME/plugins directory.
$AIRFLOW_HOME
+-- plugins
    +-- snowflake_hook2.py
snowflake_hook2.py
# This is the base class for a plugin
from airflow.plugins_manager import AirflowPlugin
# This is necessary to expose the plugin in the Web interface
from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
# This is the base hook for connecting to a database
from airflow.hooks.dbapi_hook import DbApiHook
# This is the snowflake provided Connector
import snowflake.connector
# This is the default python logging package
import logging
class SnowflakeHook2(DbApiHook):
    """
    Airflow Hook to communicate with Snowflake
    This is implemented as a Plugin
    """
    def __init__(self, connname_in='snowflake_default', db_in='default', wh_in='default', schema_in='default'):
        logging.info('# Connecting to {0}'.format(connname_in))
        self.conn_name_attr = 'snowflake_conn_id'
        self.connname = connname_in
        self.superconn = super().get_connection(self.connname)  # gets the values from Airflow
        {SNIP - Connection stuff that works}
        self.cur = self.conn.cursor()

    def query(self, q, params=None):
        """From jmoney's db_wrapper allows return of a full list of rows(tuples)"""
        if params is None:  # no params, so no insertion
            self.cur.execute(q)
        else:  # make the parameter substitution
            self.cur.execute(q, params)
        self.results = self.cur.fetchall()
        self.rowcount = self.cur.rowcount
        self.columnnames = [colspec[0] for colspec in self.cur.description]
        return self.results

    {SNIP - Other class functions}

class SnowflakePluginClass(AirflowPlugin):
    name = "SnowflakePluginModule"
    hooks = [SnowflakeHook2]
    operators = []
So I went ahead and put some print statements in Airflow's plugins_manager to try to get a better handle on what is happening. After restarting the webserver and running airflow list_dags, these lines showed the "new module name" (and no errors):
SnowflakePluginModule [<class '__home__ubuntu__airflow__plugins_snowflake_hook2.SnowflakeHook2'>]
hook_module - airflow.hooks.snowflakepluginmodule
INTEGRATING airflow.hooks.snowflakepluginmodule
snowflakepluginmodule <module 'airflow.hooks.snowflakepluginmodule'>
As this is consistent with what the documentation says, I should be fine using this in my DAG:
from airflow import DAG
from airflow.hooks.snowflakepluginmodule import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator
But the web UI throws this error:
Broken DAG: [/home/ubuntu/airflow/dags/test_sf2.py] No module named 'airflow.hooks.snowflakepluginmodule'
So the question is: what am I doing wrong? Or have I uncovered a bug?
Airflow has a simple plugin manager built-in that can integrate external features to its core by simply dropping files in your $AIRFLOW_HOME/plugins folder. The python modules in the plugins folder get imported, and macros and web views get integrated to Airflow's main collections and become available for use.
You need to import the hook in one of the following ways:
from airflow import DAG
from airflow.hooks import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator
OR
from airflow import DAG
from airflow.hooks.SnowflakePluginModule import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator
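To see why the exact module name in the import matters, here is a minimal, self-contained sketch of the general mechanism of registering a dynamically built module under a dotted name. It is not Airflow's actual code: the package name demo_hooks, the helper register_plugin_module, and the stand-in SnowflakeHook2 class are all hypothetical and exist only for this illustration. It only shows that Python resolves such a module by the exact, case-sensitive key it was registered under in sys.modules.

```python
import importlib
import sys
import types


class SnowflakeHook2:
    """Stand-in for the real hook; only the class name matters here."""


def register_plugin_module(package, plugin_name, objects):
    """Build an in-memory module '<package>.<plugin_name>' and register it.

    Hypothetical helper for illustration only; not an Airflow function.
    """
    full_name = "{0}.{1}".format(package, plugin_name)
    module = types.ModuleType(full_name)
    for obj in objects:
        # Expose each object as a module attribute under its own name.
        setattr(module, obj.__name__, obj)
    # The import system checks sys.modules first, keyed by the exact name.
    sys.modules[full_name] = module
    return module


# "demo_hooks" is a made-up package name, assumed only for this sketch.
register_plugin_module("demo_hooks", "SnowflakePluginModule", [SnowflakeHook2])

# Importing with the exact registered name succeeds.
found = importlib.import_module("demo_hooks.SnowflakePluginModule")
print(found.SnowflakeHook2 is SnowflakeHook2)

# A name that differs only in case is a different key and fails to import.
try:
    importlib.import_module("demo_hooks.snowflakepluginmodule")
except ImportError as exc:
    print("lookup failed:", exc)
```

If an import of a plugin module fails with "No module named ...", printing the keys of sys.modules that start with the expected prefix, from inside the same process that parses the DAG, is one way to check which name was actually registered there.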