 

Airflow SimpleHttpOperator for HTTPS

Tags:

https

airflow

I'm trying to use SimpleHttpOperator to consume a RESTful API. But, as the name suggests, it only seems to support HTTP, and I need to consume an HTTPS URI. So now I have to either use the "requests" library from Python or handle the call from within the application code, which may not be the standard way. I'm looking for any other options for consuming an HTTPS URI from within Airflow. Thanks.

Kris asked Aug 01 '18 09:08



People also ask

How do you use Simplehttpoperator in airflow?

In the Airflow UI, go to the Admin section and click Connections. On the "Add Connection" screen, fill in details such as the Conn Id (for example, "weather_api"), the Conn Type (HTTP) and the Host. Click Save, and the http_conn_id is configured!

What is Http_conn_id in airflow?

http_conn_id (str): the HTTP connection to run the operator against.
endpoint (Optional[str]): the relative part of the full URL.
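To see how http_conn_id and endpoint combine, here is a stdlib-only sketch that approximates how the Airflow 1.10-era HttpHook joins the base URL derived from the connection with the endpoint. The build_url name is hypothetical, and the slash rule is an assumption modeled on that hook, not a copy of it:

```python
def build_url(base_url, endpoint):
    """Join a connection's base URL with an operator's relative endpoint.

    Hypothetical helper: a "/" is inserted only when neither side supplies
    one, so "host" + "path" and "host" + "/path" both produce one slash.
    """
    needs_slash = (
        base_url
        and not base_url.endswith("/")
        and endpoint
        and not endpoint.startswith("/")
    )
    if needs_slash:
        return base_url + "/" + endpoint
    # Otherwise concatenate as-is, treating missing parts as empty strings.
    return (base_url or "") + (endpoint or "")


print(build_url("https://api.example.com", "v1/weather"))
print(build_url("https://api.example.com/", "/v1/weather"))
```

The second call yields a double slash, the same shape as the //v3/templates URL in one of the logs below, where the host ends with "/" and the endpoint starts with "/".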

How do I call REST API from airflow?

The first step to calling the Airflow REST API on Astronomer is to create a Deployment-level Service Account, which will assume a user role and set of permissions and output an API key that you can use to authenticate with your request. You can use the Software UI or the Astro CLI to create a Service account.

What is DummyOperator in airflow?

class airflow.operators.dummy_operator.DummyOperator(*args, **kwargs): an operator that does literally nothing. It can be used to group tasks in a DAG. The task is evaluated by the scheduler but never processed by the executor.


4 Answers

I dove into this and am pretty sure that this behavior is a bug in Airflow. I have created a ticket for it here: https://issues.apache.org/jira/browse/AIRFLOW-2910

For now, the best you can do is override SimpleHttpOperator as well as HttpHook in order to change the way that HttpHook.get_conn works (to accept https). I may end up doing this, and if I do I'll post some code.

Update:

Operator override:

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    def execute(self, context):
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Calling HTTP method")

        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text

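Hook override:

from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers=None):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        if "://" in conn.host:
            # host already includes a scheme, e.g. "https://api.example.com"
            self.base_url = conn.host
        elif conn.schema:
            # scheme taken from the connection's Schema field
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support
            # e.g. conn_type "https" when the connection URI starts with https://
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host

        if conn.port:
            # no trailing "/" here, which avoids "//" when the endpoint starts with "/"
            self.base_url = self.base_url + ":" + str(conn.port)
        if conn.login:
            session.auth = (conn.login, conn.password)
        if conn.extra:
            # like the stock HttpHook, send the connection's Extra JSON as headers
            session.headers.update(conn.extra_dejson)
        if headers:
            session.headers.update(headers)

        return session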

Usage:

Drop-in replacement for SimpleHttpOperator.
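The scheme precedence in the HttpsHook.get_conn override above can be exercised without Airflow. This is a stdlib-only sketch; resolve_base_url is a hypothetical name, and its arguments stand in for the Connection fields host, schema, conn_type and port:

```python
def resolve_base_url(host, schema=None, conn_type=None, port=None):
    """Pick a base URL using the same precedence as the HttpsHook override.

    1. host already contains a scheme ("https://...") -> use it as-is
    2. otherwise use the connection's Schema field
    3. otherwise fall back to the connection type (e.g. "https")
    4. otherwise default to plain "http"
    """
    if "://" in host:
        base_url = host
    elif schema:
        base_url = schema + "://" + host
    elif conn_type:
        base_url = conn_type + "://" + host
    else:
        base_url = "http://" + host
    if port:
        # Port is appended without a trailing slash.
        base_url = base_url + ":" + str(port)
    return base_url


print(resolve_base_url("api.example.com", conn_type="https"))
print(resolve_base_url("api.example.com", schema="https", port=8443))
```

Keep in mind that a connection created in the UI with Conn Type HTTP has conn_type "http", so the third branch only produces HTTPS when the type itself is "https", which is what happens when an AIRFLOW_CONN_... URI starts with https://.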

melchoir55 answered Oct 14 '22 22:10



This is a couple of months old now, but for what it's worth, I had no issue making an HTTPS call on Airflow 1.10.2.

In my initial test I was requesting templates from SendGrid, so the connection was set up like this:

Conn Id   : sendgrid_templates_test
Conn Type : HTTP   
Host      :   https://api.sendgrid.com/
Extra     : { "authorization": "Bearer [my token]"}

And then in the DAG code:

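from airflow.operators.http_operator import SimpleHttpOperator

# assumes an existing DAG object named `dag`
get_templates = SimpleHttpOperator(
    task_id='get_templates',
    method='GET',
    endpoint='/v3/templates',
    http_conn_id='sendgrid_templates_test',
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag,
)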

And that worked. Also note that my request runs after a branch operator, so I had to set the trigger rule to "all_done" to make sure it fires even when one of the branches is skipped. That has nothing to do with the question, but I wanted to point it out.

To be clear, I did get an InsecureRequestWarning because I did not have certificate verification enabled, but the call succeeded, as the resulting logs show:

[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host:  https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url:  https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0
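The authorization entry in the connection's Extra field reaches the request because, as far as I can tell, the 1.10-era HttpHook sends the Extra JSON as request headers and then applies any headers passed by the operator. A stdlib-only sketch of that merge order, assuming that behavior (merge_headers is a hypothetical helper, and it returns a plain dict rather than the case-insensitive header mapping that requests uses):

```python
import json


def merge_headers(extra_json, operator_headers=None):
    """Combine a connection's Extra JSON with per-task headers.

    Assumed order: Extra first, then the operator's headers, so the
    operator's values win when both define the same key.
    """
    headers = {}
    if extra_json:
        headers.update(json.loads(extra_json))
    if operator_headers:
        headers.update(operator_headers)
    return headers


extra = '{ "authorization": "Bearer [my token]"}'
print(merge_headers(extra, {"Content-Type": "application/json"}))
```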
CS Connell answered Oct 14 '22 22:10



I had the same HTTP/HTTPS problem when setting connections through environment variables (although it works when I set the connection in the UI).

I checked the issue @melchoir55 opened (https://issues.apache.org/jira/browse/AIRFLOW-2910), and you don't need a custom operator for this. The problem is not that HttpHook or SimpleHttpOperator can't use HTTPS; the problem is the way the connection string is parsed for HTTP connections: the first part (http:// or https://) is interpreted as the connection type, not as the URL scheme.
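A rough, stdlib-only sketch of that parsing shows the effect. parse_conn_uri is a hypothetical helper that approximates, but is not, Airflow's connection URI parser: the URI scheme becomes the connection type, and only the percent-decoded hostname ends up in the host field.

```python
from urllib.parse import unquote, urlsplit


def parse_conn_uri(uri):
    """Approximate how an AIRFLOW_CONN_* URI is split into connection fields."""
    parts = urlsplit(uri)
    hostname = parts.hostname  # note: urlsplit lowercases the hostname
    return {
        "conn_type": parts.scheme,  # "https://..." -> conn_type "https"
        "host": unquote(hostname) if hostname else hostname,
        "schema": parts.path[1:],  # path without its leading "/"
        "port": parts.port,
    }


plain = parse_conn_uri("https://example.com/")
encoded = parse_conn_uri("http://https%3a%2f%2fexample.com/")
print(plain)    # "https" only survives as conn_type; host is "example.com"
print(encoded)  # host decodes to "https://example.com", keeping the scheme
```

With the first form, the stock HttpHook sees a host without "://" and an empty Schema, so it falls back to http://example.com; with the encoded form, the host already contains "https://" and is used as-is.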

In summary, you don't need a custom operator; you can just set the connection in your environment as follows:

AIRFLOW_CONN_HTTP_EXAMPLE=http://https%3a%2f%2fexample.com/

Instead of:

AIRFLOW_CONN_HTTP_EXAMPLE=https://example.com/

Or set the connection on the UI.
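If you generate these variables from code, the encoded form can be built with urllib.parse.quote. A minimal sketch; http_conn_env_value is a hypothetical helper, and quote emits uppercase escapes such as %3A, which decode the same way as the lowercase %3a in the example above:

```python
import os
from urllib.parse import quote, unquote


def http_conn_env_value(base_url):
    """Wrap a full URL so it survives AIRFLOW_CONN_* parsing as the host.

    safe="" makes quote() escape ":" and "/" too, so the outer "http://"
    is read as the connection type and the inner URL stays in one piece.
    """
    return "http://" + quote(base_url, safe="") + "/"


value = http_conn_env_value("https://example.com")
print(value)  # http://https%3A%2F%2Fexample.com/

# Round trip: the encoded part decodes back to the original URL.
assert unquote(value[len("http://"):-1]) == "https://example.com"

# Example only: in a deployment this is normally set in the scheduler/worker environment.
os.environ["AIRFLOW_CONN_HTTP_EXAMPLE"] = value
```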

It is not an intuitive way to set up a connection, but I think they are working on a better way to parse connections for Airflow 2.0.

Renato Romão answered Oct 14 '22 22:10



In Airflow 2.x, you can call HTTPS URLs by setting the Schema field to https when you set up the connection, and still use SimpleHttpOperator as shown below:

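from airflow.providers.http.operators.http import SimpleHttpOperator

# get_data: the request body (e.g. a JSON string), defined elsewhere in the DAG file
my_api = SimpleHttpOperator(
    task_id="my_api",
    http_conn_id="YOUR_CONN_ID",
    method="POST",
    endpoint="/base-path/end-point",
    data=get_data,
    headers={"Content-Type": "application/json"},
)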


Dheemanth Bhat answered Oct 14 '22 21:10
