
Problems using Airflow v1.9 Python Operator

I have written the following code in my Airflow DAG. I tested the script locally and it works like a dream, but I am having no luck getting it to work inside the DAG. I have tried multiple things, to no avail.

def fill_nulls(ds, file_in):
    csv_file = glob.glob(os.path.join(r'/tmp/', file_in))
    df = pd.read_csv(csv_file, sep='\t', header=None, error_bad_lines=False, index_col=False, dtype='unicode')
    df = df.fillna(r'\N')
    df.loc[:, df.dtypes == object].apply(lambda s: s.str.replace(" ", r'\N'))
    df.to_csv(csv_file, sep='\t', header=None, index=False, quoting=csv.QUOTE_NONE)


fill_nulls = PythonOperator(
    task_id='fill_nulls',
    python_callable=fill_nulls,
    provide_context=True,
    templates_dict={'file_in': 'apollo_export_{{macros.ds_format(macros.ds_add( ds, -2),\'%Y-%m-%d\',\'%Y%m%d\')}}.csv'},
    dag=dag
)

I am getting the following error:

: Traceback (most recent call last):
[2018-01-25 10:11:50,016] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-01-25 10:11:50,018] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-01-25 10:11:50,018] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-01-25 10:11:50,019] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-01-25 10:11:50,019] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-01-25 10:11:50,021] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-01-25 10:11:50,021] {base_task_runner.py:98} INFO - Subtask: TypeError: fill_nulls() got an unexpected keyword argument 'next_execution_date'

Any help would be much appreciated!

D_usv asked Jan 25 '18 10:01



1 Answer

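I think you're missing **kwargs in your function definition. With provide_context=True, Airflow passes the whole task context (including next_execution_date) to the callable as keyword arguments, so the function has to accept them: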

def fill_nulls(ds, file_in, **kwargs):
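
One thing that may still trip you up: as far as I understand Airflow 1.x, values given in templates_dict are not passed as separate arguments. They arrive rendered under the 'templates_dict' key of the context, so file_in would probably need to be read from there. Below is a minimal sketch that runs without Airflow. The fake_context dict is a hypothetical, abbreviated stand-in for the real context, and fill_nulls_original only mirrors the signature from the question.

```python
# Hypothetical, abbreviated stand-in for the context Airflow passes
# to the callable when provide_context=True (the real one has more keys).
fake_context = {
    'ds': '2018-01-25',
    'next_execution_date': None,
    'templates_dict': {'file_in': 'apollo_export_20180123.csv'},
}


def fill_nulls_original(ds, file_in):
    # Signature from the question: no **kwargs, so extra context keys are rejected.
    return ds, file_in


def fill_nulls(ds, **kwargs):
    # Accept the rest of the context, then read the rendered template value.
    file_in = kwargs['templates_dict']['file_in']
    return ds, file_in


# Calling the original signature with the full context raises a TypeError.
try:
    fill_nulls_original(**fake_context)
    original_failed = False
except TypeError as exc:
    original_failed = True
    print('original signature failed:', exc)

# The **kwargs version accepts the same call.
result = fill_nulls(**fake_context)
print(result)
```

In the real DAG the PythonOperator definition would stay the same; only the callable's signature and the way it looks up file_in change.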
Antoine Augusti answered Sep 17 '22 23:09
