
How can I log sql execution results in airflow?

I use airflow python operators to execute sql queries against a redshift/postgres database. In order to debug, I'd like the DAG to return the results of the sql execution, similar to what you would see if executing locally in a console:

I'm using psycopg2 to create a connection/cursor and execute the sql. Having this logged would be extremely helpful to confirm the parsed parameterized sql, and to confirm that data was actually inserted (I have painfully experienced issues where differences in environments caused unexpected behavior).

I do not have deep knowledge of airflow or the low-level workings of the python DBAPI, but the psycopg2 documentation does seem to refer to some methods and connection configurations that may allow this.

I find it very perplexing that this is difficult to do, as I'd imagine it would be a primary use case of running ETLs on this platform. I've heard suggestions to simply create additional tasks that query the table before and after, but this seems clunky and ineffective.

Could anyone please explain how this may be possible, and if not, explain why? Alternate methods of achieving similar results welcome. Thanks!

So far I have tried the connection.status_message() method, but it only seems to return the first line of the sql and not the results. I have also attempted to create a logging cursor, which produces the sql, but not the console results:

import logging
import sys

import psycopg2 as pg
from psycopg2.extras import LoggingConnection

conn = pg.connect(
    connection_factory=LoggingConnection,
    ...
)
conn.autocommit = True

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
conn.initialize(logger)

cur = conn.cursor()

sql = """    
    INSERT INTO mytable (
    SELECT *
    FROM other_table
    );
"""

cur.execute(sql)

I'd like the logger to return something like:

sql> INSERT INTO mytable (
     SELECT ...
[2019-07-25 23:00:54] 912 rows affected in 4 s 442 ms

Jonathan asked Oct 28 '22 11:10


2 Answers

Let's assume you are writing an operator that uses a postgres hook to do something in sql.

Anything printed inside an operator is logged.

So, if you want to log the statement, just print the statement in your operator.

print(sql)

If you want to log the result, fetch it and print it, e.g.:

result = cur.fetchall()
for row in result:
    print(row)

Alternatively you can use self.log.info in place of print, where self refers to the operator instance.
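
A minimal sketch of the advice above, assuming the operator's callable already holds a DB-API cursor (for example one from psycopg2 or a postgres hook). The helper name execute_and_log and the max_rows cap are hypothetical, made up for this illustration; the code relies only on the standard cursor attributes description, rowcount and fetchall.

```python
import logging

log = logging.getLogger(__name__)


def execute_and_log(cur, sql, params=None, max_rows=20):
    """Run one statement on a DB-API cursor and log what came back.

    Hypothetical helper for illustration; not part of Airflow or psycopg2.
    """
    log.info("Executing: %s", sql)
    if params is None:
        cur.execute(sql)
    else:
        cur.execute(sql, params)

    if cur.description is None:
        # No result set (INSERT/UPDATE/DELETE/DDL): report affected rows.
        log.info("Rows affected: %s", cur.rowcount)
        return []

    # A result set exists (e.g. SELECT): fetch it and log the first rows.
    rows = cur.fetchall()
    for row in rows[:max_rows]:
        log.info("Row: %s", row)
    log.info("Fetched %d row(s)", len(rows))
    return rows
```

Inside a python callable this could be used as execute_and_log(cur, sql). With psycopg2, parameters are passed with %s placeholders, e.g. execute_and_log(cur, "SELECT * FROM mytable WHERE id > %s", (10,)).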

dstandish answered Nov 02 '22 00:11


Ok, so after some trial and error I've found a method that works for my setup and objective. To recap, my goal is to run ETLs via python scripts, orchestrated in Airflow. Referring to the documentation for statusmessage:

Read-only attribute containing the message returned by the last command:

The key is to manage logging in context with transactions executed on the server. In order for me to do this, I had to specifically set con.autocommit = False, and wrap SQL blocks with BEGIN TRANSACTION; and END TRANSACTION;. If you read cur.statusmessage directly after a statement that deletes or inserts, you will get a response such as 'INSERT 0 92380'.

This still isn't as verbose as I would prefer, but it is much better than nothing, and is very useful for troubleshooting ETL issues within Airflow logs.

Side notes:
- When autocommit is set to False, you must explicitly commit transactions.
- It may not be necessary to state transaction begin/end in your SQL. It may depend on your DB version.

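import logging
import psycopg2 as psy

con = psy.connect(...)
con.autocommit = False
cur = con.cursor()

try:
    cur.execute(some_sql)  # some_sql: the SQL string to run
    # statusmessage holds the server's reply, e.g. 'INSERT 0 92380'
    logging.info(f"Cursor statusmessage: {cur.statusmessage}")
    con.commit()  # autocommit is off, so commit explicitly
except Exception:
    con.rollback()
    raise  # re-raise so the Airflow task is marked as failed
finally:
    con.close()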
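
To get closer to the console-style line from the question ("912 rows affected in 4 s 442 ms"), one option is to time the call yourself and combine cur.rowcount with statusmessage. This is only a sketch: execute_with_summary is a hypothetical name, the elapsed time is measured on the client (so it includes the network round trip), and statusmessage is read with getattr because it is a psycopg2 cursor attribute rather than part of the generic DB-API.

```python
import logging
import time

log = logging.getLogger(__name__)


def execute_with_summary(cur, sql, params=None):
    """Execute a statement and log a console-style summary line.

    Hypothetical helper for illustration; `cur` is any DB-API cursor.
    """
    started = time.perf_counter()
    if params is None:
        cur.execute(sql)
    else:
        cur.execute(sql, params)
    elapsed = time.perf_counter() - started  # client-side wall time

    # statusmessage is psycopg2-specific (e.g. 'INSERT 0 92380');
    # other drivers may not have it, so fall back to None.
    status = getattr(cur, "statusmessage", None)
    summary = f"{cur.rowcount} rows affected in {elapsed:.3f} s"
    if status:
        summary = f"{status}: {summary}"
    log.info(summary)
    return summary
```

In the try block above, cur.execute(some_sql) plus the logging.info call could be swapped for execute_with_summary(cur, some_sql), with the commit and rollback handling left unchanged.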

There is some buried functionality within psycopg2 that I'm sure can be utilized, but the documentation is pretty thin and there are no clear examples. If anyone has suggestions on how to utilize things such as logobjects, or returning join PID to somehow retrieve additional information, please share them.

Jonathan answered Nov 02 '22 02:11