 

How to call a REST end point using Airflow DAG

I'm new to Apache Airflow. I want to call a REST endpoint using a DAG. For example, the REST endpoint is

@PostMapping(path = "/api/employees", consumes = "application/json")

Now I want to call this REST endpoint from an Airflow DAG and schedule it. I'm using SimpleHttpOperator to call the REST endpoint.

t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='http://localhost:8084/api/employees',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

When I trigger the DAG, the task fails:

[2019-12-30 09:09:06,330] {{taskinstance.py:862}} INFO - Executing <Task(SimpleHttpOperator): post_op> on 2019-12-30T08:57:00.674386+00:00
[2019-12-30 09:09:06,331] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'example_http_operator', 'post_op', '2019-12-30T08:57:00.674386+00:00', '--job_id', '6', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ExampleHttpOperator.py', '--cfg_path', '/tmp/tmpf9t6kzxb']
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 09:09:07,445] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 09:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ExampleHttpOperator.py
[2019-12-30 09:09:07,473] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 09:09:07,472] {{cli.py:545}} INFO - Running <TaskInstance: example_http_operator.post_op 2019-12-30T08:57:00.674386+00:00 [running]> on host 855dbc2ce3a3
[2019-12-30 09:09:07,480] {{http_operator.py:87}} INFO - Calling HTTP method
[2019-12-30 09:09:07,483] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,483] {{base_hook.py:84}} INFO - Using connection to: id: http_default. Host: https://www.google.com/, Port: None, Schema: None, Login: None, Password: None, extra: {}
[2019-12-30 09:09:07,484] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,484] {{http_hook.py:131}} INFO - Sending 'POST' to url: https://www.google.com/http://localhost:8084/api/employees
[2019-12-30 09:09:07,501] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,501] {{http_hook.py:181}} WARNING - HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 'Unexpected EOF')"))) Tenacity will retry to execute the operation
[2019-12-30 09:09:07,501] {{taskinstance.py:1058}} ERROR - HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 'Unexpected EOF')")))
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 485, in wrap_socket
    cnx.do_handshake()
  File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1934, in do_handshake
    self._raise_ssl_error(self._ssl, result)
  File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1664, in _raise_ssl_error
    raise SysCallError(-1, "Unexpected EOF")
OpenSSL.SSL.SysCallError: (-1, 'Unexpected EOF')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 394, in connect
    ssl_context=context,
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
    return context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 491, in wrap_socket
    raise ssl.SSLError("bad handshake: %r" % e)
ssl.SSLError: ("bad handshake: SysCallError(-1, 'Unexpected EOF')",)

Airflow is running in Docker, and the Docker image is puckel/docker-airflow. Why is it calling the http_default connection's host, https://www.google.com/?

Asked Jan 03 '20 by Govil Kumar

People also ask

Does Airflow have a REST API?

Airflow exposes a REST API. It is available through the webserver; endpoints live under /api/experimental/ .
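
For illustration, here is a minimal sketch of calling that experimental API with the requests library; the webserver address, port 8080, and the DAG id are assumptions, not values taken from the question:

# Minimal sketch (Airflow 1.10.x experimental API); the webserver URL and DAG id are assumed.
import requests

resp = requests.post(
    'http://localhost:8080/api/experimental/dags/example_http_operator/dag_runs',
    json={'conf': {}},  # optional run configuration
)
print(resp.status_code, resp.text)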


2 Answers

You need to consider both the Operator you are using and the underlying Hook it uses to connect. The Hook fetches connection information from an Airflow Connection, which is just a container used to store credentials and other connection information. You can configure Connections in the Airflow UI (Admin -> Connections).

So in this case, you need to first configure your HTTP Connection.

From the http_hook documentation:

http_conn_id (str) – connection that has the base API url i.e https://www.google.com/

For the HttpHook, you configure the Connection by setting its Host field to the base URL of your endpoint: http://localhost:8084/.

Since your operator uses the default http_conn_id, the hook will use the Airflow Connection named "http_default". If you don't want to change the default one, you can create another Airflow Connection in the UI and pass its id to your operator via the http_conn_id argument (see the sketch below).
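
As a hedged sketch, a Connection can also be created programmatically against the metadata database; the connection id employees_api and its host are illustrative assumptions, not names from the question:

# Minimal sketch for Airflow 1.10.x: register an HTTP Connection in the metadata DB.
# The conn_id "employees_api" is a hypothetical name chosen for this example.
from airflow import settings
from airflow.models import Connection

session = settings.Session()
conn = Connection(
    conn_id='employees_api',
    conn_type='http',
    host='http://localhost:8084/',  # base URL; the operator supplies only the relative endpoint
)
session.add(conn)
session.commit()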

See the source code to get a better idea of how the Connection object is used.

Lastly, according to the http_operator documentation:

endpoint (str) – The relative part of the full url. (templated)

You should only pass the relative part of your URL to the operator; the rest comes from the underlying http_hook via the Connection.

In this case, the endpoint value for your operator should be api/employees (not the full URL), as in the sketch below.
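
Putting it together, a minimal sketch of the corrected task might look like this (employees_api is the hypothetical Connection from above; with the default http_default Connection repointed to http://localhost:8084/ you could omit http_conn_id):

# Minimal sketch, assuming Airflow 1.10.x and a Connection whose Host is http://localhost:8084/
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator

dag = DAG('call_employees_api', start_date=datetime(2019, 12, 30), schedule_interval='@daily')

t1 = SimpleHttpOperator(
    task_id='post_op',
    http_conn_id='employees_api',  # hypothetical conn_id; defaults to 'http_default'
    method='POST',
    endpoint='api/employees',      # relative part only; the host comes from the Connection
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)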

The Airflow project documentation is unfortunately not very clear in this case. Please consider contributing an improvement; they are always welcome :)

Answered Sep 19 '22 by BjornO


I think you need to set your connection string as an ENV variable in your Dockerfile or docker run command:

ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN my_conn_string

see this and this

Connections

The connection information to external systems is stored in the Airflow metadata database and managed in the UI (Menu -> Admin -> Connections). A conn_id is defined there, and hostname / login / password / schema information is attached to it. Airflow pipelines can simply refer to the centrally managed conn_id without having to hard-code any of this information anywhere.

Many connections with the same conn_id can be defined, and when that is the case, and when the hooks use the get_connection method from BaseHook, Airflow will choose one connection randomly, allowing for some basic load balancing and fault tolerance when used in conjunction with retries.

Airflow also has the ability to reference connections via environment variables from the operating system. The environment variable needs to be prefixed with AIRFLOW_CONN_ to be considered a connection. When referencing the connection in the Airflow pipeline, the conn_id should be the name of the variable without the prefix. For example, if the conn_id is named POSTGRES_MASTER, the environment variable should be named AIRFLOW_CONN_POSTGRES_MASTER. Airflow assumes the value returned from the environment variable to be in a URI format (e.g. postgres://user:password@localhost:5432/master).
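
As a hedged sketch of that mechanism applied to this question: if the container's environment defined AIRFLOW_CONN_EMPLOYEES_API=http://localhost:8084/ (a hypothetical variable name), a hook or task could resolve it like any UI-defined connection:

# Minimal sketch (Airflow 1.10.x): connections defined as AIRFLOW_CONN_* environment
# variables resolve through the same lookup as UI-defined ones. The conn_id
# "employees_api" and its URI value are assumptions for illustration.
from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection('employees_api')  # name without the AIRFLOW_CONN_ prefix
print(conn.host, conn.port)                      # parsed from the URI value of the variable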

see this

That is why you are currently using the default:

Using connection to: id: http_default. Host: https://www.google.com/
Answered Sep 22 '22 by LinPy