I'm trying to use SimpleHttpOperator to consume a RESTful API. But, as the name suggests, it only supports the HTTP protocol, whereas I need to consume an HTTPS URI. So for now I have to either use the "requests" library from Python or handle the invocation from within the application code, which may not be the standard way. I'm looking for any other options available to consume an HTTPS URI from within Airflow. Thanks.
On the Airflow UI, go to the Admin section and click on Connections. On the "Add Connection" screen, configure the details such as Conn Id (where the name “weather_api” is configured), Conn Type (which is HTTP), and the Host details. Click on Save, and that's it: http_conn_id is configured!
http_conn_id (str) – The http connection to run the operator against. endpoint (Optional[str]) – The relative part of the full url. (
The first step to calling the Airflow REST API on Astronomer is to create a Deployment-level Service Account, which will assume a user role and set of permissions and output an API key that you can use to authenticate with your request. You can use the Software UI or the Astro CLI to create a Service account.
class airflow.operators.dummy_operator.DummyOperator(*args, **kwargs): Operator that does literally nothing. It can be used to group tasks in a DAG. The task is evaluated by the scheduler but never processed by the executor.
I dove into this and am pretty sure that this behavior is a bug in airflow. I have created a ticket for it here: https://issues.apache.org/jira/browse/AIRFLOW-2910
For now, the best you can do is override SimpleHttpOperator as well as HttpHook in order to change the way that HttpHook.get_conn works (to accept https). I may end up doing this, and if I do I'll post some code.
Update:
Operator override:
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    def execute(self, context):
        # Same flow as SimpleHttpOperator.execute, but using the HTTPS-aware hook.
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)
        self.log.info("Calling HTTP method")
        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text
Hook override:
from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()
        if "://" in conn.host:
            self.base_url = conn.host
        elif conn.schema:
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host
        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)
        return session
Usage:
Drop-in replacement for SimpleHttpOperator.
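To see how the hook override picks a base URL, the branching in get_conn can be pulled out into a plain function and tried without Airflow installed. This is only a sketch: resolve_base_url is a hypothetical helper name, not part of Airflow, and its arguments stand in for the host, schema, conn_type, and port fields of the connection.

```python
from typing import Optional


def resolve_base_url(host: str,
                     schema: Optional[str] = None,
                     conn_type: Optional[str] = None,
                     port: Optional[int] = None) -> str:
    """Hypothetical helper mirroring the branching in HttpsHook.get_conn."""
    if "://" in host:
        # Host already carries its own scheme, so use it untouched.
        base_url = host
    elif schema:
        base_url = schema + "://" + host
    elif conn_type:
        # The https support: fall back to the connection type as the scheme.
        base_url = conn_type + "://" + host
    else:
        # Nothing else to go on, so default to plain HTTP.
        base_url = "http://" + host
    if port:
        base_url = base_url + ":" + str(port) + "/"
    return base_url


if __name__ == "__main__":
    print(resolve_base_url("example.com", conn_type="https"))
    print(resolve_base_url("https://api.sendgrid.com/"))
```

Note the precedence: a scheme embedded in the host wins over schema, and schema wins over conn_type.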
This is a couple of months old now, but for what it is worth I did not have any issue with making an HTTPS call on Airflow 1.10.2.
In my initial test I was making a request for templates from sendgrid, so the connection was set up like this:
Conn Id : sendgrid_templates_test
Conn Type : HTTP
Host : https://api.sendgrid.com/
Extra : { "authorization": "Bearer [my token]"}
and then in the dag code:
get_templates = SimpleHttpOperator(
    task_id='get_templates',
    method='GET',
    endpoint='/v3/templates',
    http_conn_id='sendgrid_templates_test',
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag,
)
and that worked. Also notice that my request happens after a Branch Operator, so I needed to set the trigger rule appropriately (to "all_done" to make sure it fires even when one of the branches is skipped), which has nothing to do with the question, but I just wanted to point it out.
Now to be clear, I did get an InsecureRequestWarning because I did not have certificate verification enabled, but you can see the resulting logs below:
[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host: https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url: https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0
I was having the same problem with HTTP/HTTPS when trying to set the connections using environment variables (although it works when I set the connection in the UI).
I've checked the issue @melchoir55 opened (https://issues.apache.org/jira/browse/AIRFLOW-2910), and you don't need to make a custom operator for that. The problem is not that HttpHook or HttpOperator can't use HTTPS; the problem is the way get_hook parses the connection string when dealing with HTTP: it treats the first part (http:// or https://) as the connection type.
In summary, you don't need a custom operator. You can just set the connection in your environment as follows:
AIRFLOW_CONN_HTTP_EXAMPLE=http://https%3a%2f%2fexample.com/
Instead of:
AIRFLOW_CONN_HTTP_EXAMPLE=https://example.com/
Or set the connection on the UI.
It is not an intuitive way to set up a connection, but I think they are working on a better way to parse connections for Airflow 2.0.
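The environment-variable value above is just the real base URL, percent-encoded and placed in the host position of an http:// URI. A minimal sketch of building it with the Python standard library follows; build_http_conn_uri is a hypothetical helper name, not an Airflow function.

```python
from urllib.parse import quote, unquote, urlsplit


def build_http_conn_uri(base_url: str) -> str:
    """Hypothetical helper: wrap an HTTPS base URL for an AIRFLOW_CONN_* variable.

    The leading "http://" is what gets read as the connection type; the
    percent-encoded remainder carries the real scheme and host.
    """
    # safe="" forces ":" and "/" to be encoded as well.
    return "http://" + quote(base_url, safe="") + "/"


if __name__ == "__main__":
    uri = build_http_conn_uri("https://example.com")
    print("AIRFLOW_CONN_HTTP_EXAMPLE=" + uri)

    # Decoding the host part recovers the original HTTPS base URL.
    parts = urlsplit(uri)
    print(parts.scheme, unquote(parts.netloc))
```

quote emits uppercase hex digits (%3A%2F%2F) where the example above uses lowercase (%3a%2f%2f); the two forms are equivalent percent-encodings.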
In Airflow 2.x you can use HTTPS URLs by setting the schema value to https when setting up your connection, and you can still use SimpleHttpOperator as shown below.
my_api = SimpleHttpOperator(
    task_id="my_api",
    http_conn_id="YOUR_CONN_ID",
    method="POST",
    endpoint="/base-path/end-point",
    data=get_data,
    headers={"Content-Type": "application/json"},
)