#' Find the latest modified file with a given S3 prefix.
#'
#' @note This helper function is used when no input key
#' is specified to the \code{\link{s3read}} function to fetch the
#' latest modified file in that bucket. If the bucket has
#' many files, this can be very slow.
#' @param path character. The S3 prefix to search for the
#' latest uploaded key.
#' @return character. The part of the most recently modified key that
#' follows \code{path}. Timestamps from \code{s3cmd ls} have minute
#' resolution; ties go to the first key in the listing.
grab_latest_file_in_s3_dir <- function(path = s3path()) {
  ensure_s3cmd_present()
  listing <- system(paste0("s3cmd ls ", path, "*"), intern = TRUE)
  # `s3cmd ls` also prints "DIR s3://..." rows for sub-prefixes. Those carry
  # no timestamp and would make the date parsing below fail, so keep only
  # rows that start with a "YYYY-MM-DD HH:MM" modification time.
  is_object_row <- grepl("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", listing)
  listing <- listing[is_object_row]
  if (length(listing) == 0L) {
    stop("No files found under the S3 prefix ", path, call. = FALSE)
  }
  times <- as.POSIXct(substring(listing, 1, 16), format = "%Y-%m-%d %H:%M")
  latest <- listing[[which.max(times)]]
  # Locate the prefix literally: S3 keys may contain regex metacharacters
  # such as "." or "(", so a regex built from `path` is not reliable.
  start <- as.integer(regexpr(path, latest, fixed = TRUE))
  if (start < 0L) {
    stop("Could not find the S3 prefix ", path, " in the listing entry: ",
         latest, call. = FALSE)
  }
  substring(latest, start + nchar(path))
}
# Adapted from https://github.com/rstudio/packrat/blob/master/R/platform.R
# Is R running on Windows? Returns a length-one logical that keeps the
# "sysname" name from the Sys.info() lookup.
is.windows <- function() {
  sysname <- Sys.info()["sysname"]
  sysname == "Windows"
}
# Is R running on macOS (Darwin)? Returns a length-one logical named "sysname".
is.mac <- function() {
  "Darwin" == Sys.info()["sysname"]
}
# Is R running on Linux? Returns a length-one logical named "sysname".
is.linux <- function() {
  os_info <- Sys.info()
  os_info["sysname"] == "Linux"
}
# Fetch an R object stored as an RDS file on S3, using an in-memory LRU cache.
#
# A copy held in `s3LRUcache` is reused unless the remote object's
# modification time (as listed by `s3cmd ls`, minute resolution, parsed as
# GMT) is newer than the time the cached entry was last accessed. It is also
# refetched when the remote time cannot be determined.
#
# @param bucket character. The full S3 key of the object to fetch.
# @param bucket.location character. Passed to `s3cmd --bucket-location`.
# @param verbose logical. Whether to ask s3cmd for verbose/progress output.
# @param debug logical. Whether to pass `--debug` to s3cmd.
# @return The deserialized R object.
s3.get <- function (bucket, bucket.location = "US", verbose = FALSE, debug = FALSE) {
  AWS.tools:::check.bucket(bucket)

  # Download `bucket` into a temporary file and deserialize it. The temporary
  # file is removed on exit, even if the download or readRDS() fails.
  fetch <- function() {
    x.serialized <- tempfile()
    on.exit(unlink(x.serialized), add = TRUE)
    s3.cmd <- paste("s3cmd get", bucket, x.serialized,
                    paste("--bucket-location", bucket.location),
                    if (verbose) "--verbose --progress" else "--no-progress",
                    if (debug) "--debug" else "")
    status <- system(s3.cmd)
    # Report a failed download directly instead of letting readRDS() fail
    # on a missing file with an unrelated-looking message.
    if (!identical(as.integer(status), 0L) || !file.exists(x.serialized)) {
      stop("Failed to fetch ", bucket, " from S3 (s3cmd exit status ",
           status, ").", call. = FALSE)
    }
    readRDS(x.serialized)
  }

  # Modification time of the remote object, or NULL if it is not listed.
  remote_mtime <- function() {
    listing <- system(paste("s3cmd ls", bucket), intern = TRUE)
    # `s3cmd ls` treats its argument as a prefix (listing "key" also shows
    # "key2"), so keep only the row that ends with this exact key.
    listing <- listing[endsWith(trimws(listing), bucket)]
    if (length(listing) == 0L) {
      return(NULL)
    }
    as.POSIXct(substring(listing[[1L]], 1, 16),
               format = "%Y-%m-%d %H:%M", tz = "GMT")
  }

  if (!s3LRUcache$exists(bucket)) {
    ans <- fetch()
    s3LRUcache$set(bucket, ans)
  } else {
    # Assumes last_accessed() returns a POSIXct object.
    last_cached <- s3LRUcache$last_accessed(bucket)
    last_updated <- remote_mtime()
    stale <- is.null(last_updated) || is.na(last_updated) ||
      last_updated > last_cached
    if (stale) {
      ans <- fetch()
      s3LRUcache$set(bucket, ans)
    } else {
      ans <- s3LRUcache$get(bucket)
    }
  }
  ans
}
# Overwrite AWS.tools::s3.put because we would like to check md5.
#
# Serializes `x` with saveRDS() and uploads it to `bucket` with
# `s3cmd put ... --check-md5`.
#
# @param x ANY. The R object to upload.
# @param bucket character. The full S3 key to upload to.
# @param bucket.location character. Passed to `s3cmd --bucket-location`.
# @param verbose logical. Whether to ask s3cmd for verbose/progress output.
# @param debug logical. Whether to pass `--debug` to s3cmd.
# @param encrypt logical. Whether to pass `--encrypt` to s3cmd.
# @return The lines printed by s3cmd (character vector).
s3.put <- function (x, bucket, bucket.location = "US", verbose = FALSE,
                    debug = FALSE, encrypt = FALSE) {
  AWS.tools:::check.bucket(bucket)
  x.serialized <- tempfile()
  # Remove the temporary file even if saveRDS() or s3cmd fails.
  on.exit(unlink(x.serialized), add = TRUE)
  saveRDS(x, x.serialized)
  s3.cmd <- paste("s3cmd put", x.serialized, bucket,
                  if (encrypt) "--encrypt" else "",
                  paste("--bucket-location", bucket.location),
                  if (verbose) "--verbose --progress" else "--no-progress",
                  if (debug) "--debug" else "",
                  "--check-md5")
  res <- system(s3.cmd, intern = TRUE)
  # With intern = TRUE, a non-zero exit status only yields a warning plus a
  # "status" attribute; raise an error so a failed upload is not silent.
  status <- attr(res, "status")
  if (!is.null(status) && !identical(as.integer(status), 0L)) {
    stop("s3cmd put failed for ", bucket, " (exit status ", status, ").",
         call. = FALSE)
  }
  res
}
#' A caching layer around s3mpi calls.
#'
#' Fetching large files from the S3 MPI can be expensive when performed
#' multiple times. This function adds a local, on-disk caching layer
#' around S3 fetching. The user must set the configuration option
#' \code{options(s3mpi.cache = 'some/dir')}; the recommended cache
#' directory (where files will be stored) is \code{"~/.s3cache"}.
#'
#' @param s3key character. The full S3 key to attempt to read or write
#' to the cache.
#' @param value ANY. The R object to save in the cache. If missing,
#' a cache read will be performed instead.
#' @return For a read, the cached object or \code{not_cached}; for a
#' write, \code{NULL} invisibly.
s3cache <- function(s3key, value) {
  if (!cache_enabled()) {
    stop("Cannot use s3mpi::s3cache until you set options(s3mpi.cache) ",
         "to a directory in which to place cache contents.")
  }
  cache_dir <- cache_directory()
  # The cache root holds an "info" (metadata) and a "data" subdirectory.
  dir.create(cache_dir, showWarnings = FALSE, recursive = TRUE)
  for (sub_dir in c("info", "data")) {
    dir.create(file.path(cache_dir, sub_dir), showWarnings = FALSE,
               recursive = TRUE)
  }
  if (missing(value)) {
    return(fetch_from_cache(s3key, cache_dir))
  }
  save_to_cache(s3key, value, cache_dir)
}
#' Helper function for fetching a file from a cache directory.
#'
#' This function will also test to determine whether the file has been
#' modified on S3 since the last cache save. If the file has never been
#' cached or the cache is invalidated, it will return \code{s3mpi::not_cached}.
#' When no network connection is detected, a warning is issued and any
#' existing cache entry is returned without consulting S3.
#'
#' @param key character. The key under which the cache entry is stored.
#' @param cache_dir character. The cache directory. The default is
#' \code{cache_directory()}.
#' @return the cached object if the cache has not invalidated. Otherwise,
#' return \code{s3mpi::not_cached}.
fetch_from_cache <- function(key, cache_dir = cache_directory()) {
  cache_key <- digest::digest(key)
  cache_file <- function(dir) file.path(cache_dir, dir, cache_key)

  # Compare modification times as instants rather than with identical():
  # identical() on POSIXlt also compares internal components/attributes,
  # which may differ for the same instant (e.g. entries written by an older
  # R version). Two NULLs (object never seen on S3) still count as equal.
  same_time <- function(a, b) {
    if (is.null(a) || is.null(b)) {
      return(is.null(a) && is.null(b))
    }
    isTRUE(as.numeric(as.POSIXct(a)) == as.numeric(as.POSIXct(b)))
  }

  if (!file.exists(cache_file('data'))) return(not_cached)
  if (!file.exists(cache_file('info'))) {
    # Somehow the cache became corrupt: data exists without accompanying
    # meta-data. In this case, simply wipe the cache.
    file.remove(cache_file('data'))
    return(not_cached)
  }
  info <- readRDS(cache_file('info'))

  if (!has_internet()) {
    warning("Your network connection seems to be unavailable. s3mpi will ",
            "use the latest cache entries instead of pulling from S3.",
            call. = FALSE, immediate. = FALSE)
    return(readRDS(cache_file('data')))
  }
  # Invalidate the entry if S3 reports a different modification time than
  # the one recorded when the entry was saved.
  if (!same_time(info$mtime, last_modified(key))) {
    return(not_cached)
  }
  readRDS(cache_file('data'))
}
#' Helper function for saving a file to a cache directory.
#'
#' Writes \code{value} to \code{<cache_dir>/data/<digest of key>} and a
#' metadata list (the key and its S3 modification time from
#' \code{last_modified}) to \code{<cache_dir>/info/<digest of key>}.
#'
#' @param key character. The key under which the cache entry is stored.
#' @param value ANY. The R object to save in the cache.
#' @param cache_dir character. The cache directory. The default is
#' \code{cache_directory()}.
#' @return \code{NULL}, invisibly.
save_to_cache <- function(key, value, cache_dir = cache_directory()) {
  # digest is called through `::`, so there is no need to attach it with
  # require() (which would also silently return FALSE if it were missing).
  cache_key <- digest::digest(key)
  cache_file <- function(dir) file.path(cache_dir, dir, cache_key)
  saveRDS(value, cache_file('data'))
  info <- list(mtime = last_modified(key), key = key)
  saveRDS(info, cache_file('info'))
  invisible(NULL)
}
#' Determine the last modified time of an S3 object.
#'
#' @param key character. The s3 key of the object.
#' @return the last modified time (minute resolution, as listed by
#' \code{s3cmd ls}) or \code{NULL} if it does not exist on S3. When no
#' network connection is detected, the fixed sentinel
#' \code{as.POSIXct(as.Date("2000-01-01"))} is returned instead.
last_modified <- function(key) {
  if (!has_internet()) {
    return(as.POSIXct(as.Date("2000-01-01")))
  }
  listing <- system(paste0('s3cmd ls ', key), intern = TRUE)
  # `s3cmd ls` treats `key` as a prefix: it may also list longer keys
  # ("key2") and "DIR" rows for sub-prefixes ("key/"). Keep only a row
  # that carries a timestamp and describes this exact object.
  is_exact <- endsWith(trimws(listing), key) &
    grepl("^[0-9]{4}-[0-9]{2}-[0-9]{2} ", listing)
  listing <- listing[is_exact]
  if (length(listing) > 0L) {
    strptime(substring(listing[[1L]], 1, 16), '%Y-%m-%d %H:%M')
  }
}
# Sentinel returned by the cache helpers when no valid cache entry exists:
# an empty list carrying the class "not_cached".
not_cached <- structure(list(), class = "not_cached")
# Is `x` exactly the `not_cached` sentinel?
is.not_cached <- function(x) {
  identical(not_cached, x)
}
#' Determine whether object exists on S3
#'
#' Test whether or not the given object exists at the
#' given S3 path.
#'
#' @param name string. Name of file to look for. If \code{NULL},
#' \code{FALSE} is returned.
#' @param .path string. Path to file; defaults to \code{s3path()}. Pass
#' \code{""} to give the entire s3 path in \code{name}.
#' @param ... Ignored; accepted so callers can forward extra arguments.
#' @return \code{TRUE} if \code{s3cmd ls} lists the key itself (or a
#' sub-path of it), otherwise \code{FALSE}.
#' @export
s3exists <- function(name, .path = s3path(), ...) {
  if (is.null(name)) return(FALSE) # issue #22
  s3key <- paste0(.path, name)
  s3key <- gsub('/$', '', s3key) # strip terminal /
  if (!grepl('^s3://', s3key)) stop("s3 paths must begin with \"s3://\"")
  results <- system(paste('s3cmd ls', s3key), intern = TRUE)
  # Escape regex metacharacters so the key is matched literally; otherwise
  # "a.b" would also match "axb" and "model(v2)" would never match itself.
  escaped_key <- gsub("([.|()\\\\^{}+$*?\\[\\]])", "\\\\\\1", s3key, perl = TRUE)
  any(grepl(paste0(escaped_key, '(/[0-9A-Za-z]+)*/?$'), results, perl = TRUE))
}
#' s3mpi
#'
#' Store and retrieve R objects on Amazon S3 through the \code{s3cmd}
#' command-line utility (see \code{\link{s3store}} and \code{\link{s3read}}),
#' with an optional local on-disk cache (\code{\link{s3cache}}).
#'
#' @name s3mpi
#' @docType package
#' @import AWS.tools crayon cacher digest stringr
NULL
# Apply an object's optional s3mpi (de)serialization hook.
#
# If `object` carries an "s3mpi.serialize" attribute, its `read` element
# (when `read` is TRUE) or its `write` element (otherwise) is called on
# `object`. Without such a hook, the object is returned unchanged. Objects
# whose object.size() is 0 trigger a warning and yield NULL.
s3normalize <- function(object, read = TRUE) {
  if (object.size(object) == 0) {
    warning("Size-0 object is being normalized", call. = TRUE)
    return(NULL)
  }
  hooks <- attr(object, "s3mpi.serialize")
  normalizer <- if (read) hooks$read else hooks$write
  normalizer <- normalizer %||% identity
  normalizer(object)
}
# Return the S3 prefix configured via options(s3mpi.path), stopping with
# setup instructions when it has not been set.
s3path <- function() {
  path <- getOption('s3mpi.path')
  if (!is.null(path)) {
    return(path)
  }
  stop("Please set your s3 path using ",
       "options(s3mpi.path = 's3://your_bucket/your/path/'). ",
       "This is where all of your uploaded R objects will be stored.")
}
#' Read an R object in S3 by key
#'
#' Any type of object that can be serialized as an RDS file
#' is capable of being stored using this interface.
#'
#' @param name character. The key to grab from S3. If \code{NULL}, the
#' most recently modified key under \code{.path} is read.
#' @param .path character. The S3 prefix, e.g., "s3://yourbucket/some/path/".
#' A trailing "/" is appended if missing.
#' @param cache logical. If true, use the local s3cache if available. If false, do not use cache.
#' @param ... additional arguments to \code{s3mpi:::s3.get}.
#'
#' @export
#' @examples
#' \dontrun{
#' s3store(c(1,2,3), 'test123')
#' print(s3read('test123'))
#' # [1] 1 2 3
#' }
s3read <- function(name = NULL, .path = s3path(), cache = TRUE, ...) {
  if (is.null(name)) name <- grab_latest_file_in_s3_dir(.path)
  if (!endsWith(.path, "/")) .path <- paste0(.path, "/")
  s3key <- paste0(.path, name)
  # Use the same check s3cache() applies itself, so an empty
  # options(s3mpi.cache = "") disables caching instead of making s3cache() error.
  if (!isTRUE(cache) || !cache_enabled()) {
    value <- s3.get(s3key, ...)
  } else if (is.not_cached(value <- s3cache(s3key))) {
    value <- s3.get(s3key, ...)
    s3cache(s3key, value)
  }
  s3normalize(value, TRUE)
}
#' Store an R object in S3 by key
#'
#' Any type of object that can be serialized as an RDS file
#' is capable of being retrieved using this interface.
#'
#' @export
#' @param obj ANY. An R object to save to S3.
#' @param name character. The S3 key to save to. Defaults to the
#' expression passed as \code{obj} (e.g. \code{s3store(my_model)} stores
#' under \code{"my_model"}).
#' @param .path character. The S3 prefix, e.g., "s3://yourbucket/some/path/".
#' @param safe logical. If \code{TRUE}, raise an error instead of
#' overwriting a key that already exists; if \code{FALSE}, overwrite it.
#' @param ... additional arguments to \code{s3mpi:::s3.put} (also
#' forwarded to \code{s3exists} when \code{safe} is \code{TRUE}).
#' @return The S3 key the object was stored under, invisibly.
#' @examples
#' \dontrun{
#' s3store(c(1,2,3), 'test123')
#' print(s3read('test123'))
#' # [1] 1 2 3
#' }
s3store <- function(obj, name = NULL, .path = s3path(), safe = FALSE, ...) {
  if (is.null(name)) {
    # deparse() may split a long expression over several strings, which
    # would otherwise produce several keys; join them into one.
    name <- paste(deparse(substitute(obj), width.cutoff = 500L), collapse = " ")
  }
  s3key <- paste0(.path, name)
  if (isTRUE(safe) && s3exists(name, .path = .path, ...)) {
    # using cat prints to stdout as opposed to messages, so it can be seen from syberia::run_model()
    cat("An object with name", name, "on path", .path,
        "already exists. Use `safe = FALSE` to overwrite\n", sep = " ")
    stop("-------------------------^")
  }
  obj4save <- s3normalize(obj, FALSE)
  # Call s3.put directly, as s3read() calls s3.get(); inside the package
  # namespace this resolves to the md5-checking override defined here.
  s3.put(obj4save, s3key, ...)
  if (cache_enabled()) s3cache(s3key, obj4save)
  if (is.environment(obj4save)) s3normalize(obj4save) # Revert side effects
  invisible(s3key)
}
#' @export
#' @rdname s3store
#' @note \code{s3put} is equivalent to \code{s3store} except that
#' it will fail by default if you try to overwrite an existing key.
s3put <- function(..., safe = TRUE) {
  # Calling s3store(..., safe = safe) directly would make s3store's
  # deparse(substitute(obj)) see `..1`, so the default key would be "..1".
  # Instead, rebuild the call from the caller's original argument
  # expressions and evaluate it in the caller's frame. s3store is bound
  # explicitly so the caller does not need the package attached.
  call <- substitute(s3store(..., safe = safe))
  eval(call, list(s3store = s3store), parent.frame())
}
# A standard helper: if `x` is NULL, `y` is returned instead.
`%||%` <- function(x, y) {
  # `y` is only evaluated when `x` is NULL.
  if (!is.null(x)) x else y
}
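# We use the memoise package so that, once s3cmd has been found, this check
# runs only once per R session. memoise does not cache a call that raises an
# error, so while s3cmd is missing the check (and its error) is repeated on
# each call. Installing s3cmd mid-session should therefore not require
# restarting R. The flip side is that a successful check is remembered for
# the rest of the session, even if s3cmd is later removed.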
# Stop with installation instructions if the s3cmd command-line utility
# cannot be run.
ensure_s3cmd_present <- memoise::memoise(function() {
  check <- try(system("s3cmd --help", intern = TRUE), silent = TRUE)
  if (inherits(check, "try-error")) {
    # It is always preferable to make life as easy as possible for the user!
    # If they have the homebrew package manager, we give them the fastest
    # installation instructions. `which` exits with status 0 only when `brew`
    # is on the PATH, and system2() returns that exit status. Compare it to 0
    # explicitly, because a bare 0 is treated as FALSE by `&&`.
    has_brew <- is.mac() &&
      identical(as.integer(system2("which", "brew", stdout = FALSE, stderr = FALSE)), 0L)
    if (has_brew) {
      stop("Please install the ", crayon::yellow("s3cmd"), " command-line ",
           "utility by running ", crayon::green("brew install s3cmd"),
           " from your terminal and then configuring your S3 credentials ",
           "using ", crayon::yellow("s3cmd --configure"), call. = FALSE)
    } else {
      # Otherwise, manual it is!
      stop("Please install s3cmd, the S3 command line utility: ",
           "http://s3tools.org/kb/item14.htm\nand then setup your S3 ",
           "credentials using ", crayon::yellow("s3cmd --configure"),
           call. = FALSE)
    }
  }
})
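# A least-recently-used (LRU) cache, from the cacher package, that holds the
# objects fetched by s3.get(). Its capacity is getOption("s3mpi.cache_size", 10).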
# NOTE(review): in an installed package, top-level code like this is
# presumably evaluated at build/install time. A runtime
# options(s3mpi.cache_size = ...) may therefore have no effect — confirm.
s3LRUcache <- local({
  cache_capacity <- getOption("s3mpi.cache_size", 10)
  cacher::LRUcache(cache_capacity)
})
# Is the on-disk cache configured? TRUE only when options(s3mpi.cache)
# holds a non-empty directory path.
cache_enabled <- function() {
  dir <- cache_directory()
  if (is.null(dir)) {
    return(FALSE)
  }
  isTRUE(nzchar(dir))
}
# Read the cache directory from options(s3mpi.cache). Returns NULL when the
# option is unset; otherwise it must be a single non-NA character string.
cache_directory <- function() {
  dir <- getOption('s3mpi.cache')
  is_valid <- is.null(dir) ||
    (is.character(dir) && length(dir) == 1 && !is.na(dir))
  if (!is_valid) {
    stop("Please set the ", sQuote("s3mpi.cache"), " to a character ",
         "vector of length 1 giving a directory path.")
  }
  dir
}
# Is a network connection available? The first real check (opening
# http://google.com) is remembered for the rest of the session.
# Setting options(s3mpi.skip_connection_check = <anything non-NULL>) makes
# this return FALSE without touching the network.
has_internet <- local({
  has_internet_flag <- NULL
  function() {
    if (!is.null(getOption('s3mpi.skip_connection_check'))) return(FALSE)
    if (!is.null(has_internet_flag)) return(has_internet_flag)
    connection <- suppressWarnings(try(file('http://google.com', 'r'), silent = TRUE))
    if (inherits(connection, 'connection')) {
      on.exit(close(connection), add = TRUE)
    }
    # A try-error is a character string, so `$message` on it would itself
    # error; read the message from the attached condition instead.
    offline <- inherits(connection, 'try-error') &&
      grepl('cannot open', conditionMessage(attr(connection, 'condition')))
    has_internet_flag <<- !offline
    has_internet_flag
  }
})