I have a dataframe and I need to filter it according to the following conditions:
CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ACTION' & count_GENRE >= 1
CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ROMANCE' & count_GENRE >= 1
CITY == 'Mumbai' & LANGUAGE == 'Hindi' & count_LANGUAGE >= 1 & GENRE == 'ACTION'
When I try to do that with
df1 = df.query(condition1)
df2 = df.query(condition2)
I get a MemoryError (since my dataframe is huge).
So I planned to filter on the main condition first and then on the sub-conditions, so that the load is lighter and performance is better.
By parsing the above conditions, I somehow managed to get:
main_filter = "CITY == 'Mumbai'"
sub_cond1 = "LANGUAGE == 'English'"
sub_cond1_cond1 = "GENRE == 'ACTION' & count_GENRE >= 1"
sub_cond1_cond2 = "GENRE == 'ROMANCE' & count_GENRE >= 1"
sub_cond2 = "LANGUAGE == 'Hindi' & count_LANGUAGE >= 1"
sub_cond2_cond1 = "GENRE == 'ACTION'"
So think of it as a tree structure (not binary, of course, and actually it is not really a tree at all).
Now I want to follow a multiprocessing approach (nested, a subprocess under a subprocess), something like:
on level 1
df = df_main.query(main_filter)
on level 2
df1 = df.query(sub_cond1)
df2 = df.query(sub_cond2)
on level 3
df11 = df1.query(sub_cond1_cond1)
df12 = df1.query(sub_cond1_cond2)
df21 = df2.query(sub_cond2_cond1)  # like this
So the problem is how to pass the conditions properly to each level (perhaps by storing all the conditions in a list, though I have not really thought that through).
NB: the result of each filtration should be exported to a separate CSV.
Ex:
df11.to_csv("CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ACTION' & count_GENRE >= 1")
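For example, I imagine holding the conditions in some nested structure and walking it level by level; below is a rough sketch of the idea (the structure and the walk function are made up, not working multiprocessing code):
# Rough idea only: each level maps a filter string to its sub-filters;
# an empty dict marks a leaf, and the accumulated conditions name the CSV.
conditions = {
    "CITY == 'Mumbai'": {
        "LANGUAGE == 'English'": {
            "GENRE == 'ACTION' & count_GENRE >= 1": {},
            "GENRE == 'ROMANCE' & count_GENRE >= 1": {},
        },
        "LANGUAGE == 'Hindi' & count_LANGUAGE >= 1": {
            "GENRE == 'ACTION'": {},
        },
    },
}

def walk(df, tree, path=""):
    for cond, subtree in tree.items():
        filtered = df.query(cond)              # filter once per node
        new_path = path + " & " + cond if path else cond
        if subtree:
            walk(filtered, subtree, new_path)  # recurse into sub-conditions
        else:
            filtered.to_csv(new_path)          # leaf: one CSV per filter chain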
As a starter, I don't know how to use multiprocessing (its syntax, way of execution, etc., particularly for this), but I got the task anyway, so I am not able to post any working multiprocessing code.
Can anybody give a code example of how to achieve this?
If you have a better idea (class objects or node traversal), please suggest it.
This looks like a problem suitable for dask, the Python module that helps you deal with larger-than-memory data. I will show how to solve this problem using dask.dataframe. Let's start by creating some data:
import random
from collections import namedtuple

import pandas as pd

Record = namedtuple('Record', "CITY LANGUAGE GENRE count_GENRE count_LANGUAGE")

cities = ['Mumbai', 'Chennai', 'Bengaluru', 'Kolkata']
languages = ['English', 'Hindi', 'Spanish', 'French']
genres = ['Action', 'Romance', 'Comedy', 'Drama']

# Build 4 million random records and write them to a CSV file.
df = pd.DataFrame([Record(random.choice(cities),
                          random.choice(languages),
                          random.choice(genres),
                          random.choice([1, 2, 3]),
                          random.choice([1, 2, 3])) for i in range(4000000)])
df.to_csv('temp.csv', index=False)
print(df.head())
CITY LANGUAGE GENRE count_GENRE count_LANGUAGE
0 Chennai Spanish Action 2 1
1 Bengaluru English Drama 2 3
2 Kolkata Spanish Action 2 1
3 Mumbai French Romance 1 2
4 Chennai French Action 2 3
The data created above has 4 million rows, and occupies 107 MB. It is not larger-than-memory, but good enough to use in this example.
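(If you want to verify the footprint yourself, pandas can report the in-memory size; a quick check, assuming a reasonably recent pandas:)
# Approximate in-memory size of the dataframe, in megabytes
print(df.memory_usage(deep=True).sum() / 1e6)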
Below I show the transcript of a Python session where I filtered the data according to the criteria in the question:
>>> import dask.dataframe as dd
>>> dask_df = dd.read_csv('temp.csv', header=0)
>>> dask_df.npartitions
4
# We see above that dask.dataframe has decided to split the
# data into 4 partitions
# We now execute the query:
>>> result = dask_df[(dask_df['CITY'] == 'Mumbai') &
... (dask_df['LANGUAGE'] == 'English') &
... (dask_df['GENRE'] == 'Action') &
... (dask_df['count_GENRE'] > 1)]
>>>
# The line above takes very little time to execute. In fact, nothing has
# really been computed yet. Behind the scenes dask has created a plan to
# execute the query, but has not yet pulled the trigger.
# The result object is a dask dataframe:
>>> type(result)
<class 'dask.dataframe.core.DataFrame'>
>>> result
dd.DataFrame<series-slice-read-csv-temp.csv-fc62a8c019c213f4cd106801b9e45b29[elemwise-cea80b0dd8dd29ae325a9db1896b027c], divisions=(None, None, None, None, None)>
# We now pull the trigger by calling the compute() method on the dask
# dataframe. The execution of the line below takes a few seconds:
>>> dfout = result.compute()
# The result is a regular pandas dataframe:
>>> type(dfout)
<class 'pandas.core.frame.DataFrame'>
# Of our 4 million records, only ~40k match the query:
>>> len(dfout)
41842
>>> dfout.head()
CITY LANGUAGE GENRE count_GENRE count_LANGUAGE
225 Mumbai English Action 2 3
237 Mumbai English Action 3 2
306 Mumbai English Action 3 3
335 Mumbai English Action 2 2
482 Mumbai English Action 2 3
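To handle all of the sub-queries from the question and write each result to its own CSV, one approach is to apply the shared CITY filter once and derive each leaf query from it. Below is a minimal sketch under the assumptions of the generated data above (note the capitalization 'Action'/'Romance' rather than the question's 'ACTION'/'ROMANCE'; the output filenames are just illustrative):
import dask.dataframe as dd

dask_df = dd.read_csv('temp.csv', header=0)

# Level 1: apply the shared filter once; every leaf below reuses this plan.
mumbai = dask_df[dask_df['CITY'] == 'Mumbai']

# Levels 2-3: one entry per leaf of the condition "tree".
leaves = {
    'mumbai_english_action.csv':
        (mumbai['LANGUAGE'] == 'English') & (mumbai['GENRE'] == 'Action') &
        (mumbai['count_GENRE'] >= 1),
    'mumbai_english_romance.csv':
        (mumbai['LANGUAGE'] == 'English') & (mumbai['GENRE'] == 'Romance') &
        (mumbai['count_GENRE'] >= 1),
    'mumbai_hindi_action.csv':
        (mumbai['LANGUAGE'] == 'Hindi') & (mumbai['count_LANGUAGE'] >= 1) &
        (mumbai['GENRE'] == 'Action'),
}

for fname, mask in leaves.items():
    # compute() materializes the result as a regular pandas dataframe,
    # which we then export to its own CSV.
    mumbai[mask].compute().to_csv(fname, index=False)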
I hope this gets you started on the solution to your problem. For more info on dask, see the tutorial and examples.
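And since the question explicitly asks about multiprocessing: if you would rather stay with plain pandas, a minimal sketch along the following lines runs the leaf queries in parallel worker processes. This is an assumption-laden illustration, not the only way to do it: it relies on fork semantics (so it works as written on Linux, not on Windows), and all names and filenames are made up:
import pandas as pd
from multiprocessing import Pool

# Leaf conditions paired with output filenames (illustrative, matching the
# generated data's capitalization rather than the question's 'ACTION').
TASKS = [
    ("LANGUAGE == 'English' & GENRE == 'Action' & count_GENRE >= 1",
     'mumbai_english_action.csv'),
    ("LANGUAGE == 'English' & GENRE == 'Romance' & count_GENRE >= 1",
     'mumbai_english_romance.csv'),
    ("LANGUAGE == 'Hindi' & count_LANGUAGE >= 1 & GENRE == 'Action'",
     'mumbai_hindi_action.csv'),
]

df_mumbai = None  # populated in the parent before the workers fork

def run_query(task):
    cond, fname = task
    df_mumbai.query(cond).to_csv(fname, index=False)

if __name__ == '__main__':
    df = pd.read_csv('temp.csv')
    # Level 1: apply the shared filter once in the parent process.
    df_mumbai = df.query("CITY == 'Mumbai'")
    # Levels 2-3: each leaf query runs in its own worker; on Linux the
    # workers inherit df_mumbai via fork.
    with Pool(3) as pool:
        pool.map(run_query, TASKS)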