I'm working on a little proof of concept with Airflow on Google Cloud.
Essentially, I want to create a workflow that downloads data from a REST API (HTTPS), transforms this data into JSON format, and uploads it to a Google Cloud Storage bucket.
I've already done this with pure Python code and it works. Pretty straightforward! But because I want to schedule it and there are some dependencies between the steps, Airflow should be the ideal tool for this.
After a careful reading of the Airflow documentation, I've seen that the SimpleHttpOperator and/or the HttpHook can do the trick for the download part.
I've created my HTTP connection in the web UI, with my email/password for authorization, as follows:
Conn Id: "atlassian_marketplace"
Conn Type: "HTTP"
Host: "https://marketplace.atlassian.com/rest/2"
Schema: None/Blank
Login: "my username"
Password: "my password"
Port: None/Blank
Extra: None/Blank
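For reference, I think the same connection could also be created programmatically instead of through the web UI; a minimal sketch, assuming Airflow 1.10.x (the credentials are placeholders for my real values):

# Sketch: register the HTTP connection in the metadata DB from code
# (same fields as the web UI form above; credentials are placeholders).
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="atlassian_marketplace",
    conn_type="http",
    host="https://marketplace.atlassian.com/rest/2",
    login="my username",
    password="my password",
)

session = settings.Session()
session.add(conn)
session.commit()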
First question: when should I use the SimpleHttpOperator versus the HttpHook?
Second question: how do I use the SimpleHttpOperator or HttpHook with HTTPS calls?
Third question: how do I access the data returned by the API call?
In my case, the XCom feature will not do the trick, because these API calls can return a lot of data (100-300 MB)!
I've looked on Google for example code on how to use the operator/hook for my use case, but I haven't found anything useful yet.
Any ideas?
Here is the skeleton of my code so far:
# Usual Airflow imports
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator

# DAG creation
dag = DAG(
    'get_reporting_links',
    default_args=default_args,
    description='Get reporting links',
    schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
    task_id="get_data",
    http_conn_id="atlassian_marketplace",
    endpoint="/vendors/{vendorId}/reporting".format(vendorId="some number"),
    method="GET",
    dag=dag)

# Task 3: Save JSON data locally
# TODO: transform_json: transform to JSON get_data.json()?

# Task 4: Upload data to GCP
# TODO: upload_gcs: use Airflow GCS connection

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> transform_json >> upload_gcs >> stop
Hooks are interfaces to services external to the Airflow cluster. While operators provide a way to create tasks that may or may not communicate with an external service, hooks provide a uniform interface to access external services like S3, MySQL, Hive, Qubole, etc.
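Regarding your third question and the 100-300 MB payloads: instead of pushing the response through XCom, you can call the API from a PythonOperator callable with an HttpHook and write the body straight to a local file. A minimal sketch, assuming Airflow 1.10.x (the vendor id and file path are placeholders):

from airflow.hooks.http_hook import HttpHook

def download_reporting_links(**kwargs):
    # Reuses the "atlassian_marketplace" connection; the https:// scheme
    # comes from the Host field of that connection.
    hook = HttpHook(method="GET", http_conn_id="atlassian_marketplace")
    response = hook.run(
        endpoint="/vendors/{vendorId}/reporting".format(vendorId="some number"))
    # Write the body to disk instead of returning it, so nothing goes to XCom.
    # For very large bodies, extra_options={"stream": True} can be passed to
    # hook.run() to consume the response in chunks (check your Airflow version).
    with open("/tmp/reporting_links.json", "w") as f:
        f.write(response.text)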
Apache Airflow also has an API interface that can help you perform tasks like getting information about tasks and DAGs, getting the Airflow configuration, updating DAGs, listing users, and adding and deleting connections.
Look at the following example:
# Usual Airflow imports
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

# DAG creation
dag = DAG(
    'get_reporting_links',
    default_args=default_args,
    description='Get reporting links',
    schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
    task_id="get_data",
    http_conn_id="atlassian_marketplace",
    endpoint="/vendors/{vendorId}/reporting".format(vendorId="some number"),
    method="GET",
    xcom_push=True,  # push the response body to XCom
    dag=dag,
)

def transform_json(**kwargs):
    ti = kwargs['ti']
    # Pull the response body pushed by the get_data task
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='get_data')
    ...
    # transform the json here and save the content to a file

# Task 3: Save JSON data locally
save_and_transform = PythonOperator(
    task_id="save_and_transform",
    python_callable=transform_json,
    provide_context=True,
    dag=dag,
)

# Task 4: Upload data to GCP
upload_to_gcs = FileToGoogleCloudStorageOperator(...)

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> save_and_transform >> upload_to_gcs >> stop
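The upload step is only sketched above; filled in, it could look something like the following (a sketch assuming the contrib FileToGoogleCloudStorageOperator and an existing "google_cloud_default" GCP connection; the bucket name and file paths are placeholders, and transform_json is expected to write the file to that same local path):

# Hypothetical upload task (bucket and paths are placeholders).
upload_to_gcs = FileToGoogleCloudStorageOperator(
    task_id="upload_to_gcs",
    src="/tmp/reporting_links.json",       # local file written by save_and_transform
    dst="reporting/reporting_links.json",  # object name inside the bucket
    bucket="my-reporting-bucket",
    google_cloud_storage_conn_id="google_cloud_default",
    mime_type="application/json",
    dag=dag,
)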