I have written the following code in my Airflow DAG.
I tested the script locally and it works like a dream, but I can't get it working inside the DAG.
I have tried multiple things, to no avail.
def fill_nulls(ds, file_in):
    csv_file = glob.glob(os.path.join(r'/tmp/', file_in))
    df = pd.read_csv(csv_file, sep='\t',header=None,error_bad_lines=False, index_col=False, dtype='unicode')
    df = df.fillna(r'\N')
    df.loc[:,df.dtypes==object].apply(lambda s:s.str.replace(" ", r'\N'))
    df.to_csv(csv_file,sep='\t',header=None,index=False, quoting=csv.QUOTE_NONE)
fill_nulls = PythonOperator(
    task_id='fill_nulls',
    python_callable=fill_nulls,
    provide_context=True,
    templates_dict={'file_in':'apollo_export_{{macros.ds_format(macros.ds_add( ds, -2),\'%Y-%m-%d\',\'%Y%m%d\')}}.csv'},
    dag=dag
)
I am getting the following error:
Traceback (most recent call last):
[2018-01-25 10:11:50,016] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-01-25 10:11:50,018] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-01-25 10:11:50,018] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-01-25 10:11:50,019] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-01-25 10:11:50,019] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-01-25 10:11:50,021] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-01-25 10:11:50,021] {base_task_runner.py:98} INFO - Subtask: TypeError: fill_nulls() got an unexpected keyword argument 'next_execution_date'
Any help would be much appreciated!
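I think you're missing **kwargs in your function definition. With provide_context=True, Airflow passes the task context (which includes next_execution_date) to the callable as keyword arguments, so the function has to accept them:
def fill_nulls(ds, file_in, **kwargs):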
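One caveat, as far as I can tell from the Airflow 1.x PythonOperator: the entries of templates_dict are not unpacked into separate arguments. The rendered dict is delivered as kwargs['templates_dict'], so file_in probably has to be read from there. The sketch below needs no Airflow install. It imitates the call with a plain dict; the context values and the fill_nulls_strict name are made up for illustration, and the CSV processing is left out.

```python
# Original-style signature: accepts only ds and file_in.
def fill_nulls_strict(ds, file_in):
    return file_in


# Signature that tolerates the extra context keys.
def fill_nulls(ds, **kwargs):
    # Assumption: rendered template values arrive under 'templates_dict'.
    file_in = kwargs['templates_dict']['file_in']
    return file_in


# Hypothetical stand-in for the context Airflow builds when
# provide_context=True; the real one has many more keys.
context = {
    'ds': '2018-01-25',
    'next_execution_date': None,
    'templates_dict': {'file_in': 'apollo_export_20180123.csv'},
}

# The strict signature fails the same way as in the traceback.
strict_error = None
try:
    fill_nulls_strict(**context)
except TypeError as exc:
    strict_error = str(exc)

# The **kwargs signature accepts the context and finds the file name.
resolved = fill_nulls(**context)
print(strict_error)
print(resolved)
```

If you would rather keep file_in as a named parameter, passing it through op_kwargs instead of templates_dict should also work, but then check that the Jinja expression is still rendered in your Airflow version.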