################################################
## Streaming through large tabular files in R ##
################################################
## Author: Thomas Girke
## Last update: 21-Dec-2018
## Utility: the function 'processFileInChunks' streams through a file in
## batches of lines and applies the function assigned to the 'myFct' argument
## to each imported batch. The number of lines processed per iteration is set
## with the 'n_rows' argument. Both 'fread' from 'data.table' and the 'read_*'
## functions from 'readr' are supported as importers. Results can be returned
## as list, vector or tabular objects; depending on the chosen import
## function, the latter will be a 'tibble' or a 'data.table'. Alternatively,
## the processing results of each imported batch can be appended to a file
## after each iteration, which keeps memory consumption minimal since no
## large R objects are accumulated. For testing and debugging, the 'nchunks'
## option limits the number of chunks imported and processed. The function
## can also be used to simply import the data of tabular files without
## further processing by assigning 'myFct <- function(x) x'.
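## Quick-start sketch (assumes a tab-delimited file 'myfile.tsv' with a header
## row exists in the working directory; the file name is only an illustration,
## see the 'Usage' section at the end of this script for a runnable example):
# r <- processFileInChunks(filepath="myfile.tsv", read_fct="read_tsv",
#                          myFct=colSums, n_rows=1000, returnformat="list")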
## Source function from R
# library(RCurl)
# source(textConnection(getURL("https://gist.githubusercontent.com/tgirke/6123d606e2f84989c8e55f8a176e5957/raw/processFileInChunks.R")))
## Updating this gist from the command line (only relevant for maintainer)
# git clone https://gist.github.com/6123d606e2f84989c8e55f8a176e5957.git
## -> make edits here
# git commit -am "some message"; git push -u origin master
processFileInChunks <- function(filepath, read_fct="read_tsv",
                                myFct=colSums, writetofile=NULL, n_rows=10,
                                skip=0, returnformat="list", nchunks=Inf,
                                verbose=TRUE) {
    ## Input validity checks
    if(any(c(class(filepath) != "character", length(filepath)!=1))) stop("Argument 'filepath' needs to be a character vector of length one.")
    if(!is.function(myFct)) stop("Argument 'myFct' needs to be a function.")
    if(skip!=0) warning("Argument 'skip' should be 0 in most cases.")
    if(!returnformat %in% c("list", "tabular", "vector")) stop("Argument 'returnformat' has to be one of 'list', 'tabular' or 'vector'.")
    ## Storage container
    bucket <- list()
    loopcounter <- 2
    ## Read first row chunk of file via import method defined under 'read_fct'
    if(read_fct=="fread") {
        require(data.table)
        suppressMessages(slice <- fread(filepath, nrows=n_rows, skip=skip, header=TRUE))
    } else if(read_fct %in% c("read_tsv", "read_delim", "read_csv", "read_csv2")) {
        require(readr); require(dplyr)
        suppressMessages(slice <- eval(parse(text=read_fct))(filepath, n_max=n_rows, skip=skip, col_names=TRUE))
    } else {
        stop("Value assigned to read_fct can only be 'fread', 'read_tsv', 'read_delim', 'read_csv', 'read_csv2'.")
    }
    ## To maintain original column titles, if possible
    if(!is.null(dim(slice))) mycolnames <- colnames(slice)
    ## Apply stats function provided by 'myFct' argument
    result <- myFct(as.matrix(slice))
    bucket <- c(bucket, list(result))
    ## Create names for aggregated line-wise results
    aggrname <- paste0("rows:", as.character(skip+1), "-", as.character(skip+nrow(slice)))
    names(bucket)[length(bucket)] <- aggrname
    if(verbose==TRUE) print(paste("Processed lines:", aggrname))
    ## Process subsequent row chunks in loop
    while(n_rows == nrow(slice)) {
        skip <- skip + n_rows
        if(read_fct=="fread") {
            suppressMessages(slice <- fread(filepath, nrows=n_rows, skip=skip+1, header=FALSE))
        } else if(read_fct %in% c("read_tsv", "read_delim", "read_csv", "read_csv2")) {
            suppressMessages(slice <- eval(parse(text=read_fct))(filepath, n_max=n_rows, skip=skip+1, col_names=FALSE))
        } else {
stop("Value assigned to read_fct can only be 'fread' or read_tsv") | |
        }
        ## To maintain original column titles, if possible
        if(!is.null(dim(slice))) colnames(slice) <- mycolnames[1:ncol(slice)]
        ## Apply stats function provided by 'myFct' argument
        result <- myFct(as.matrix(slice))
        bucket <- c(bucket, list(result))
        ## Create names for aggregated line-wise results
        aggrname <- paste0("rows:", as.character(skip+1), "-", as.character(skip+nrow(slice)))
        names(bucket)[length(bucket)] <- aggrname
        ## Status statement
        if(verbose==TRUE) print(paste("Processed lines:", aggrname))
        ## Write results to file in each iteration
        if(!is.null(writetofile)) {
            if(length(bucket)==2) {
                ## vector data
                if(is.null(dim(bucket[[1]]))) {
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), length(bucket[[1]])), data.frame(Stats=bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile)
                    bucket[[2]] <- bind_cols(RowIndex=rep(names(bucket[2]), length(bucket[[2]])), data.frame(Stats=bucket[[2]]))
                    write_tsv(as.data.frame(bucket[[2]]), writetofile, append=TRUE)
                ## matrix data
                } else {
                    colnames(bucket[[1]]) <- mycolnames
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), nrow(bucket[[1]])), data.frame(bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile)
                    colnames(bucket[[2]]) <- mycolnames
                    bucket[[2]] <- bind_cols(RowIndex=rep(names(bucket[2]), nrow(bucket[[2]])), data.frame(bucket[[2]]))
                    write_tsv(as.data.frame(bucket[[2]]), writetofile, append=TRUE)
                }
            }
            if(length(bucket)==1) {
                ## vector data
                if(is.null(dim(bucket[[1]]))) {
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), length(bucket[[1]])), data.frame(Stats=bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile, append=TRUE)
                ## matrix data
                } else {
                    colnames(bucket[[1]]) <- mycolnames
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), nrow(bucket[[1]])), data.frame(bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile, append=TRUE)
                }
            }
            bucket <- list()
        }
        ## Stop streaming process after specific number of iterations
        ## specified under 'nchunks'. This is useful for testing!
        if(loopcounter >= nchunks) { break }
        loopcounter <- loopcounter + 1
    }
    ## Return results as R object instead of file
    if(is.null(writetofile)) {
        ## As a list
        if(returnformat=="list") {
            return(bucket)
        } else if(returnformat=="tabular") {
            if(is.null(dim(bucket[[1]]))) {
                tmp <- bind_cols(RowIndex=rep(names(bucket), sapply(bucket, length)), as_tibble(unlist(bucket)))
            } else {
                tmp <- do.call("rbind", bucket)
                if(read_fct %in% c("read_tsv", "read_delim", "read_csv", "read_csv2")) {
                    if("tbl" %in% class(tmp)) tmp <- bind_cols(RowIndex=names(bucket), tmp)
                    if(is.matrix(tmp)) tmp <- bind_cols(RowIndex=names(bucket), as_tibble(tmp))
                }
            }
            if(read_fct=="fread") {
                if(is.matrix(tmp)) tmp <- cbind(RowIndex=names(bucket), as.data.table(tmp))
                if(is.tbl(tmp)) tmp <- as.data.table(tmp)
            }
            return(tmp)
        } else if(returnformat=="vector") {
            return(unlist(bucket))
        } else {
            stop("Unsupported value assigned to 'returnformat'.")
        }
    }
}
## Usage:
## Create test file
library(readr)
df <- data.frame(C1=rnorm(10001), C2=rnorm(10001), C3=rnorm(10001), C4=rnorm(10001))
write_tsv(df, "testfile")
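## Optional sanity check (a sketch using base R only): the test file should
## contain 10002 lines, i.e. 10001 data rows plus one header line.
# length(readLines("testfile"))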
## Some functions to assign to 'myFct'
myFct <- function(x) tail(x, 1)
myFct <- function(x) x # for reading in data without processing
myFct <- rowSums
myFct <- function(x) t(as.matrix(colSums(x)))
myFct <- function(x) sapply(seq_along(x[,1]), function(i) t.test(x[i,1:2], x[i,3:4])$p.value)
myFct <- max
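## Additional example (a sketch, not part of the original set): return the
## column means of each chunk as a one-row matrix, so the 'tabular' return
## format stacks one summary row per chunk.
# myFct <- function(x) t(as.matrix(colMeans(x)))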
## Run processFileInChunks with list output
(r <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
                          myFct=myFct, n_rows=1000,
                          returnformat="list", verbose=TRUE))
do.call("rbind", r)
## Run processFileInChunks with tibble/data.table output
(r <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
                          myFct=myFct, n_rows=1000,
                          returnformat="tabular", verbose=TRUE))
## Run processFileInChunks with vector output
(r <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
                          myFct=myFct, n_rows=1000,
                          returnformat="vector", verbose=TRUE))
## Run processFileInChunks with file output
processFileInChunks(filepath="testfile", read_fct="fread", nchunks=Inf,
                    myFct=myFct, writetofile="zzz", n_rows=1000,
                    returnformat="tabular", verbose=TRUE)
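## Inspect the file output of the previous call (the output path 'zzz' was
## chosen above; any writable path works). Results were written with
## write_tsv, so they can be read back with read_tsv.
# read_tsv("zzz")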