Recovers damaged Twitter stream data (a JSON file written by rtweet) into a parsed data frame.
#' Recovers damaged Twitter stream data (JSON file) into a parsed data frame.
#'
#' @param path Character, name of JSON file with data collected by
#'   \code{\link{stream_tweets}}.
#' @param dir Character, name of a directory where intermediate files are
#'   stored.
#' @param verbose Logical, should progress be displayed?
#'
#' @family stream tweets
recover_stream <- function(path, dir = NULL, verbose = TRUE) {

  # read file and split to tweets
  lines <- readChar(path, file.info(path)$size, useBytes = TRUE)
  tweets <- stringi::stri_split_fixed(lines, "\n{")[[1]]
  tweets[-1] <- paste0("{", tweets[-1])
  tweets <- tweets[!(tweets == "" | tweets == "{")]

  # remove misbehaving characters
  tweets <- gsub("\r", "", tweets, fixed = TRUE)
  tweets <- gsub("\n", "", tweets, fixed = TRUE)

  # write tweets to disk and try to read them in individually
  if (is.null(dir)) {
    dir <- paste0(tempdir(), "/tweets/")
    dir.create(dir, showWarnings = FALSE)
  }

  if (verbose) {
    pb <- progress::progress_bar$new(
      format = "Processing tweets [:bar] :percent, :eta remaining",
      total = length(tweets), clear = FALSE
    )
    pb$tick(0)
  }

  tweets_l <- lapply(tweets, function(t) {
    if (verbose) pb$tick()  # pb only exists when verbose = TRUE
    # extract the status_id to use as an intermediate file name
    id <- unlist(stringi::stri_extract_first_regex(t, "(?<=id\":)\\d+(?=,)"))[1]
    f <- paste0(dir, id, ".json")
    writeLines(t, f, useBytes = TRUE)
    # parse the single tweet; on failure return its id instead of a tibble
    out <- tryCatch(rtweet::parse_stream(f),
                    error = function(e) {})
    if ("tbl_df" %in% class(out)) {
      return(out)
    } else {
      return(id)
    }
  })

  # test which ones failed
  test <- vapply(tweets_l, is.character, FUN.VALUE = logical(1L))
  bad_files <- unlist(tweets_l[test])

  # let user decide what to do
  if (length(bad_files) > 0) {
    message("There were ", length(bad_files),
            " tweets with problems. Should they be copied to your working directory?")
    sel <- utils::menu(c("no", "yes", "copy a list with status_ids"))
    if (sel == 2) {
      dir.create(paste0(getwd(), "/broken_tweets/"), showWarnings = FALSE)
      file.copy(
        from = paste0(dir, bad_files, ".json"),
        to = paste0(getwd(), "/broken_tweets/", bad_files, ".json")
      )
    } else if (sel == 3) {
      writeLines(bad_files, "broken_tweets.txt")
    }
  }

  # clean up
  unlink(dir, recursive = TRUE)

  # return good tweets
  return(dplyr::bind_rows(tweets_l[!test]))
}
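For example (a minimal sketch; "stream.json" is a hypothetical name for a damaged file written by rtweet::stream_tweets that rtweet::parse_stream chokes on):

# hypothetical path to a damaged stream file
tweets <- recover_stream("stream.json")
# `tweets` is now a tibble of all tweets that could be parsed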
Is there any way to parallelize this and make it faster?

The slow part is basically the lapply loop. You could replace it with, for example, pbapply::pblapply:
recover_stream <- function(path, dir = NULL, verbose = TRUE, cores = 1) {

  # read file and split to tweets
  lines <- readChar(path, file.info(path)$size, useBytes = TRUE)
  tweets <- stringi::stri_split_fixed(lines, "\n{")[[1]]
  tweets[-1] <- paste0("{", tweets[-1])
  tweets <- tweets[!(tweets == "" | tweets == "{")]

  # remove misbehaving characters
  tweets <- gsub("\r", "", tweets, fixed = TRUE)
  tweets <- gsub("\n", "", tweets, fixed = TRUE)

  # write tweets to disk and try to read them in individually
  if (is.null(dir)) {
    dir <- paste0(tempdir(), "/tweets/")
    dir.create(dir, showWarnings = FALSE)
  }

  # pblapply shows its own progress bar and spreads the work over `cores` processes
  tweets_l <- pbapply::pblapply(tweets, function(t) {
    id <- unlist(stringi::stri_extract_first_regex(t, "(?<=id\":)\\d+(?=,)"))[1]
    f <- paste0(dir, id, ".json")
    writeLines(t, f, useBytes = TRUE)
    out <- tryCatch(rtweet::parse_stream(f),
                    error = function(e) {})
    if ("tbl_df" %in% class(out)) {
      return(out)
    } else {
      return(id)
    }
  }, cl = cores)

  # test which ones failed
  test <- vapply(tweets_l, is.character, FUN.VALUE = logical(1L))
  bad_files <- unlist(tweets_l[test])

  # let user decide what to do
  if (length(bad_files) > 0) {
    message("There were ", length(bad_files),
            " tweets with problems. Should they be copied to your working directory?")
    sel <- utils::menu(c("no", "yes", "copy a list with status_ids"))
    if (sel == 2) {
      dir.create(paste0(getwd(), "/broken_tweets/"), showWarnings = FALSE)
      file.copy(
        from = paste0(dir, bad_files, ".json"),
        to = paste0(getwd(), "/broken_tweets/", bad_files, ".json")
      )
    } else if (sel == 3) {
      writeLines(bad_files, "broken_tweets.txt")
    }
  }

  # clean up
  unlink(dir, recursive = TRUE)

  # return good tweets
  return(dplyr::bind_rows(tweets_l[!test]))
}
Use it like this:
recover_stream(file, cores = 3)
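pbapply's cl argument also accepts a cluster object, so if an integer core count doesn't work on your platform (forking is unavailable on Windows), something like this untested sketch should work as well:

# untested sketch: pass a PSOCK cluster instead of an integer core count,
# e.g. on Windows where forking is not available
cl <- parallel::makeCluster(3)
tweets <- recover_stream(file, cores = cl)
parallel::stopCluster(cl)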
A disclaimer, though: I tried to develop this further, but I couldn't get it to work consistently. Often it works and recovers most or even all tweets; sometimes it doesn't, and I don't know why. Since my Twitter project came to an end, I eventually gave up. The respective issue is still open: ropensci-archive/rtweet#354