-
-
Save retrography/359e0cc56d2cf1acd161b5645bc801a8 to your computer and use it in GitHub Desktop.
Parallelize RDS compression/decompression to improve serialization performance in R
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The functions below use parallelized versions of gzip, xz, and bzip2 to | |
# improve compression/decompression performance of RDS serialization in R. | |
# Each function searches for the appropriate program (based on the required | |
# compression format) and if found, offloads the compression handling to the | |
# external program and therefore leaves R free to do the data import/export. | |
# The two main functions (saveRDS and readRDS) mask R's native read and write | |
# functions. The functions have only been tested on macOS, but they should work | |
# on any Linux/Unix system. | |
# | |
# Requires the following packages: pxz, pbzip2, and pigz. | |
# | |
# Run the following line at the command prompt before using the functions. | |
# | |
# brew install pxz pbzip2 pigz | |
# | |
#! Todo: Use saveRDS's original structure from base (create new gzfile, etc. functions instead of creating specialized saveRDS functions) | |
#! Todo: Rename loadRDS and writeRDS to something appropriate | |
#! Note: Tested on Ubuntu and it all works, but pxz doesn't compile on macOS for now while pbzip2 hangs up on macOS. No idea about Windows for now. | |
library(parallel) | |
# Report whether an external command can be found on the search path.
# Sys.which() yields "" for a command it cannot locate, so a non-empty
# result means the command is available. Names are dropped so the result
# is a plain logical vector.
cmdAvail <- function(cmd) {
  located <- Sys.which(cmd)
  unname(nzchar(located))
}
# Serialize `object` to the connection `con` and always close the connection.
#
# Args:
#   object: The R object to serialize.
#   con:    A connection to write to (the callers in this file pass a pipe to
#           an external compressor opened in "wb" mode).
#
# Returns:
#   NULL, invisibly.
#
# Errors raised by base::saveRDS() propagate to the caller instead of being
# printed and swallowed, so a failed write can no longer pass for a success.
# Warnings are left as ordinary warnings and do not abort the write.
writeRDS <- function(object, con) {
  closed <- FALSE
  # Guarantee the connection is released even if serialization fails.
  on.exit(if (!closed) close(con), add = TRUE)

  base::saveRDS(object, file = con)

  # For some connection types (e.g. pipes) close() reports an exit status;
  # surface a non-zero status, which typically indicates a failure.
  closed <- TRUE
  status <- close(con)
  if (is.numeric(status) && length(status) == 1L &&
      !is.na(status) && status != 0) {
    warning(
      "closing the connection returned non-zero status ", status,
      "; the output may be incomplete",
      call. = FALSE
    )
  }
  invisible(NULL)
}
# Deserialize an R object from the connection `con` and always close it.
#
# Args:
#   con: A connection to read from (the callers in this file pass a pipe to
#        an external decompressor).
#
# Returns:
#   The object read by base::readRDS().
#
# Errors propagate to the caller. Previously the error handler's printed
# text was returned as if it were the loaded object, which silently handed
# callers an error string instead of their data.
loadRDS <- function(con) {
  # Release the connection whether or not deserialization succeeds.
  on.exit(close(con), add = TRUE)
  base::readRDS(file = con)
}
# Save `object` as an xz-compressed RDS file, using pxz when it is available.
#
# Args:
#   object:            The R object to serialize.
#   file:              Path of the file to write.
#   threads:           Number of compression threads passed to pxz (-T).
#   compression_level: Compression preset passed to pxz (e.g. 6 -> "-6").
#
# When pxz is not found, falls back to base::saveRDS(compress = "xz").
saveRDS.xz <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 6) {
    if (cmdAvail("pxz")) {
      # detectCores() may return NA; fall back to a single thread then.
      if (length(threads) != 1L || is.na(threads)) {
        threads <- 1L
      }
      # Quote the tilde-expanded path so spaces or shell metacharacters in
      # the file name cannot break or alter the command.
      target <- shQuote(path.expand(file))
      command <- paste0(
        "pxz -c -k -T", threads,
        " -", compression_level,
        " > ", target
      )
      writeRDS(object, pipe(command, "wb"))
    } else {
      base::saveRDS(object, file = file, compress = "xz")
    }
  }
# Read an xz-compressed RDS file, decompressing with pxz when available.
#
# Args:
#   file:    Path of the file to read.
#   threads: Number of decompression threads passed to pxz (-T).
#
# Returns:
#   The deserialized object. Falls back to base::readRDS() without pxz.
readRDS.xz <-
  function(file,
           threads = parallel::detectCores()) {
    if (!cmdAvail("pxz")) {
      return(base::readRDS(file))
    }
    # detectCores() may return NA; fall back to a single thread then.
    if (length(threads) != 1L || is.na(threads)) {
      threads <- 1L
    }
    # Quote the tilde-expanded path so spaces or shell metacharacters in the
    # file name cannot break or alter the command.
    command <- paste0(
      "pxz -d -k -c -T", threads,
      " ", shQuote(path.expand(file))
    )
    loadRDS(pipe(command))
  }
# Save `object` as a gzip-compressed RDS file, using pigz when available.
#
# Args:
#   object:            The R object to serialize.
#   file:              Path of the file to write.
#   threads:           Number of compression threads passed to pigz (-p).
#   compression_level: Compression level passed to pigz (e.g. 6 -> "-6").
#
# When pigz is not found, falls back to base::saveRDS(compress = "gzip").
saveRDS.gz <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 6) {
    if (cmdAvail("pigz")) {
      # detectCores() may return NA; fall back to a single thread then.
      if (length(threads) != 1L || is.na(threads)) {
        threads <- 1L
      }
      # Quote the tilde-expanded path so spaces or shell metacharacters in
      # the file name cannot break or alter the command.
      target <- shQuote(path.expand(file))
      command <- paste0(
        "pigz -c -k -p", threads,
        " -", compression_level,
        " > ", target
      )
      writeRDS(object, pipe(command, "wb"))
    } else {
      base::saveRDS(object, file = file, compress = "gzip")
    }
  }
# Read a gzip-compressed RDS file, decompressing with pigz when available.
#
# Args:
#   file:    Path of the file to read.
#   threads: Number of threads passed to pigz (-p).
#
# Returns:
#   The deserialized object. Falls back to base::readRDS() without pigz.
readRDS.gz <-
  function(file,
           threads = parallel::detectCores()) {
    if (!cmdAvail("pigz")) {
      return(base::readRDS(file))
    }
    # detectCores() may return NA; fall back to a single thread then.
    if (length(threads) != 1L || is.na(threads)) {
      threads <- 1L
    }
    # Quote the tilde-expanded path so spaces or shell metacharacters in the
    # file name cannot break or alter the command.
    command <- paste0(
      "pigz -d -k -c -p", threads,
      " ", shQuote(path.expand(file))
    )
    loadRDS(pipe(command))
  }
# Save `object` as a bzip2-compressed RDS file, using pbzip2 when available.
#
# Args:
#   object:            The R object to serialize.
#   file:              Path of the file to write.
#   threads:           Number of compression threads passed to pbzip2 (-p).
#   compression_level: Compression level passed to pbzip2 (e.g. 9 -> "-9").
#
# When pbzip2 is not found, falls back to base::saveRDS(compress = "bzip2").
saveRDS.bz2 <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 9) {
    if (cmdAvail("pbzip2")) {
      # detectCores() may return NA; fall back to a single thread then.
      if (length(threads) != 1L || is.na(threads)) {
        threads <- 1L
      }
      # Quote the tilde-expanded path so spaces or shell metacharacters in
      # the file name cannot break or alter the command.
      target <- shQuote(path.expand(file))
      command <- paste0(
        "pbzip2 -c -k -p", threads,
        " -", compression_level,
        " > ", target
      )
      writeRDS(object, pipe(command, "wb"))
    } else {
      base::saveRDS(object, file = file, compress = "bzip2")
    }
  }
# Read a bzip2-compressed RDS file, decompressing with pbzip2 when available.
#
# Args:
#   file:    Path of the file to read.
#   threads: Number of threads passed to pbzip2 (-p).
#
# Returns:
#   The deserialized object. Falls back to base::readRDS() without pbzip2.
readRDS.bz2 <-
  function(file,
           threads = parallel::detectCores()) {
    if (!cmdAvail("pbzip2")) {
      return(base::readRDS(file))
    }
    # detectCores() may return NA; fall back to a single thread then.
    if (length(threads) != 1L || is.na(threads)) {
      threads <- 1L
    }
    # Quote the tilde-expanded path so spaces or shell metacharacters in the
    # file name cannot break or alter the command.
    command <- paste0(
      "pbzip2 -d -k -c -p", threads,
      " ", shQuote(path.expand(file))
    )
    loadRDS(pipe(command))
  }
# Identify the compression format of a file from its leading magic bytes.
# Returns one of "gzip", "XZ", "bzip2", or "none".
.rdsCompressionFormat <- function(path) {
  header <- readBin(path, what = "raw", n = 6L)
  begins_with <- function(signature) {
    length(header) >= length(signature) &&
      identical(header[seq_along(signature)], signature)
  }
  if (begins_with(as.raw(c(0x1f, 0x8b)))) {
    "gzip"
  } else if (begins_with(as.raw(c(0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00)))) {
    "XZ"
  } else if (begins_with(as.raw(c(0x42, 0x5a, 0x68)))) {
    "bzip2"
  } else {
    "none"
  }
}

# Read an RDS file, dispatching to the parallel decompressor that matches the
# file's compression format. Masks base::readRDS().
#
# Args:
#   file:    Path of the file to read. Anything that is not a character
#            vector (e.g. a connection) is handed to base::readRDS() as is.
#   threads: Number of threads forwarded to the format-specific reader.
#
# Returns:
#   The deserialized object.
#
# Raises:
#   An error when `file` is a path that does not exist.
#
# The format is detected from the file's own magic bytes rather than from the
# output of the external `file` utility: that output includes the file name,
# so a name containing "gzip", "XZ" or "bzip2" could select the wrong reader
# or match several formats at once.
readRDS <-
  function(file,
           threads = parallel::detectCores()) {
    if (!is.character(file)) {
      return(base::readRDS(file))
    }
    if (length(file) != 1L || is.na(file)) {
      stop("'file' must be a single file path", call. = FALSE)
    }
    if (!file.exists(file)) {
      stop(
        paste0(
          file,
          " does not exist!"
        )
      )
    }

    format <- .rdsCompressionFormat(file)

    if (format == "gzip") {
      readRDS.gz(file, threads = threads)
    } else if (format == "XZ") {
      readRDS.xz(file, threads = threads)
    } else if (format == "bzip2") {
      readRDS.bz2(file, threads = threads)
    } else {
      base::readRDS(file)
    }
  }
# Save an R object to an RDS file, using a parallel compressor for the
# requested compression method. Masks base::saveRDS().
#
# Args:
#   object:   The R object to serialize.
#   file:     Path of the file to write, or a connection (connections are
#             handed to base::saveRDS() unchanged).
#   compress: TRUE, "gz" or "gzip" for gzip; "bzip", "bzip2", "bz" or "bz2"
#             for bzip2; "xz", "7zip" or "7z" for xz; FALSE for none.
#
# Raises:
#   An error for an empty/invalid `file`, a non-scalar or NA `compress`, or
#   an unrecognized compression method.
saveRDS <-
  function(object,
           file = "",
           compress = TRUE) {
    if (inherits(file, "connection")) {
      return(base::saveRDS(object, file))
    }
    # An empty path would otherwise reach the shell as "... > " and fail there.
    if (!is.character(file) || length(file) != 1L ||
        is.na(file) || !nzchar(file)) {
      stop("'file' must be a non-empty string or a connection", call. = FALSE)
    }
    if (length(compress) != 1L || is.na(compress)) {
      stop("'compress' must be a single non-missing value", call. = FALSE)
    }

    if (compress %in% c(TRUE, "gz", "gzip")) {
      saveRDS.gz(object, file)
    } else if (compress %in% c("bzip", "bzip2", "bz", "bz2")) {
      saveRDS.bz2(object, file)
    } else if (compress %in% c("xz", "7zip", "7z")) {
      saveRDS.xz(object, file)
    } else if (compress == FALSE) {
      # Pass compress = FALSE explicitly: base::saveRDS() compresses with
      # gzip by default, which would defeat the caller's request.
      base::saveRDS(object, file, compress = FALSE)
    } else {
      stop(paste0(compress, " is not a recognized compression method!"))
    }
  }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment