 

Stream files to Kafka using Airflow

What is the best approach to stream CSV files to a Kafka topic using Airflow?

Should I write a custom operator for Airflow?

bsd asked Oct 16 '17 at 20:10


1 Answer

It's probably best to use the PythonOperator to process the files line by line. I have a use case where I poll an SFTP server for files, and when I find some, I process them line by line, writing the results out as JSON. I do things like parse dates into YYYY-MM-DD format. Something like this might work for you:

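import csv

# Airflow 1.x import path; Airflow 2 moved it to airflow.operators.python
from airflow.operators.python_operator import PythonOperator


def csv_file_to_kafka(**context):
    """
    Read a downloaded CSV file row by row and send each row to Kafka.
    """
    f = '/path/to/downloaded/csv_file.csv'
    with open(f, 'r', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # Send the row to Kafka here
            pass


# `dag` is the DAG object defined elsewhere in this file
csv_file_to_kafka_task = PythonOperator(
    task_id='csv_file_to_kafka',
    python_callable=csv_file_to_kafka,
    provide_context=True,  # Airflow 1.x: pass the task context as **context
    dag=dag
)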
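The "Send the row to Kafka" step is left as a placeholder above. Here is a minimal sketch of one way to fill it in, assuming the kafka-python client (`KafkaProducer`) and following the approach described above of writing each row out as JSON with dates normalized to YYYY-MM-DD. The topic name `csv-rows`, the broker address `localhost:9092`, the `date` column and its `%m/%d/%Y` input format are all hypothetical. Parsing is kept separate from the producer so it can be checked without a running broker.

```python
import csv
import json
from datetime import datetime
from typing import Callable, Dict, Iterable


def normalize_row(row: Dict[str, str], date_field: str = "date",
                  date_format: str = "%m/%d/%Y") -> Dict[str, str]:
    """Return a copy of a CSV row with `date_field` rewritten as YYYY-MM-DD.

    The default column name and input format are hypothetical; adjust them to your CSV.
    """
    cleaned = dict(row)
    value = cleaned.get(date_field)
    if value:  # leave missing or empty dates untouched
        cleaned[date_field] = datetime.strptime(value, date_format).strftime("%Y-%m-%d")
    return cleaned


def stream_csv_rows(lines: Iterable[str], send: Callable[[bytes], object]) -> int:
    """Parse CSV lines one row at a time and pass each row to `send` as UTF-8 JSON.

    Returns the number of rows sent.
    """
    count = 0
    for row in csv.DictReader(lines):
        send(json.dumps(normalize_row(row)).encode("utf-8"))
        count += 1
    return count


def send_csv_to_kafka(path: str, topic: str = "csv-rows",
                      bootstrap_servers: str = "localhost:9092") -> int:
    """Stream one CSV file to a Kafka topic (callable from a PythonOperator).

    Assumes the kafka-python package is installed; it is imported lazily so the
    parsing helpers above can be used and tested without it.
    """
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        with open(path, newline="") as csvfile:
            # Values are already bytes, so no value_serializer is needed
            sent = stream_csv_rows(csvfile, lambda msg: producer.send(topic, msg))
        producer.flush()  # block until all buffered messages are delivered
    finally:
        producer.close()
    return sent
```

Inside a callable like `csv_file_to_kafka`, you could call `send_csv_to_kafka(f)` once per downloaded file path.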

Now, how you get the files to download is really up to you. In my case, I use the SSHHook and GoogleCloudStorageHook to get files from an SFTP server, and then pass the names of the files to a task that parses and cleans the CSV files. I do this by pulling the files down from SFTP and putting them into Google Cloud Storage:

import csv

# Airflow 1.x (contrib) import paths; in Airflow 2 these hooks live in the
# apache-airflow-providers-ssh and apache-airflow-providers-google packages
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.python_operator import PythonOperator

# HOOKS: Connections to external systems

def sftp_connection():
    """
    Returns an SFTP client created using the SSHHook
    """
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
    ssh_client = ssh_hook.get_conn()
    return ssh_client.open_sftp()


def gcs_connection():
    """
    Returns a GCS hook created using the GoogleCloudStorageHook
    """
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')


# PYTHON CALLABLES: Called by PythonOperators

def get_files(**context):
    """
    Lists all files on the SFTP server. The returned list is pushed to XCom.
    """
    sftp_client = sftp_connection()
    return sftp_client.listdir('/path/to/files/')


def save_files(**context):
    """
    Checks whether each file already exists in GCS. If not, the file is downloaded
    from the SFTP server and uploaded to GCS. Returns a list of the new files.
    """
    files = context['task_instance'].xcom_pull(task_ids='get_files')

    sftp_client = sftp_connection()
    gcs = gcs_connection()

    # process_sftp_files is my own helper (not shown in this answer)
    new_files = process_sftp_files(files, gcs, sftp_client)

    return new_files


def csv_file_to_kafka(**context):
    """
    Untested sample: parse the new CSV files and send each row to Kafka
    """
    new_files = context['task_instance'].xcom_pull(task_ids='save_files')
    # Assumes process_sftp_files returned paths readable on this worker
    for f in new_files:
        with open(f, 'r', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                # Send the row to Kafka here
                pass


get_files_task = PythonOperator(
    task_id='get_files',
    python_callable=get_files,
    provide_context=True,  # Airflow 1.x: needed for **context and xcom_pull
    dag=dag
)
save_files_task = PythonOperator(
    task_id='save_files',
    python_callable=save_files,
    provide_context=True,
    dag=dag
)
csv_file_to_kafka_task = PythonOperator(
    task_id='csv_file_to_kafka',
    python_callable=csv_file_to_kafka,
    provide_context=True,
    dag=dag
)

# Run the tasks in order so each xcom_pull finds its upstream result
get_files_task >> save_files_task >> csv_file_to_kafka_task
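`process_sftp_files` is not shown in the answer. A hypothetical version consistent with the `save_files` docstring might look like the sketch below: it skips files that already exist in the bucket, downloads the rest with paramiko's `SFTPClient.get(remotepath, localpath)`, uploads them with the GCS hook, and returns local paths so a downstream task can open them. The bucket name `my-bucket` and the temporary download directory are assumptions. `exists` and `upload` are called positionally because the keyword names differ between Airflow 1.x's `GoogleCloudStorageHook` and Airflow 2's `GCSHook`. Returning local paths only helps if the downstream task runs on the same machine.

```python
import os
import posixpath
import tempfile
from typing import List, Optional, Sequence


def process_sftp_files(files: Sequence[str], gcs, sftp_client,
                       remote_dir: str = "/path/to/files/",
                       bucket: str = "my-bucket",
                       local_dir: Optional[str] = None) -> List[str]:
    """Hypothetical helper: copy SFTP files that are not yet in GCS.

    `gcs` is expected to offer exists(bucket, object) and upload(bucket, object, filename);
    `sftp_client` is expected to offer get(remotepath, localpath), as paramiko's SFTPClient does.
    Returns the local paths of newly downloaded files; the download directory is
    left in place so a downstream task can read them.
    """
    local_dir = local_dir or tempfile.mkdtemp()
    new_files = []
    for name in files:
        if gcs.exists(bucket, name):
            continue  # saved on an earlier run, nothing to do
        local_path = os.path.join(local_dir, name)
        sftp_client.get(posixpath.join(remote_dir, name), local_path)
        gcs.upload(bucket, name, local_path)
        new_files.append(local_path)
    return new_files
```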

I know I could do all of this in one big Python callable, and that's how I'm refactoring the code now: a single Python function polls the SFTP server, pulls the latest files, and parses them according to my rules. I have heard that using XCom isn't ideal; supposedly, Airflow tasks shouldn't pass much data to each other. (XCom is designed for small pieces of metadata, such as file names, rather than large payloads.)
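The single-callable refactor described above could look roughly like the following hypothetical sketch. It takes the SFTP client and a `send` function as arguments (for example, one wrapping a Kafka producer), uses an in-memory `already_processed` set as a stand-in for the GCS existence check, and downloads each new CSV into a temporary directory before parsing it. Because everything happens in one function, nothing needs to travel between tasks through XCom; the trade-off is that an Airflow retry reruns the whole pipeline rather than just the failed step.

```python
import csv
import json
import os
import posixpath
import tempfile
from typing import Callable, Set


def poll_parse_and_send(sftp_client, send: Callable[[bytes], object],
                        already_processed: Set[str],
                        remote_dir: str = "/path/to/files/") -> int:
    """Hypothetical all-in-one callable: poll SFTP, download new CSVs, send each row.

    `sftp_client` needs listdir(path) and get(remotepath, localpath), as paramiko's
    SFTPClient provides; `already_processed` stands in for a real "seen files" store.
    Returns the number of rows sent.
    """
    sent = 0
    with tempfile.TemporaryDirectory() as workdir:
        for name in sorted(sftp_client.listdir(remote_dir)):
            if not name.endswith(".csv") or name in already_processed:
                continue  # skip non-CSV files and files handled on earlier runs
            local_path = os.path.join(workdir, name)
            sftp_client.get(posixpath.join(remote_dir, name), local_path)
            with open(local_path, newline="") as csvfile:
                for row in csv.DictReader(csvfile):
                    send(json.dumps(row).encode("utf-8"))
                    sent += 1
            already_processed.add(name)
    return sent
```

In a DAG, this would be wrapped in a single PythonOperator instead of the three chained tasks.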

Depending on your use case, you might even want to explore something like Apache NiFi; I'm actually looking into that now too.

Mike answered Sep 29 '22 at 13:09