 

How to separate files using dask groupby on a column

I have a large set of CSV files (file_1.csv, file_2.csv), separated by time period, that can't fit into memory. Each file is in the format shown below.


| instrument | time | code     | val           |
|------------|------|----------|---------------|
| 10         | t1   | c1_at_t1 | v_of_c1_at_t1 |
| 10         | t1   | c2_at_t1 | v_of_c2_at_t1 |
| 10         | t2   | c1_at_t2 | v_of_c1_at_t2 |
| 10         | t2   | c3_at_t2 | v_of_c3_at_t2 |
| 11         | t1   | c4_at_t1 | v_of_c4_at_t1 |
| 11         | t1   | c5_at_t1 | v_of_c5_at_t1 |
| 12         | t2   | c6_at_t2 | v_of_c6_at_t2 |
| 13         | t3   | c9_at_t3 | v_of_c9_at_t3 |

Each file contains instrument logs in a consistent format. There is a set of instruments, each of which can emit different codes (code) at a given timestamp (time). The value of that code at a given time for a given instrument is stored in the val column.

I would like to split each file (e.g. file_1.csv) by the instrument column (e.g. 10) and then join the extracts for that instrument (e.g. 10) across all files (file_1.csv, file_2.csv).

I am thinking of using a dask groupby operation on the instrument column. Is there an alternative or better approach than groupby, or a better way to extract the files by instrument?

The code I have written to do the above is:

import glob
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

def read_files(files):
    files = glob.glob(files)

    for f in files:
        df = dd.read_csv(f, blocksize='256MB')
        # materialise the unique instrument ids so they can be iterated over
        unique_inst = df['instrument'].unique().compute()
        gb = df.groupby('instrument')

        for v in unique_inst:
            # write one parquet dataset per instrument per input file
            gb.get_group(v).to_parquet(f'{v}_{f[:-4]}.parquet')

Once I have the files in the f'{v}_{f[:-4]}.parquet' format, I can concatenate the extracts for each instrument across all the files (file_1.csv, file_2.csv) using pandas.
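
A minimal sketch of that concat step, assuming the per-instrument parquet outputs follow the naming above, the inputs are file_1.csv, file_2.csv in the working directory, and pandas (via pyarrow) can read the parquet datasets that dask writes; the combine_instrument name and the output file name are just for illustration:

import glob
import pandas as pd

def combine_instrument(inst):
    # gather the per-file extracts for one instrument and stack them in file order
    parts = [pd.read_parquet(p) for p in sorted(glob.glob(f'{inst}_file_*.parquet'))]
    combined = pd.concat(parts, ignore_index=True)
    # keep only the columns of the desired final layout
    return combined[['time', 'code', 'val']]

combine_instrument(10).to_parquet('instrument_10.parquet')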

The final file for instrument 10 should look like the table below, where the observations at t7 and t9 are concatenated from the observations for instrument 10 in the other files:

| time | code     | val           |
|------|----------|---------------|
| t1   | c1_at_t1 | v_of_c1_at_t1 |
| t1   | c2_at_t1 | v_of_c2_at_t1 |
| t2   | c1_at_t2 | v_of_c1_at_t2 |
| t2   | c3_at_t2 | v_of_c3_at_t2 |
| t7   | c4_at_t7 | v_of_c4_at_t7 |
| t9   | c5_at_t9 | v_of_c5_at_t9 |
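
For reference, one alternative to the groupby approach that I have been looking at is letting dask partition the parquet output by instrument directly via the partition_on argument of to_parquet. I am not sure whether it covers my case, and the ./by_instrument/ output path here is just for illustration:

import dask.dataframe as dd

# read all period files at once; dask expands the glob pattern itself
df = dd.read_csv('./file_*.csv', blocksize='256MB')

# write one parquet sub-directory per instrument value
df.to_parquet('./by_instrument/', partition_on=['instrument'])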
asked Nov 20 '19 by RTM


1 Answer

I'm not exactly sure what you need to achieve, but I don't think you need any groupby for your problem. It looks to me like a simple filtering issue.

You can just loop over all your files, create new per-instrument files, and append to those.

Also, I don't have example files to experiment with, but I think you can also just use pandas with chunksize to read the large CSV files.

Example:

import pandas as pd
import glob
import os

# chunk size in rows; maybe play around with this to get better performance
chunksize = 1000000

files = glob.glob('./file_*.csv')
for f in files:
    for chunk in pd.read_csv(f, chunksize=chunksize):
        u_inst = chunk['instrument'].unique()

        for inst in u_inst:
            # filter rows for this instrument
            inst_df = chunk[chunk.instrument == inst]
            # keep only the wanted columns
            inst_df = inst_df[['time', 'code', 'val']]
            # append to the per-instrument file;
            # only write the header if the file does not exist yet
            inst_file = f'./instrument_{inst}.csv'
            file_exist = os.path.isfile(inst_file)
            # index=False so the output only contains time, code, val
            inst_df.to_csv(inst_file, mode='a', header=not file_exist, index=False)
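
As a quick sanity check (assuming instrument 10 exists in the data, so the loop above produced instrument_10.csv), reading one of the output files back should give the per-instrument layout from the question:

import pandas as pd

# read back one per-instrument file and inspect the first rows
final_10 = pd.read_csv('./instrument_10.csv')
print(final_10[['time', 'code', 'val']].head())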
answered Oct 23 '22 by mjspier