 

How to obtain and process mysql records using Airflow?

I need to:

1. Run a SELECT query on a MySQL DB and fetch the records.
2. Process the records with a Python script.

I am unsure how I should proceed. Is XCom the way to go here? Also, MySqlOperator only executes the query; it doesn't fetch the records. Is there a built-in transfer operator I can use? How can I use a MySqlHook here?

You may want to use a PythonOperator that uses the hook to get the data, applies the transformation, and ships the (now scored) rows back to some other place.

Can someone explain how to proceed with this?

Reference: http://markmail.org/message/x6nfeo6zhjfeakfe
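A minimal sketch of the fetch, transform, and ship pattern suggested in the quoted advice, written so it can be tried without an Airflow install. It assumes only that the source and target objects expose `get_records(sql)` and `insert_rows(table, rows, target_fields)`, two methods `MySqlHook` inherits from Airflow's `DbApiHook`. `SqliteStandIn`, the `source_table` and `scores` tables, and the doubling "score" are hypothetical placeholders for local testing; in a real DAG you would more likely create the `MySqlHook` instances inside the callable you hand to a `PythonOperator`.

```python
import sqlite3


class SqliteStandIn:
    """Hypothetical stand-in for MySqlHook, exposing the two DbApiHook methods used below."""

    def __init__(self, conn):
        self.conn = conn

    def get_records(self, sql, parameters=None):
        # Like DbApiHook.get_records: run a query and return all rows as a list of tuples
        cur = self.conn.execute(sql, parameters or ())
        return cur.fetchall()

    def insert_rows(self, table, rows, target_fields=None):
        # Simplified version of DbApiHook.insert_rows: one INSERT per row, then commit
        cols = f" ({', '.join(target_fields)})" if target_fields else ""
        for row in rows:
            placeholders = ", ".join("?" for _ in row)
            self.conn.execute(f"INSERT INTO {table}{cols} VALUES ({placeholders})", row)
        self.conn.commit()


def transform(row):
    # Hypothetical scoring step: keep the id and double the value
    row_id, value = row
    return (row_id, value * 2)


def fetch_transform_load(source_hook, target_hook):
    """Fetch rows, score them, and write them to another table; returns the row count."""
    rows = source_hook.get_records("SELECT id, col FROM source_table WHERE col > 100")
    scored = [transform(r) for r in rows]
    target_hook.insert_rows("scores", scored, target_fields=["id", "score"])
    return len(scored)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source_table (id INTEGER, col INTEGER)")
    conn.execute("CREATE TABLE scores (id INTEGER, score INTEGER)")
    conn.executemany("INSERT INTO source_table VALUES (?, ?)", [(1, 50), (2, 150), (3, 300)])
    hook = SqliteStandIn(conn)
    print(fetch_transform_load(hook, hook))  # 2
    print(hook.get_records("SELECT id, score FROM scores ORDER BY id"))  # [(2, 300), (3, 600)]
```

Keeping the transformation in a plain function like `transform` makes it easy to unit-test separately from the database access.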

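# Airflow 1.x-era import paths
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator

def do_work():
    # schema selects the database; it is a MySqlHook argument, not a get_records() argument
    mysqlserver = MySqlHook(mysql_conn_id=connection_id, schema='testdb')
    sql = "SELECT * FROM table WHERE col > 100"
    records = mysqlserver.get_records(sql)  # list of row tuples
    print(records[0][0])  # first column of the first row

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=do_work,  # pass the function defined above
    dag=dag
)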

Is this the correct way to proceed? Also, how do we use XComs to store the records for the following MySqlOperator?

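from airflow.operators.mysql_operator import MySqlOperator

t = MySqlOperator(
    mysql_conn_id='mysql_default',  # the parameter is mysql_conn_id, not conn_id
    task_id='basic_mysql',
    sql="SELECT count(*) FROM table1 WHERE id > 10",
    dag=dag)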
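On the XCom question, one common pattern (a hedged sketch, not the only option) is to fetch in a `PythonOperator` and return the rows: `PythonOperator` pushes a callable's return value to XCom under the key `return_value`, and a downstream callable can read it with `ti.xcom_pull(task_ids=...)`. As noted above, `MySqlOperator` only executes the query, so it is not a convenient XCom source here. `fetch_rows` is a hypothetical stand-in for a `MySqlHook(...).get_records(...)` call, and `FakeTaskInstance` is a hypothetical stand-in for Airflow's `TaskInstance` so the hand-off can be exercised without Airflow. Keep XCom payloads small, since they are stored in Airflow's metadata database, and expect tuples to possibly come back as lists after serialization.

```python
def fetch_rows():
    """Hypothetical stand-in for MySqlHook(...).get_records(sql); returns a list of row tuples."""
    return [(1, "a", 150), (2, "b", 300)]


def fetch_task():
    # When run by a PythonOperator, this return value is pushed to XCom (key "return_value")
    return fetch_rows()


def process_task(**context):
    # Airflow 2 passes the task context into **context; on Airflow 1.x set provide_context=True
    ti = context["ti"]
    rows = ti.xcom_pull(task_ids="fetch_from_testdb")
    # Index into rows instead of relying on tuple types, since XCom may return lists
    total = sum(row[2] for row in rows)
    print(f"processed {len(rows)} rows, total={total}")
    return total


class FakeTaskInstance:
    """Hypothetical minimal stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self):
        self._xcoms = {}

    def push_return_value(self, task_id, value):
        self._xcoms[task_id] = value

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


# In a DAG (Airflow assumed installed), the wiring could look like:
#   fetch = PythonOperator(task_id="fetch_from_testdb", python_callable=fetch_task, dag=dag)
#   process = PythonOperator(task_id="process_records", python_callable=process_task, dag=dag)
#   fetch >> process

if __name__ == "__main__":
    ti = FakeTaskInstance()
    ti.push_return_value("fetch_from_testdb", fetch_task())  # what Airflow does after fetch_task runs
    process_task(ti=ti)  # prints: processed 2 rows, total=450
```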
asked Sep 22 '17 at 07:09 by gpk27



1 Answer

I was really struggling with this for the past 90 minutes, so here is a more declarative approach for newcomers:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator

def fetch_records():
    request = "SELECT * FROM your_table"
    mysql_hook = MySqlHook(mysql_conn_id='the_connection_name_sourced_from_the_ui', schema='specific_db')
    connection = mysql_hook.get_conn()  # underlying DB-API connection
    cursor = connection.cursor()
    cursor.execute(request)
    sources = cursor.fetchall()  # all rows as a sequence of tuples
    print(sources)  # shows up in the task log

# ...your DAG() as dag: code (define the task below inside it)

task = PythonOperator(
    task_id='fetch_records',
    python_callable=fetch_records
)

This prints the contents of your query result to the task logs.
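`fetchall()` loads the whole result set into the worker's memory at once. For a large table, one alternative is to read in batches with the DB-API `fetchmany()` method; `MySqlHook.get_conn()` returns a DB-API connection, so its cursor should support the same calls. This sketch uses the standard-library `sqlite3` module purely as a stand-in so it runs anywhere; `your_table` comes from the answer, while `iter_batches`, `handle_batch`, `process_in_batches`, and the batch size are hypothetical.

```python
import sqlite3


def iter_batches(cursor, sql, batch_size=1000):
    """Execute sql on a DB-API cursor and yield lists of at most batch_size rows."""
    cursor.execute(sql)
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:  # an empty result means every row has been read
            break
        yield batch


def handle_batch(batch):
    # Hypothetical processing step: here it only counts the rows it was given
    return len(batch)


def process_in_batches(connection, batch_size=1000):
    # In a DAG, connection would come from MySqlHook(...).get_conn()
    cursor = connection.cursor()
    try:
        return sum(handle_batch(b) for b in iter_batches(cursor, "SELECT * FROM your_table", batch_size))
    finally:
        cursor.close()


if __name__ == "__main__":
    # sqlite3 is only a stand-in so the sketch runs without MySQL or Airflow
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE your_table (id INTEGER)")
    conn.executemany("INSERT INTO your_table VALUES (?)", [(i,) for i in range(5)])
    print(process_in_batches(conn, batch_size=2))  # 5 (rows read in batches of 2, 2, 1)
```

One caveat, stated as an assumption to verify for your setup: with mysqlclient's default cursor the full result is still buffered on the client, so true streaming needs a server-side cursor such as `MySQLdb.cursors.SSCursor`, which the MySQL connection's `cursor` extra (e.g. `sscursor`) is documented to select in Airflow's MySQL provider.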

I hope this is of use to someone else.

answered Sep 25 '22 at 04:09 by dimButTries