 

Component Gateway with DataprocOperator on Airflow

In GCP it is fairly simple to install and run a JupyterHub component from the UI or the gcloud command. I'm trying to script the process through Airflow and the DataprocClusterCreateOperator; here is an extract of the DAG:

from airflow.contrib.operators import dataproc_operator  

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME, 
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        optional_components=['JUPYTER', 'ANACONDA']
    )

However, I cannot manage to specify the needed enable-component-gateway parameter. Looking at the source code, it seems the parameter is not supported (in either the deprecated or the latest stable operator).

I know the REST API provides endpointConfig.enableHttpPortAccess, but I would rather use the official operator. Does anyone have an idea how to achieve that?
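For comparison, this is roughly where the flag sits if you take the raw REST route: in a `clusters.create` request body, Component Gateway is enabled under `config.endpointConfig.enableHttpPortAccess`, next to `config.softwareConfig.optionalComponents`. The sketch below only builds and prints that JSON body with the standard library; the helper `build_create_body` and the values `my-project` / `jupyter-cluster` are hypothetical placeholders, and sending the request (authentication, endpoint URL) is deliberately left out.

```python
import json


def build_create_body(project_id, cluster_name, components):
    """Hypothetical helper: build a minimal Dataproc clusters.create body."""
    return {
        "projectId": project_id,
        "clusterName": cluster_name,
        "config": {
            # Optional components such as JUPYTER are listed under softwareConfig.
            "softwareConfig": {"optionalComponents": list(components)},
            # Component Gateway is switched on through endpointConfig.
            "endpointConfig": {"enableHttpPortAccess": True},
        },
    }


body = build_create_body("my-project", "jupyter-cluster", ["JUPYTER", "ANACONDA"])
print(json.dumps(body, indent=2))
```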

kwn asked Jan 02 '20 18:01


2 Answers

Edit: a fix for composer-1.8.3 with airflow-1.10.3

In Airflow 1.10.3, the cluster configuration cannot be passed in from outside. However, we can subclass the cluster creation operator and override the method that builds the configuration. This also lets us set the optional components, a parameter missing from this Airflow version.

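from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator


class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        # Let the parent build the standard cluster dict, then extend it
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        # Enable Component Gateway (HTTP access to the components' web UIs)
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        # Optional components are not an operator argument in this version
        cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
        return cluster_data

# Start the Dataproc cluster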
dataproc_cluster = CustomDataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME, 
    cluster_name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    num_masters=1,
    master_machine_type='n1-standard-2',
    worker_machine_type='n1-standard-2',
    master_disk_size=100,
    worker_disk_size=100,
    storage_bucket='test-dataproc-jupyter', 
    region='europe-west4', 
    zone='europe-west4-a',
    auto_delete_ttl=21600, 
    dag=dag
)
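The override works because `_build_cluster_data()` returns a plain dict that the operator then sends to the API, so a subclass can post-process it. The sketch below reproduces just that pattern so it runs without Airflow installed: `BaseOperatorStandIn` is a hypothetical stand-in for `DataprocClusterCreateOperator`, not Airflow's real class, and the dict it returns is heavily simplified.

```python
class BaseOperatorStandIn:
    """Hypothetical stand-in for DataprocClusterCreateOperator (not Airflow code)."""

    def __init__(self, cluster_name):
        self.cluster_name = cluster_name

    def _build_cluster_data(self):
        # Simplified version of the dict the real operator builds.
        return {
            "clusterName": self.cluster_name,
            "config": {"softwareConfig": {}},
        }


class ComponentGatewayOperator(BaseOperatorStandIn):
    """Extends the parent's cluster dict instead of rebuilding it from scratch."""

    def _build_cluster_data(self):
        cluster_data = super()._build_cluster_data()
        cluster_data["config"]["endpointConfig"] = {"enableHttpPortAccess": True}
        cluster_data["config"]["softwareConfig"]["optionalComponents"] = ["JUPYTER", "ANACONDA"]
        return cluster_data


data = ComponentGatewayOperator("demo-cluster")._build_cluster_data()
print(data)
```

Calling `super()` first keeps everything the parent already computes (machine types, disk sizes, TTL) and only adds the missing keys, which is why this approach survives small changes in the parent's output.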

Original answer, for Airflow 1.10.7

While not optimal, you can create the cluster data structure yourself instead of having Airflow's ClusterGenerator do it. This should work on the latest version (1.10.7):

cluster = {
  'clusterName': CLUSTER_NAME,
  'config': {
    'gceClusterConfig': {
      'zoneUri': 'europe-west4-a'
    },
    'masterConfig': {
      'numInstances': 1,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'workerConfig': {
      'numInstances': 3,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'softwareConfig': {
      'optionalComponents': [
        'ANACONDA',
        'JUPYTER'
      ]
    },
    'lifecycleConfig': {
      'autoDeleteTtl': '21600s'
    },
    'endpointConfig': {
      'enableHttpPortAccess': True
    }
  },
  'projectId': PROJECT_ID
}
# Start the Dataproc cluster
dataproc_cluster = DataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    region='europe-west4',
    zone='europe-west4-a',
    cluster=cluster,
    dag=dag
)
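If you need several variants of this hand-built structure, a small function can generate it instead of copying the dict around. This is a sketch, not Airflow's ClusterGenerator: the name `make_cluster_dict`, its parameters and the placeholder values are hypothetical. It writes `autoDeleteTtl` under the `lifecycleConfig` key as a Duration string with an `s` suffix (e.g. `'21600s'`), which is how the Dataproc REST API represents durations in JSON.

```python
def make_cluster_dict(project_id, cluster_name, zone, num_workers,
                      machine_type="n1-standard-2", disk_gb=100,
                      components=("ANACONDA", "JUPYTER"),
                      auto_delete_ttl_s=None, component_gateway=True):
    """Hypothetical helper: build a Dataproc cluster dict in REST (camelCase) form."""

    def node_group(count):
        # Master and worker groups share machine type and boot disk size here.
        return {
            "numInstances": count,
            "machineTypeUri": machine_type,
            "diskConfig": {"bootDiskSizeGb": disk_gb},
        }

    config = {
        "gceClusterConfig": {"zoneUri": zone},
        "masterConfig": node_group(1),
        "workerConfig": node_group(num_workers),
        "softwareConfig": {"optionalComponents": list(components)},
        "endpointConfig": {"enableHttpPortAccess": component_gateway},
    }
    if auto_delete_ttl_s is not None:
        # REST Duration fields are strings of seconds ending in "s".
        config["lifecycleConfig"] = {"autoDeleteTtl": "{}s".format(auto_delete_ttl_s)}
    return {"projectId": project_id, "clusterName": cluster_name, "config": config}


cluster = make_cluster_dict("my-project", "jupyter-cluster", "europe-west4-a",
                            num_workers=3, auto_delete_ttl_s=21600)
```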

If you are using another Airflow version, please specify it.

You can also vote for the bug I've opened: AIRFLOW-6432

David Rabinowitz answered Oct 21 '22 10:10


David's answer is correct for older Airflow versions. However, for Airflow >= 2.0.0 there is now an easier way using ClusterGenerator, so there is no need to create a custom operator:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator, DataprocCreateClusterOperator

CLUSTER_GENERATOR = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    # ... other cluster settings
    enable_component_gateway=True,
    optional_components=['JUPYTER', 'ANACONDA']
).make()

DataprocCreateClusterOperator(
    # ... other operator arguments
    cluster_config=CLUSTER_GENERATOR
)

You can check this example DAG for more details. All possible parameters of ClusterGenerator are listed in its source code.
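To catch a missing flag before the DAG runs, a unit test can inspect the generated config. This sketch assumes `ClusterGenerator(...).make()` returns a plain dict with snake_case keys (`endpoint_config.enable_http_port_access`), as in recent versions of the Google provider; verify that against the provider version you use. The helper `component_gateway_enabled` is hypothetical, and it also accepts the camelCase REST form used in the older answer. The two sample dicts are hand-written stand-ins, not real `make()` output; in a real test you would pass `CLUSTER_GENERATOR`.

```python
def component_gateway_enabled(cluster_config):
    """Hypothetical check: is Component Gateway turned on in a cluster config dict?"""
    # snake_case form, as produced by ClusterGenerator in the Google provider
    endpoint = cluster_config.get("endpoint_config")
    flag_key = "enable_http_port_access"
    if endpoint is None:
        # camelCase form, as used by the REST API and the hand-built dict
        endpoint = cluster_config.get("endpointConfig", {})
        flag_key = "enableHttpPortAccess"
    return bool(endpoint.get(flag_key, False))


# Hand-written stand-ins for the two config styles
generated = {"endpoint_config": {"enable_http_port_access": True}}
rest_style = {"endpointConfig": {"enableHttpPortAccess": True}}
print(component_gateway_enabled(generated), component_gateway_enabled(rest_style))
```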

Elad Kalif answered Oct 21 '22 11:10