I have a large set of CSV files (`file_1.csv`, `file_2.csv`), separated by time period, that can't fit into memory. Each file is in the format shown below.
| instrument | time | code | val |
|------------|------|----------|---------------|
| 10 | t1 | c1_at_t1 | v_of_c1_at_t1 |
| 10 | t1 | c2_at_t1 | v_of_c2_at_t1 |
| 10 | t2 | c1_at_t2 | v_of_c1_at_t2 |
| 10 | t2 | c3_at_t2 | v_of_c3_at_t2 |
| 11 | t1 | c4_at_t1 | v_of_c4_at_t1 |
| 11 | t1 | c5_at_t1 | v_of_c5_at_t1 |
| 12 | t2 | c6_at_t2 | v_of_c6_at_t2 |
| 13 | t3 | c9_at_t3 | v_of_c9_at_t3 |
Each file contains instrument logs in a consistent format. There is a set of instruments, each of which can emit different codes (`code`) at a given timestamp (`time`). The value of that `code` at a given `time` for a given instrument is stored in the `val` column.
I would like to split each file (e.g. `file_1.csv`) by the `instrument` column (e.g. `10`) and then join the files extracted for that instrument (e.g. `10`) across all files (`file_1.csv`, `file_2.csv`).
I am thinking of using a dask `groupby` operation on the `instrument` column. Is there an alternative or better approach than `groupby`, or a better way to extract the files by `instrument`?
The code I have written to do the above is:
```python
import glob

import dask.dataframe as dd
from dask.distributed import Client

client = Client()

def read_files(files):
    files = glob.glob(files)
    for f in files:
        df = dd.read_csv(f, blocksize='256MB')
        # .compute() is needed to materialise the unique instrument ids
        unique_inst = df['instrument'].unique().compute()
        gb = df.groupby('instrument')
        for v in unique_inst:
            # write one parquet output per instrument per input file
            gb.get_group(v).to_parquet(f'{v}_{f[:-4]}.parquet')
```
Once I have the files in the `f'{v}_{f[:-4]}.parquet'` format, I can concat the ones extracted for an instrument from all the files (`file_1.csv`, `file_2.csv`) using pandas.
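A minimal sketch of that concat step, assuming the per-instrument outputs end up named like `10_file_1.parquet`, `10_file_2.parquet` (the `f'{v}_{f[:-4]}.parquet'` pattern above) and that pandas can read them with `read_parquet`; `concat_instrument` is just a hypothetical helper name:

```python
import glob

import pandas as pd

def concat_instrument(inst):
    # gather all per-file parquet outputs for one instrument
    parts = sorted(glob.glob(f'{inst}_file_*.parquet'))
    frames = [pd.read_parquet(p, columns=['time', 'code', 'val']) for p in parts]
    return pd.concat(frames, ignore_index=True)

# e.g. the combined frame for instrument 10
df_10 = concat_instrument(10)
```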
The final file for instrument `10` should look something like below, where the observations at `t7` and `t9` are concatenated from the observations for instrument `10` in the other files.
| time | code | val |
|------|----------|---------------|
| t1 | c1_at_t1 | v_of_c1_at_t1 |
| t1 | c2_at_t1 | v_of_c2_at_t1 |
| t2 | c1_at_t2 | v_of_c1_at_t2 |
| t2 | c3_at_t2 | v_of_c3_at_t2 |
| t7 | c4_at_t7 | v_of_c4_at_t7 |
| t9 | c5_at_t9 | v_of_c5_at_t9 |
I'm not exactly sure what you need to achieve, but I don't think you need any groupby for your problem. It seems to me like a simple filtering issue.
You can just loop over all your files, create new per-instrument files, and append to those.
Also, I don't have example files to experiment with, but I think you can just use pandas with `chunksize` to read the large CSV files.
Example:
```python
import glob
import os

import pandas as pd

# maybe play around to get better performance
chunksize = 1000000

files = glob.glob('./file_*.csv')

for f in files:
    for chunk in pd.read_csv(f, chunksize=chunksize):
        u_inst = chunk['instrument'].unique()
        for inst in u_inst:
            # filter instrument data
            inst_df = chunk[chunk.instrument == inst]
            # filter columns
            inst_df = inst_df[['time', 'code', 'val']]
            # append to instrument file
            # only write header if not exist yet
            inst_file = f'./instrument_{inst}.csv'
            file_exist = os.path.isfile(inst_file)
            inst_df.to_csv(inst_file, mode='a', header=not file_exist)
```
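Afterwards you can read the combined data for one instrument back in; a minimal sketch, assuming the `instrument_{inst}.csv` naming from the loop above (the loop also writes the default pandas index, hence `index_col=0`):

```python
import pandas as pd

# read the combined file for instrument 10 back in
inst_10 = pd.read_csv('./instrument_10.csv', index_col=0)

# sort by time if the input files were not processed in chronological order
inst_10 = inst_10.sort_values('time')
```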