I regularly use dask.dataframe to read multiple files, like so:
import dask.dataframe as dd
df = dd.read_csv('*.csv')
However, the origin of each row, i.e. which file the data was read from, seems to be forever lost.
Is there a way to add this as a column, e.g. df.loc[:100, 'partition'] = 'file1.csv' if file1.csv is the first file and contains 100 rows? This would be applied to each "partition" / file that is read into the dataframe when compute is triggered as part of a workflow.
The idea is that different logic can then be applied depending on the source.
You can do this by reading each CSV file into its own DataFrame, labelling it with the file it came from, and then appending or concatenating the DataFrames into a single DataFrame holding the data from all files. Here, I will use read_csv() to read the CSV files and concat() to combine the resulting DataFrames into one big DataFrame, as sketched below.
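For example (a minimal sketch; csv_files is an assumed list of the CSV file paths, and 'source' is just an illustrative column name):

import os
import pandas as pd

# read each file, tag its rows with the source file name, then combine
frames = []
for path in csv_files:  # csv_files is assumed to hold the CSV paths
    frame = pd.read_csv(path)
    frame['source'] = os.path.basename(path)  # label rows with their origin
    frames.append(frame)

combined = pd.concat(frames, ignore_index=True)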
The Dask functions read_csv, read_table, and read_fwf now include a parameter include_path_column:

include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True, a new column is added to the dataframe called path. If str, sets the new column name. Default is False.
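For example (a minimal sketch, using the question's wildcard pattern):

import dask.dataframe as dd

# adds a column named 'path' holding the file each row was read from
df = dd.read_csv('*.csv', include_path_column=True)

# or pick the column name yourself, e.g. 'partition' as in the question
df = dd.read_csv('*.csv', include_path_column='partition')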
Assuming you have, or can build, a file_list containing the file path of each csv file, and that each individual file fits in RAM (you mentioned 100 rows), then this should work:
import os
import pandas as pd
import dask.dataframe as dd
from dask import delayed

def read_and_label_csv(filename):
    # reads one csv file into a pandas.DataFrame
    df_csv = pd.read_csv(filename)
    # label each row with its source file (basename strips the directory)
    df_csv['partition'] = os.path.basename(filename)
    return df_csv
# create a list of delayed tasks, one per file, each returning a pandas.DataFrame
dfs = [delayed(read_and_label_csv)(fname) for fname in file_list]
# using delayed, assemble the pandas.DataFrames into a dask.DataFrame
ddf = dd.from_delayed(dfs)
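ddf is lazy, so the labelling only runs when the graph is executed, for example (a small usage sketch):

# count rows per source file; the 'partition' column is materialized on compute
counts = ddf.groupby('partition').size().compute()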
With some customization, of course. If your csv files are bigger than RAM, then a concatenation of dask.DataFrames is probably the way to go.
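For the bigger-than-RAM case, a minimal sketch (assuming the same file_list) could look like this: each file becomes its own lazy dask.DataFrame, gets labelled, and the pieces are concatenated:

import os
import dask.dataframe as dd

# one lazy dask.DataFrame per file, each labelled with its source file
ddfs = [
    dd.read_csv(fname).assign(partition=os.path.basename(fname))
    for fname in file_list
]

# concatenate the labelled pieces into a single dask.DataFrame
ddf = dd.concat(ddfs)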