
Airflow Python script exiting with Task exited with return code -9, how to solve that?

I don't know what this error means. Some people say it's a memory error, but I'm not sure, because the message is not explicit. The table I load is large, though: about 1 million rows.

Here is the part of my script where the error happens:

# snapshot_profiles
  df_snapshot_profiles = load_table('snapshot_profiles', conn)

  def return_key(x, key):
    try:
      return x[key]
    except (KeyError, TypeError):
      return None

  df_snapshot_profiles['is_manager'] = df_snapshot_profiles["payload"].apply(
      lambda x: return_key(x, 'is_manager'))
  df_snapshot_profiles_actual = df_snapshot_profiles.loc[:,
                                                         ['profile_id', 'date']]
  df_snapshot_profiles_actual.sort_values(['profile_id', 'date'], inplace=True)
  df_snapshot_profiles_actual = df_snapshot_profiles_actual.groupby(
      'profile_id').max().reset_index()
  df_snapshot_profiles.drop(
      ['id', 'payload', 'version', 'company_id', 'inserted_at', 'updated_at'],
      axis=1,
      inplace=True)
  df_snapshot_profiles_actual = df_snapshot_profiles_actual.merge(
      df_snapshot_profiles, on=['date', 'profile_id'], how='left')
  df_snapshot_profiles_actual.drop('date', axis=1, inplace=True)

  df = df.merge(df_snapshot_profiles_actual, on='profile_id', how='left')
  del df_snapshot_profiles

  # Exclude companies with fewer than two users from the data (test companies)
  df_companies = df.groupby('company_name').count()
  df_companies.reset_index(inplace=True)
  df_companies = df_companies[df_companies['user_id'] > 2]
  df_companies.sort_values('user_id', ascending=False, inplace=True)

  companies = list(df_companies.company_name)

  df['check_company'] = df['company_name'].apply(lambda x: 'T'
                                                 if x in companies else 'F')
  df = df[df['check_company'] == 'T']
  df.drop('check_company', axis=1, inplace=True)

And here is the script to load the tables and print the memory usage:

import os

import psutil


def usage():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / float(2**20)


def load_table(table, conn):
    print_x(f'{usage()} Mb')
    print_x(f'loading table {table}')
    cursor = conn.cursor()
    cursor.execute(f'select * from {ORIGIN_SCHEMA}.{table};')
    rows = cursor.fetchall()
    cursor.execute(f'''
        select column_name from information_schema.columns where table_name = '{table}';
    ''')
    labels = [label[0] for label in cursor.fetchall()]
    return pd.DataFrame.from_records(rows, columns=labels)

Is there a way to avoid the error, either by reducing the memory usage or by some other approach?

Asked Mar 03 '23 by Felipe Augusto

1 Answer

Well, it should be an out-of-memory issue. You can either expand your memory or move part of the work out of core (load and process the data in batch mode).

  • If you have the budget, expand memory. 1 million rows * a generous string length (1000 bytes) per column = 1M * 1K = 1 GB of memory just to load the data. Merging or transforming DataFrames needs extra memory on top of that, so 16 GB should be OK.

  • If you are up for it, try out-of-core mode, which means working from the hard disk.

    • dask is an out-of-core counterpart to pandas: it computes in batch mode. Slower, but it still works.
    • Use the database for some of the feature work. Most databases can do work similar to pandas, though it may need complicated SQL.

Good luck. If you like my answer, please vote it up.

Answered May 08 '23 by Yong Wang