
Filtering a large dataframe in pandas using multiprocessing

I have a dataframe that I need to filter according to the following conditions:

CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ACTION' & count_GENRE >= 1
CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ROMANCE' & count_GENRE >= 1
CITY == 'Mumbai' & LANGUAGE == 'Hindi' & count_LANGUAGE >= 1 & GENRE == 'ACTION' 

When I try to do that with:

  df1 = df.query(condition1)
  df2 = df.query(condition2)

I get a memory error (since my dataframe is huge).

So I planned to filter on the main condition first and then on the sub-conditions, so that the load is smaller and performance is better.

By parsing the above conditions, I managed to get:

main_filter = "CITY == 'Mumbai'"
sub_cond1 = "LANGUAGE == 'English'"
sub_cond1_cond1 = "GENRE == 'ACTION' & count_GENRE >= 1"
sub_cond1_cond2 = "GENRE == 'ROMANCE' & count_GENRE >= 1"
sub_cond2 = "LANGUAGE == 'Hindi' & count_LANGUAGE >= 1"
sub_cond2_cond1 = "GENRE == 'ACTION'"

So think of it as a tree-like structure (not binary, of course, and in fact not really a tree at all).

Now I want to follow a multiprocessing approach (nested: a subprocess under a subprocess), something like this:

on level 1:
    df = df_main.query(main_filter)
on level 2:
    df1 = df.query(sub_cond1)
    df2 = df.query(sub_cond2)
on level 3:
    df11 = df1.query(sub_cond1_cond1)
    df12 = df1.query(sub_cond1_cond2)
    df21 = df2.query(sub_cond2_cond1)  # ...and so on

So the problem is how to pass the conditions properly to each level (perhaps by storing all the conditions in a list, though I haven't really thought that through yet).

NB: the result of each filtration should be exported to a separate CSV.

Ex:

df11.to_csv("CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ACTION' & count_GENRE >= 1.csv")

As a beginner I don't know how to use multiprocessing (its syntax, its execution model, etc., particularly for this case), but I have been given the task anyway, so I am not able to post any code.

So can anybody give a code example of how to achieve this?

If you have a better idea (class objects, node traversal, etc.), please suggest it.

Satya asked Nov 29 '22 at 23:11

1 Answer

This looks like a problem suitable for dask, the Python module that helps you deal with larger-than-memory data.

I will show how to solve this problem using dask.dataframe. Let's start by creating some data:

import random
from collections import namedtuple

import pandas as pd

Record = namedtuple('Record', "CITY LANGUAGE GENRE count_GENRE count_LANGUAGE")

cities = ['Mumbai', 'Chennai', 'Bengalaru', 'Kolkata']
languages = ['English', 'Hindi', 'Spanish', 'French']
genres = ['Action', 'Romance', 'Comedy', 'Drama']

# Build 4 million random records and dump them to a CSV on disk.
df = pd.DataFrame([Record(random.choice(cities),
                          random.choice(languages),
                          random.choice(genres),
                          random.choice([1, 2, 3]),
                          random.choice([1, 2, 3])) for i in range(4000000)])

df.to_csv('temp.csv', index=False)
print(df.head())

        CITY LANGUAGE    GENRE  count_GENRE  count_LANGUAGE
0    Chennai  Spanish   Action            2               1
1  Bengalaru  English    Drama            2               3
2    Kolkata  Spanish   Action            2               1
3     Mumbai   French  Romance            1               2
4    Chennai   French   Action            2               3

The data created above has 4 million rows and occupies about 107 MB as a CSV on disk. It is not larger than memory, but it is big enough to use in this example.
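A quick way to confirm the on-disk size yourself:

import os
print(os.path.getsize('temp.csv') / 1e6, 'MB')  # size of the CSV file on disk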

Below I show the transcript of a python session where I filtered the data according to the criteria in the question:

>>> import dask.dataframe as dd
>>> dask_df = dd.read_csv('temp.csv', header=0)
>>> dask_df.npartitions
4

# We see above that dask.dataframe has decided to split the 
# data into 4 partitions
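# (The partition count is determined by read_csv's default blocksize;
# it can also be set explicitly, e.g.
#     dd.read_csv('temp.csv', header=0, blocksize=64000000)
# to trade the number of partitions against per-partition memory use.)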

# We now execute the query:
>>> result = dask_df[(dask_df['CITY'] == 'Mumbai') &
...                  (dask_df['LANGUAGE'] == 'English') &
...                  (dask_df['GENRE'] == 'Action') &
...                  (dask_df['count_GENRE'] > 1)]
>>>

# The line above takes very little time to execute.  In fact, nothing has
# really been computed yet.  Behind the scenes dask has created a plan to
# execute the query, but has not yet pulled the trigger.

# The result object is a dask dataframe:
>>> type(result)
<class 'dask.dataframe.core.DataFrame'>
>>> result
dd.DataFrame<series-slice-read-csv-temp.csv-fc62a8c019c213f4cd106801b9e45b29[elemwise-cea80b0dd8dd29ae325a9db1896b027c], divisions=(None, None, None, None, None)>

# We now pull the trigger by calling the compute() method on the dask
# dataframe.  The execution of the line below takes a few seconds:
>>> dfout = result.compute()

# The result is a regular pandas dataframe:
>>> type(dfout)
<class 'pandas.core.frame.DataFrame'>

# Of our 4 million records, only ~40k match the query:
>>> len(dfout)
41842

>>> dfout.head()
       CITY LANGUAGE   GENRE  count_GENRE  count_LANGUAGE
225  Mumbai  English  Action            2               3
237  Mumbai  English  Action            3               2
306  Mumbai  English  Action            3               3
335  Mumbai  English  Action            2               2
482  Mumbai  English  Action            2               3
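
If the filtered result were itself too large for memory, you could skip compute() and let dask write the output straight to disk instead; the filename pattern below is just an illustrative choice:

# dask writes one CSV file per partition when the path contains a '*'
# wildcard, so the full result never has to fit in memory at once.
>>> result.to_csv('mumbai-english-action-*.csv', index=False)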

I hope this gets you started on the solution to your problem. For more info on dask see the tutorial and examples.
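
Finally, the question asks for all three filters to be run and each result saved to its own CSV. Below is a minimal sketch of one way to do that with dask, reusing the temp.csv generated above; the dictionary keys are just illustrative file-name stems, and note that the sample data uses title-case genre values such as 'Action':

import dask.dataframe as dd

dask_df = dd.read_csv('temp.csv', header=0)

# The three conditions from the question, as named boolean masks.
filters = {
    'mumbai_english_action':  ((dask_df['CITY'] == 'Mumbai') &
                               (dask_df['LANGUAGE'] == 'English') &
                               (dask_df['GENRE'] == 'Action') &
                               (dask_df['count_GENRE'] >= 1)),
    'mumbai_english_romance': ((dask_df['CITY'] == 'Mumbai') &
                               (dask_df['LANGUAGE'] == 'English') &
                               (dask_df['GENRE'] == 'Romance') &
                               (dask_df['count_GENRE'] >= 1)),
    'mumbai_hindi_action':    ((dask_df['CITY'] == 'Mumbai') &
                               (dask_df['LANGUAGE'] == 'Hindi') &
                               (dask_df['count_LANGUAGE'] >= 1) &
                               (dask_df['GENRE'] == 'Action')),
}

# Write each filtered result to its own set of CSVs; the '*' in the
# path expands to the partition number.
for name, mask in filters.items():
    dask_df[mask].to_csv(name + '-*.csv', index=False)

Each to_csv call processes the data partition by partition, so none of the intermediate results has to fit in memory, and there is no need to manage subprocesses by hand: dask schedules the work for you.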

Pedro M Duarte answered Dec 04 '22 at 02:12