 

PostgresOperator in Airflow getting timeout

I wrote a PostgreSQL function that contains the following statements:

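-- my_function is a placeholder name; the original question does not give one
CREATE OR REPLACE FUNCTION schema.my_function()
RETURNS SETOF schema.table_name AS $$
BEGIN
    SET statement_timeout TO '3600s';
    RETURN QUERY SELECT * FROM schema.table_name;
END;
$$ LANGUAGE plpgsql;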

In Airflow, I use the PostgresOperator to execute this function, but I receive this error:

[2018-06-01 00:00:01,066] {models.py:1595} ERROR - canceling statement due to statement timeout

I saw that PostgresOperator uses postgres_hook, and postgres_hook uses psycopg2 as its connector.

As far as I can tell, this could be a timeout imposed by the client application rather than by the database.

How can I solve this? Do I need to configure psycopg2 in Airflow, or can I use an environment variable to set the timeout and avoid this problem?

Flavio asked Apr 19 '26 22:04



1 Answer

You can pass connection arguments to the psycopg2 library through the Extra field of an Airflow connection. At the time of writing, postgres_hook supports the following arguments:

['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl', 'application_name', 'keepalives_idle']
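To illustrate why statement_timeout never reaches psycopg2 with the stock hook, here is a database-free sketch of the allow-list filtering applied to the Extra field. The helper name `filter_extras` is hypothetical; the list is the one quoted above, and the loop mirrors the one in the hook's get_conn.

```python
# Extras the stock postgres_hook forwards to psycopg2 (list quoted above)
SUPPORTED_EXTRAS = ['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl',
                    'application_name', 'keepalives_idle']


def filter_extras(extra_dejson):
    """Return only the extras that would be passed on to psycopg2.connect.

    Hypothetical helper for illustration; anything not in the allow-list,
    such as statement_timeout, is silently dropped.
    """
    return {name: value for name, value in extra_dejson.items()
            if name in SUPPORTED_EXTRAS}


extras = {"sslmode": "require", "statement_timeout": "3600s"}
print(filter_extras(extras))  # {'sslmode': 'require'}
```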

To pass the statement_timeout argument to the PostgresHook, you need to override the hook's get_conn method so that it accepts your desired argument.

Ex. Class Method Override

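import psycopg2
# Airflow 1.x import path; Airflow 2 uses airflow.providers.postgres.hooks.postgres
from airflow.hooks.postgres_hook import PostgresHook


class NewPostgresHook(PostgresHook):
    def get_conn(self):
        conn = self.get_connection(self.postgres_conn_id)
        conn_args = dict(
            host=conn.host,
            user=conn.login,
            password=conn.password,
            dbname=self.schema or conn.schema,
            port=conn.port)
        # copy the libpq connection parameters found in conn.extra
        for arg_name, arg_val in conn.extra_dejson.items():
            if arg_name in ['sslmode', 'sslcert', 'sslkey',
                            'sslrootcert', 'sslcrl', 'application_name',
                            'keepalives_idle']:
                conn_args[arg_name] = arg_val
        # statement_timeout is a server setting, not a libpq connection keyword,
        # so send it through libpq's "options" parameter as "-c name=value"
        timeout = conn.extra_dejson.get('statement_timeout')
        if timeout:
            conn_args['options'] = '-c statement_timeout={}'.format(timeout)

        self.conn = psycopg2.connect(**conn_args)
        return self.conn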
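One detail worth checking: statement_timeout is a PostgreSQL server parameter, not one of libpq's connection keywords, so psycopg2.connect() rejects it as an invalid connection option if it is passed straight through. libpq's `options` keyword is the usual way to send server settings at connect time, written as `-c name=value`. Below is a minimal, database-free sketch of building that value; the helper name `add_statement_timeout` and the sample host and database names are hypothetical.

```python
def add_statement_timeout(conn_args, timeout):
    """Return a copy of conn_args with statement_timeout sent via libpq 'options'.

    Hypothetical helper: libpq has no 'statement_timeout' connection keyword,
    but its 'options' keyword passes '-c name=value' settings to the server.
    """
    args = dict(conn_args)  # do not mutate the caller's dict
    setting = '-c statement_timeout={}'.format(timeout)
    existing = args.get('options')
    # keep any options the connection already had, separated by a space
    args['options'] = '{} {}'.format(existing, setting) if existing else setting
    return args


base = {"host": "db.example.com", "dbname": "analytics"}
print(add_statement_timeout(base, "3600s"))
# {'host': 'db.example.com', 'dbname': 'analytics', 'options': '-c statement_timeout=3600s'}
```

Appending rather than overwriting keeps any `options` value already configured. libpq treats spaces in `options` as argument separators unless they are backslash-escaped, which is not an issue for a value like `3600s`.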

You can then set this argument in the connection's Extra field as a JSON string.

Ex. JSON String in Connection Extras Field

{"statement_timeout": "3600s"}
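Airflow reads the Extra field with `json.loads` (that is what `extra_dejson` does), so the value must be strict JSON with double-quoted keys and strings. In the 1.x releases, a parse failure is logged and `extra_dejson` falls back to an empty dict, so a single-quoted value would silently drop the timeout. A quick standard-library check:

```python
import json

# Serialize the extras with json.dumps so the quoting is always valid JSON
extra = json.dumps({"statement_timeout": "3600s"})
print(extra)  # {"statement_timeout": "3600s"}

# Parse it back the way extra_dejson does: a plain json.loads
print(json.loads(extra)["statement_timeout"])  # 3600s

# A Python-style dict literal with single quotes is rejected by json.loads
try:
    json.loads("{'statement_timeout': '3600s'}")
except json.JSONDecodeError as err:
    print("not valid JSON:", err.msg)
```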

andscoop answered Apr 21 '26 10:04



