 

Reading through multiple files in chunks in R

I'm trying to read through multiple compressed tables that are 5 GB+ in size in R. Because I don't have enough memory to load them all at once, I need to process them one chunk at a time: for example, the first 1000 rows of each file, then the next 1000 rows of each file, and so on. In basically any language other than R I know how to keep a file open and save a cursor or file pointer. How can I do that here?

I'm currently doing something a lot like this:

library(data.table)
library(R.utils)

inFiles = c("file1.tsv.gz", "file2.tsv.gz", "file3.tsv.gz")
totallines <- 10000
chunksize <- 1000

iters          <- 1
skip_val       <- 0
max_iters      <- ceiling(totallines/chunksize)

while (iters <= max_iters) {

    
    # data_colnames (the tables' column names) is defined elsewhere in the script
    data = lapply(inFiles, function(file) {
      data.table::fread(file, nrows=chunksize, skip=skip_val,
                        col.names=data_colnames, sep="\t")
    })

    # Process the data in omitted code here

    # Move on to the next chunk
    iters    = iters + 1
    skip_val = skip_val + chunksize
}

The problem is that these files are large and compressed, and the smaller the chunk size or the larger the file, the more time the program spends just reading because of the skipped lines. Every time it reads the next chunk, it also has to decompress and skip all of the previous lines.

I looked at readr::read_delim_chunked, but I'm not sure how I could use it to iterate through many files at once.
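For context, readr's chunked readers take a callback that is applied to each chunk of a single file; a minimal sketch with read_tsv_chunked (the file name and callback body are placeholders), which also shows why interleaving chunks from several files at once would still need a different mechanism:

library(readr)

## Sketch only: read one gzipped TSV in 1000-row chunks; the callback body
## stands in for real per-chunk processing.
chunk_sums <- read_tsv_chunked(
  "file1.tsv.gz",
  callback   = DataFrameCallback$new(\(chunk, pos) as.data.frame(t(colSums(chunk)))),
  col_names  = FALSE,
  chunk_size = 1000
)

Each callback result is combined into one data frame per file, but the streaming is strictly per file.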

asked Nov 17 '25 by Alex Petty

1 Answer

You're looking for pipe(). Because the connection stays open, each readLines() call inside a repeat loop continues from the current position; gunzip is not restarted and previously read content is not decompressed again.

process_chunks <- \(x, total.lines=1e5, chunk.size=1e3) {
  n_chunks <- ceiling(total.lines/chunk.size)
  unix <- identical(.Platform$OS.type, "unix")
  ## open pipe
  if (!unix) {
    con <- pipe(sprintf("7z e -so %s", shQuote(x)), open="r")  ## Windows fallback (not tested)
  } else {
    con <- pipe(sprintf("gunzip -c %s", shQuote(x)), open="r")
  }
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- vector(mode='list', length=n_chunks)
  i <- 1
  repeat {
    lins <- readLines(con, n=chunk.size)
    if (length(lins) == 0) break
    df <- data.table::fread(text=lins)
    ## Process data, save in list
    res_list[[i]] <- colSums(df)  
    ## ++++++++++++++++++++++++++
    i <- i + 1
  }
  do.call(rbind, res_list)  ## rbind result
}

Note: the solution as written assumes the .tsv files contain only data, with no header row.
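If an external gunzip or 7z is not available, the same connection-based approach should also work with base R's gzfile() (a minimal sketch, not benchmarked against the pipe() version; the colSums() step is the same placeholder as above):

## Sketch: built-in gzip connection instead of an external decompression tool
process_chunks_gz <- \(x, chunk.size=1e3) {
  con <- gzfile(x, open="r")               ## open gzip connection
  on.exit(try(close(con), silent=TRUE))
  res_list <- list()
  i <- 1
  repeat {
    lins <- readLines(con, n=chunk.size)   ## continues from the current position
    if (length(lins) == 0) break
    res_list[[i]] <- colSums(data.table::fread(text=lins))
    i <- i + 1
  }
  do.call(rbind, res_list)
}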

Usage

Single file:

> process_chunks("foo1.tsv.gz") |> head()
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

Multiple files:

> in_Files <- c("foo1.tsv.gz", "foo2.tsv.gz", "foo3.tsv.gz")
> lapply(in_Files, process_chunks, total.lines=1e5, chunk.size=1e3) |> lapply(head)
[[1]]
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

[[2]]
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

[[3]]
             V1          V2         V3        V4
[1,] -25.824427 -38.1319442 -15.260574  11.32532
[2,]  -5.317994 -66.8804838  -3.754295  40.01791
[3,]  -3.206987  -0.4199584  31.328836  11.47539
[4,] -21.786821  36.2002708 -25.986968 -12.03419
[5,] -15.829041  -5.8027936 -25.947610  26.12207
[6,]  23.008565  34.1792188  71.192981 -13.35848

On Linux we might use parallel::mclapply:

parallel::mclapply(in_Files, process_chunks, mc.cores=parallel::detectCores() - 1)
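To reproduce the question's original access pattern, i.e. chunk i of every file processed together rather than one file at a time, the same idea extends to several pipes held open simultaneously; a minimal sketch, with the per-chunk processing left as a placeholder:

## Sketch: one open pipe per file, one chunk read from each file per iteration
process_files_in_lockstep <- \(files, chunk.size=1e3) {
  cons <- lapply(files, \(f) pipe(sprintf("gunzip -c %s", shQuote(f)), open="r"))
  on.exit(lapply(cons, \(con) try(close(con), silent=TRUE)))
  repeat {
    ## each readLines() resumes where its connection left off
    chunks <- lapply(cons, readLines, n=chunk.size)
    if (all(lengths(chunks) == 0)) break
    dfs <- lapply(chunks[lengths(chunks) > 0],
                  \(lins) data.table::fread(text=lins))
    ## process the per-file chunks in `dfs` together here (placeholder)
  }
  invisible(NULL)
}

Usage would then be e.g. process_files_in_lockstep(in_Files, chunk.size=1e3).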

Enhanced Alternative

There is no need to specify the total number of lines; a user-supplied function (FX) is applied to each chunk, leading metadata lines can be skipped (skip), and a header row is supported (header). The shell command (unz) can be swapped for any decompression tool. Chunks are converted to matrices by default (matrix), and a warning is issued if the last chunk is smaller than expected.

process_chunks2 <- \(x, FX, csz=1e3, skip=0L, header=FALSE, matrix=TRUE, 
                     unz='gunzip -c', warn=TRUE, ...) {
  unix <- identical(.Platform$OS.type, "unix")
  xq <- shQuote(x, if (!unix) 'cmd' else 'sh')
  con <- pipe(sprintf("%s %s", unz, xq), open="r")  ## open pipe
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- list()
  i <- 1
  if (skip > 0L) {
    readLines(con, n=skip)
  }
  if (header) {
    hd <- colnames(data.table::fread(text=readLines(con, n=1)))
  }
  repeat {
    lins <- readLines(con, n=csz)
    if (length(lins) == 0) break
    ch <- data.table::fread(text=lins)
    if (matrix) {
      ch <- as.matrix(ch)
    }
    if (warn && (nr <- nrow(ch)) < csz) {
      warning(sprintf("Final chunk short: %d < %d", nr, csz))
    }
    res_list[[i]] <- FX(ch, ...)  ## process chunk
    i <- i + 1
  }
  out <- do.call(rbind, res_list)  ## rbind result
  if (header) {
    `colnames<-`(out, hd)
  } else {
    `colnames<-`(out, NULL)
  }
}

> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
             [,1]        [,2]         [,3]       [,4]
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=5, header=TRUE) |> head(2)
               A1          A2           A3         A4
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791

Example where total rows are not divisible by chunk size (e.g., m <- 1e5 - 1 in Data, infra):

> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
             [,1]        [,2]        [,3]       [,4]
[1,] -0.025824427 -0.03763184 -0.01190839 0.01348543
[2,] -0.005317994 -0.06963092 -0.00367911 0.03837964
Warning message:
In process_chunks2(x = "bar.tsv.gz", FX = matrixStats::colMeans2,  :
  Final chunk short: 999 < 1000

Data:

(For Linux. Eight files will be created in current directory.)

m <- 1e5; n <- 4
set.seed(42)
mat <- matrix(rnorm(m*n), m, n)
mat |> 
  write.table('foo.tsv', row.names=FALSE, col.names=FALSE, sep='\t')
system('pigz -p 7 -f foo.tsv')
system('for i in 1 2 3; do cp foo.tsv.gz foo${i}.tsv.gz; done')

mat |> 
  `colnames<-`(paste0('A', seq_len(n))) |> 
  data.table::fwrite('bar.tmp', row.names=FALSE, col.names=TRUE, sep='\t')
writeLines(c(
  "# File:       bar.tsv.gz",
  "# Created:    2025-04-06",
  "# Rows:       100000 (approx.)",
  "# Delimiter:  tab",
  "# Generator:  R/data.table::fwrite()"
), "meta.tmp")
system("cat meta.txt bar.tmp > bar.tsv")
file.remove("meta.tmp", "bar.tmp")
system('pigz -p 7 -f bar.tsv')
system('for i in 1 2 3; do cp bar.tsv.gz bar${i}.tsv.gz; done')
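On systems without pigz, equivalent test files can be generated in plain R, e.g. via data.table::fwrite()'s gzip support (a minimal sketch covering only the foo*.tsv.gz files; assumes data.table >= 1.12.4 for compress='gzip'):

## Sketch: write gzipped TSVs without external tools
m <- 1e5; n <- 4
set.seed(42)
mat <- matrix(rnorm(m*n), m, n)
data.table::fwrite(as.data.frame(mat), 'foo.tsv.gz',
                   col.names=FALSE, sep='\t', compress='gzip')
for (i in 1:3) file.copy('foo.tsv.gz', sprintf('foo%d.tsv.gz', i), overwrite=TRUE)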
answered Nov 19 '25 by jay.sf


