I'm new to Apache Airflow. I want to call a REST endpoint from a DAG and schedule it. For example, the REST endpoint is:
@PostMapping(path = "/api/employees", consumes = "application/json")
Now I want to call this REST endpoint from an Airflow DAG and schedule it. I'm using SimpleHttpOperator to call the endpoint:
t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='http://localhost:8084/api/employees',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)
When I trigger the DAG, the task fails:
[2019-12-30 09:09:06,330] {{taskinstance.py:862}} INFO - Executing <Task(SimpleHttpOperator):
post_op> on 2019-12-30T08:57:00.674386+00:00
[2019-12-30 09:09:06,331] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run',
'example_http_operator', 'post_op', '2019-12-30T08:57:00.674386+00:00', '--job_id', '6', '--pool',
'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ExampleHttpOperator.py', '--cfg_path',
'/tmp/tmpf9t6kzxb']
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,445] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from
/usr/local/airflow/dags/ExampleHttpOperator.py
[2019-12-30 09:09:07,473] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,472] {{cli.py:545}} INFO - Running <TaskInstance: example_http_operator.post_op 2019-12-
30T08:57:00.674386+00:00 [running]> on host 855dbc2ce3a3
[2019-12-30 09:09:07,480] {{http_operator.py:87}} INFO - Calling HTTP method
[2019-12-30 09:09:07,483] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,483]
{{base_hook.py:84}} INFO - Using connection to: id: http_default. Host: https://www.google.com/,
Port: None, Schema: None, Login: None, Password: None, extra: {}
[2019-12-30 09:09:07,484] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,484]
{{http_hook.py:131}} INFO - Sending 'POST' to url:
https://www.google.com/http://localhost:8084/api/employees
[2019-12-30 09:09:07,501] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,501]
{{http_hook.py:181}} WARNING - HTTPSConnectionPool(host='www.google.com', port=443): Max retries
exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake:
SysCallError(-1, 'Unexpected EOF')"))) Tenacity will retry to execute the operation
[2019-12-30 09:09:07,501] {{taskinstance.py:1058}} ERROR -
HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url:
/http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1,
'Unexpected EOF')")))
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 485, in wrap_socket
cnx.do_handshake()
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1934, in do_handshake
self._raise_ssl_error(self._ssl, result)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1664, in _raise_ssl_error
raise SysCallError(-1, "Unexpected EOF")
OpenSSL.SSL.SysCallError: (-1, 'Unexpected EOF')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 394, in connect
ssl_context=context,
File "/usr/local/lib/python3.7/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
return context.wrap_socket(sock, server_hostname=server_hostname)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 491, in wrap_socket
raise ssl.SSLError("bad handshake: %r" % e)
ssl.SSLError: ("bad handshake: SysCallError(-1, 'Unexpected EOF')",)
Airflow is running in Docker, using the puckel/docker-airflow image. Why is it calling the http_default connection's host, https://www.google.com/?
Airflow exposes a REST API. It is available through the webserver, and endpoints are available at /api/experimental/.
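As an aside, that experimental API can be used to trigger a DAG run externally. A minimal sketch, assuming the webserver is reachable on localhost:8080 and API authentication is left at the puckel image's default (off):

import requests

# Trigger a run of the DAG via the experimental REST API (Airflow 1.10).
resp = requests.post(
    "http://localhost:8080/api/experimental/dags/example_http_operator/dag_runs",
    json={"conf": {}},  # optional run configuration
)
print(resp.status_code, resp.text)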
You need to consider both the Operator you are using and the underlying Hook which it uses to connect. The Hook fetches connection information from an Airflow Connection, which is just a container used to store credentials and other connection information. You can configure Connections in the Airflow UI (Admin -> Connections). So in this case, you need to first configure your HTTP Connection.
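If you prefer scripting to the UI, a Connection can also be created programmatically. A minimal sketch, assuming Airflow 1.10 (as shipped in the puckel image) and a hypothetical conn_id of employees_api:

from airflow import settings
from airflow.models import Connection

# Hypothetical conn_id; the host holds the base URL of your API.
conn = Connection(
    conn_id="employees_api",
    conn_type="http",
    host="http://localhost:8084/",
)
session = settings.Session()
session.add(conn)
session.commit()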
From the http_hook documentation:
http_conn_id (str) – connection that has the base API url i.e https://www.google.com/
It so happens that for the HttpHook, you should configure the Connection by setting the host field equal to the base_url of your endpoint: http://localhost:8084/.
Since your operator has the default http_conn_id, the hook will use the Airflow Connection called "http_default" in the Airflow UI. If you don't want to change the default one, you can create another Airflow Connection using the Airflow UI and pass the new conn_id argument to your operator. See the source code to get a better idea of how the Connection object is used.
Lastly, according to the http_operator documentation:
endpoint (str) – The relative part of the full url. (templated)
You should only be passing the relative part of your URL to the operator; the rest it will get from the underlying http_hook. In this case, the value of endpoint for your Operator should be api/employees (not the full URL).
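Putting this together, a minimal sketch of the corrected task (employees_api is the hypothetical conn_id from above; alternatively, point the default http_default Connection's host at http://localhost:8084/ and omit http_conn_id):

import json
from airflow.operators.http_operator import SimpleHttpOperator  # Airflow 1.10 import path

t1 = SimpleHttpOperator(
    task_id='post_op',
    http_conn_id='employees_api',  # Connection whose host is http://localhost:8084/
    endpoint='api/employees',      # relative part of the URL only
    method='POST',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)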
The Airflow project documentation is unfortunately not very clear in this case. Please consider contributing an improvement; they are always welcome :)
I think you need to set the connection string as an environment variable in your Dockerfile or docker run command:
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN my_conn_string
see this and this
Connections
The connection information to external systems is stored in the Airflow metadata database and managed in the UI (Menu -> Admin -> Connections). A conn_id is defined there, and hostname / login / password / schema information is attached to it. Airflow pipelines can simply refer to the centrally managed conn_id without having to hard code any of this information anywhere.
Many connections with the same conn_id can be defined, and when that is the case, and when a hook uses the get_connection method from BaseHook, Airflow will choose one connection randomly, allowing for some basic load balancing and fault tolerance when used in conjunction with retries.
Airflow also has the ability to reference connections via environment variables from the operating system. The environment variable needs to be prefixed with AIRFLOW_CONN_ to be considered a connection. When referencing the connection in the Airflow pipeline, the conn_id should be the name of the variable without the prefix. For example, if the conn_id is named POSTGRES_MASTER the environment variable should be named AIRFLOW_CONN_POSTGRES_MASTER. Airflow assumes the value returned from the environment variable to be in a URI format (e.g. postgres://user:password@localhost:5432/master).
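Applied to the HTTP connection in question, a minimal sketch in the same Dockerfile ENV style (EMPLOYEES_API is a hypothetical conn_id; it must match the http_conn_id you pass to the operator):

ENV AIRFLOW_CONN_EMPLOYEES_API http://localhost:8084/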
see this
Therefore, you are currently using the default:
Using connection to: id: http_default. Host: https://www.google.com/