I need to:
1. Run a SELECT query on a MySQL DB and fetch the records.
2. Process the records with a Python script.
I am unsure how I should proceed. Is XCom the way to go here? Also, MySqlOperator only executes the query; it doesn't fetch the records. Is there any built-in transfer operator I can use? How can I use a MySQL hook here?
"You may want to use a PythonOperator that uses the hook to get the data, apply the transformation, and ship the (now scored) rows back some other place."
Can someone explain how to proceed with this?
Refer - http://markmail.org/message/x6nfeo6zhjfeakfe
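from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator

def do_work():
    # connection_id holds the Airflow connection ID; the schema is set on the hook,
    # because get_records() does not accept a schema argument
    mysqlserver = MySqlHook(mysql_conn_id=connection_id, schema='testdb')
    sql = "SELECT * from table where col > 100"
    # get_records() returns the fetched rows as a list of tuples
    records = mysqlserver.get_records(sql)
    print(records[0][0])

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=do_work,  # must reference the function defined above
    dag=dag
)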
Is this the correct way to proceed? Also, how do we use XComs to store the records for the following MySqlOperator?
t = MySqlOperator(
    mysql_conn_id='mysql_default',
    task_id='basic_mysql',
    sql="SELECT count(*) from table1 where id > 10",
    dag=dag)
Configuring the connection: specify the user name and the password to connect, and optionally the extra parameters (as a JSON dictionary) that can be used in the MySQL connection. Note that you can choose the client used to connect to the database by setting the client extra field.
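As a rough illustration of the extra field described above, the sketch below builds the JSON dictionary in Python. The values are assumptions for illustration only: "charset" and "client" are keys I believe the MySQL hook reads, and "mysql-connector-python" is assumed to be an accepted client name; check the documentation for your Airflow version before relying on either.

```python
import json

# Hypothetical extra parameters for a MySQL connection (illustrative values only).
extra = {
    "charset": "utf8",                   # assumed key: character set for the connection
    "client": "mysql-connector-python",  # assumed value: which client library to use
}

# The connection form expects the extras as a JSON string.
extra_json = json.dumps(extra)
print(extra_json)
```

The printed string is what would be pasted into the Extra field of the connection form.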
Airflow SQL Server Integration allows users to automatically load query results from one Microsoft SQL Server to another Server. Airflow SQL Server Integration makes it easier for companies to automate and orchestrate the Data Pipeline workflows.
I was really struggling with this for the past 90 minutes, so here is a more declarative way for newcomers to follow:
from airflow.hooks.mysql_hook import MySqlHook

def fetch_records():
    request = "SELECT * FROM your_table"
    mysql_hook = MySqlHook(mysql_conn_id='the_connection_name_sourced_from_the_ui', schema='specific_db')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    sources = cursor.fetchall()
    print(sources)

...your DAG() as dag: code

    task = PythonOperator(
        task_id='fetch_records',
        python_callable=fetch_records
    )
This prints the results of your DB query to the task logs.
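For the XCom part of the question, one possible approach is to return the rows from the callable, since a PythonOperator pushes its callable's return value to XCom, and then pull them in a downstream task. The following is only a sketch under assumptions: process_records, fetch_and_process, use_records and FakeHook are hypothetical names introduced here, the import path matches the older Airflow layout used in this thread, and the Airflow import is deferred so the logic can be tried without Airflow or a database.

```python
def process_records(records):
    """Hypothetical transformation: turn row tuples into JSON-friendly lists."""
    return [list(row) for row in records]


def fetch_and_process(hook, sql="SELECT * FROM your_table"):
    """Fetch rows through any object exposing get_records(sql), then process them."""
    return process_records(hook.get_records(sql))


def fetch_records():
    # Imported lazily so this module loads even where Airflow is not installed.
    from airflow.hooks.mysql_hook import MySqlHook
    hook = MySqlHook(mysql_conn_id="the_connection_name_sourced_from_the_ui",
                     schema="specific_db")
    # The value returned here is what the PythonOperator pushes to XCom.
    return fetch_and_process(hook)


def use_records(**context):
    # Pull whatever the upstream task with task_id 'fetch_records' returned.
    rows = context["ti"].xcom_pull(task_ids="fetch_records")
    print(len(rows))
    return len(rows)


class FakeHook:
    """Stand-in for MySqlHook so the logic can be tried without a database."""

    def get_records(self, sql):
        return [(1, "a"), (2, "b")]


print(fetch_and_process(FakeHook()))
```

To wire this up, fetch_records and use_records would each be the python_callable of a PythonOperator, with the second task set downstream of the first; on older Airflow versions the second operator likely also needs provide_context=True to receive the task instance. XCom is meant for small payloads, so for large result sets it is probably better to write the rows to a table or file and pass only a reference.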
I hope this is of use to someone else.