I am using the following docker-compose image, I got this image from: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on: &airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test:
[
"CMD-SHELL",
'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: "true"
_AIRFLOW_WWW_USER_CREATE: "true"
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
######################################################
# SPARK SERVICES
######################################################
jupyterlab:
image: andreper/jupyterlab:3.0.0-spark-3.0.0
container_name: jupyterlab
ports:
- 8888:8888
- 4040:4040
volumes:
- shared-workspace:/opt/workspace
spark-master:
image: andreper/spark-master:3.0.0
container_name: spark-master
ports:
- 8081:8080
- 7077:7077
volumes:
- shared-workspace:/opt/workspace
spark-worker-1:
image: andreper/spark-worker:3.0.0
container_name: spark-worker-1
environment:
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=512m
ports:
- 8082:8081
volumes:
- shared-workspace:/opt/workspace
depends_on:
- spark-master
spark-worker-2:
image: andreper/spark-worker:3.0.0
container_name: spark-worker-2
environment:
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=512m
ports:
- 8083:8081
volumes:
- shared-workspace:/opt/workspace
depends_on:
- spark-master
volumes:
postgres-db-volume:
shared-workspace:
name: "jordi_airflow"
driver: local
driver_opts:
type: "none"
o: "bind"
device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"
I am trying to run the following DAG:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import csv
import requests
import json
default_args = {
"owner": "airflow",
"email_on_failure": False,
"email_on_retry": False,
"email": "[email protected]",
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def printar():
print("success!")
with DAG(
"forex_data_pipeline",
start_date=datetime(2021, 1, 1),
schedule_interval="@daily",
default_args=default_args,
catchup=False,
) as dag:
downloading_rates = PythonOperator(task_id="test1", python_callable=printar)
forex_processing = SparkSubmitOperator(
task_id="spark1",
application="/opt/airflow/dags/test.py",
conn_id="spark_conn",
verbose=False,
)
downloading_rates >> forex_processing
But I see this error in the airflow ui:
Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
I have specified to install additional requirements in the docker-compose file:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
I am writing it wrong? how I should specify the additional requirements I want to install in airflow? can I pass a requirements.txt? if so, how I specify the path?
In order to install Airflow you need to either downgrade pip to version 20.2. 4 pip upgrade --pip==20.2. 4 or, in case you use Pip 20.3, you need to add option --use-deprecated legacy-resolver to your pip install command. You also need database client packages (Postgres or MySQL) if you want to use those databases.
Installing via Poetry or pip-tools is not currently supported. If you wish to install airflow using those tools you should use the constraints and convert them to appropriate format and workflow that your tool requires.
You can do it in one of those ways: add your modules to one of the folders that Airflow automatically adds to PYTHONPATH. add extra folders where you keep your code to PYTHONPATH. package your code into a Python package and install it together with Airflow.
I used:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- package1 package2 package3 }
# Example
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}
# or:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql}
(note: add 1 space)
Then when airflow docker first run Python, It will auto pip install the requirements packages.
lxml==4.6.3 charset-normalizer==1.4.1
. Available in Airflow image 2.1.1 and above.Support for _PIP_ADDITIONAL_REQUIREMENTS
environment variable has not been released yet. It is only supported by the developer/unreleased version of the docker image. It is planned that this feature will be available in Airflow 2.1.1. For more information, see: Adding extra requirements for build and runtime of the PROD image.
For the older version, you should build a new image and set this image in the docker-compose.yaml
. To do this, you need to follow a few steps.
Dockerfile
with the following content:
FROM apache/airflow:2.0.0
RUN pip install --no-cache-dir apache-airflow-providers
docker build . --tag my-company-airflow:2.0.0
docker-compose.yaml
file:
echo "AIRFLOW_IMAGE_NAME=my-company-airflow:2.0.0" >> .env
For more information, see: Official guide about running Airflow in docker-compose environment
In particular, I recommend this fragment which describes what to do as you need to install a new pip package.
ModuleNotFoundError: No module named 'XYZ'
The Docker Compose file uses the latest Airflow image (apache/airflow). If you need to install a new Python library or system library, you can customize and extend it.
I recommend you check out the guide about building Docker Image. This explains how to install even more complex dependencies.
I also recommend only using Docker-compose files from the official website and intended for a specific version. Docker-compose files from newer versions may not work with older versions of Airflow, because we are making many improvements to these files all the time to improve stability reliability, and user experience.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With