grab_latest_file_in_s3_dir.r

              
            
              #' Find the latest modified file with a given S3 prefix.
#'
#' @note This helper function is used when no input key
#'    is specified to the \code{\link{s3read}} function to fetch the
#'    latest modified file in that bucket. If the bucket has
#'    many files, this can be very slow.
#' @param path character. The S3 prefix to search for the
#'    latest uploaded key.
#' @return character. The key of the most recently modified file, given
#'    relative to \code{path}. An error is raised if no file is listed.
grab_latest_file_in_s3_dir <- function(path = s3path()) {
  ensure_s3cmd_present()

  listing <- system(paste('s3cmd ls ', path, '*', sep = ''), intern = TRUE)

  # The first 16 characters of a file line hold a "YYYY-MM-DD HH:MM" stamp.
  # Lines without one parse to NA and are skipped instead of aborting the
  # whole parse. Only the relative order of the stamps matters here, so they
  # are parsed as UTC to avoid NAs from local daylight-saving gaps.
  stamps <- as.POSIXct(strptime(substring(listing, 1, 16),
                                '%Y-%m-%d %H:%M', tz = 'UTC'))

  # Locate the prefix literally, so that regex metacharacters in the path
  # (".", "+", "(", ...) cannot change what is matched.
  prefix_at <- as.integer(regexpr(path, listing, fixed = TRUE))

  usable <- !is.na(stamps) & prefix_at > 0
  if (!any(usable)) {
    stop("No files found under the S3 prefix ", sQuote(path), ".",
         call. = FALSE)
  }

  listing <- listing[usable]
  stamps <- stamps[usable]
  prefix_at <- prefix_at[usable]

  # which.max returns the first maximum, i.e. ties keep listing order.
  newest <- which.max(stamps)
  substring(listing[newest], prefix_at[newest] + nchar(path))
}

            

platform.R

              
            
              # Copied from https://github.com/rstudio/packrat/blob/master/R/platform.R
is.windows <- function() {
  # Compare the operating system name reported by Sys.info(); the result
  # keeps the "sysname" element name.
  os_name <- Sys.info()["sysname"]
  os_name == "Windows"
}

is.mac <- function() {
  # macOS reports its system name as "Darwin"; the result keeps the
  # "sysname" element name.
  os_name <- Sys.info()["sysname"]
  os_name == "Darwin"
}

is.linux <- function() {
  # Compare the operating system name reported by Sys.info(); the result
  # keeps the "sysname" element name.
  os_name <- Sys.info()["sysname"]
  os_name == "Linux"
}

            

s3.get.R

              
            
              s3.get <- function (bucket, bucket.location = "US", verbose = FALSE, debug = FALSE) {
  # Read the R object stored as an RDS file at the S3 key `bucket`, going
  # through the in-memory `s3LRUcache` so that an unchanged object is not
  # downloaded again.
  #
  # bucket:          character. The full S3 key to download.
  # bucket.location: character. Passed to s3cmd as --bucket-location.
  # verbose:         logical. If TRUE, pass --verbose --progress to s3cmd.
  # debug:           logical. If TRUE, pass --debug to s3cmd.
  #
  # Returns the deserialized R object.
  AWS.tools:::check.bucket(bucket)

  # Helper function for fetching data from s3
  fetch <- function() {
    # tempfile() only reserves a name, so there is nothing for s3cmd to
    # collide with; the file is removed again when fetch() exits.
    x.serialized <- tempfile()
    on.exit(unlink(x.serialized, force = TRUE), add = TRUE)

    s3.cmd <- paste("s3cmd get", bucket, x.serialized,
      paste("--bucket-location", bucket.location),
      if (verbose) "--verbose --progress" else "--no-progress",
      if (debug) "--debug" else "")
    status <- system(s3.cmd)

    # Without this check a failed download only shows up as an opaque
    # "cannot open file" error from readRDS.
    if (!file.exists(x.serialized)) {
      stop("Failed to download ", bucket, " from S3 (s3cmd exit status ",
           status, ").", call. = FALSE)
    }

    readRDS(x.serialized)
  }

  # Check for the bucket in the cache
  # If it does not exist, create and return its entry
  if (!s3LRUcache$exists(bucket)) {
    ans <- fetch()
    s3LRUcache$set(bucket, ans)
    return(ans)
  }

  # Check time on s3LRUcache's copy
  last_cached <- s3LRUcache$last_accessed(bucket) # assumes a POSIXct object

  # Check time on s3 remote's copy. The listing may be empty, hold several
  # lines (keys sharing this prefix) or hold lines without a date; entries
  # that do not parse become NA and are dropped, and the first remaining
  # stamp is used.
  s3.cmd <- paste("s3cmd ls ", bucket, "| awk '{print $1\" \"$2}' ")
  stamps <- system(s3.cmd, intern = TRUE)
  last_updated <- as.POSIXct(strptime(stamps, "%Y-%m-%d %H:%M", tz = "GMT"))
  last_updated <- last_updated[!is.na(last_updated)]

  # Reuse the cached copy only when it is provably not older than the remote
  # one; in every other case (including an unknown remote time) re-download.
  cache_is_fresh <- length(last_updated) > 0 &&
    isTRUE(last_updated[1] <= last_cached)

  if (cache_is_fresh) {
    ans <- s3LRUcache$get(bucket)
  } else {
    ans <- fetch()
    s3LRUcache$set(bucket, ans)
  }
  ans
}
            

s3.put.R

              
            
              # Overwrite AWS.tools::s3.put because we would like to check md5.
s3.put <- function (x, bucket, bucket.location = "US", verbose = FALSE,
    debug = FALSE, encrypt = FALSE) {
    # Serialize `x` to a temporary RDS file and upload it to the S3 key
    # `bucket` with `s3cmd put --check-md5`.
    #
    # x:               ANY. The R object to upload.
    # bucket:          character. The full S3 key to write to.
    # bucket.location: character. Passed to s3cmd as --bucket-location.
    # verbose, debug:  logical. Toggle the matching s3cmd flags.
    # encrypt:         logical. If TRUE, pass --encrypt to s3cmd.
    #
    # Returns the captured output of the s3cmd call.
    AWS.tools:::check.bucket(bucket)
    x.serialized <- tempfile()
    # Registered before anything can fail, so the temporary file is removed
    # even when saveRDS() or system() raises an error.
    on.exit(unlink(x.serialized), add = TRUE)
    saveRDS(x, x.serialized)
    s3.cmd <- paste("s3cmd put", x.serialized, bucket,
        if (encrypt) "--encrypt" else "",
        paste("--bucket-location", bucket.location),
        if (verbose) "--verbose --progress" else "--no-progress",
        if (debug) "--debug" else "",
        '--check-md5')
    system(s3.cmd, intern = TRUE)
}
            

s3cache.R

              
            
              #' A caching layer around s3mpi calls.
#'
#' Fetching large files from the S3 MPI can be expensive when performed
#' multiple times. This method allows one to add a caching layer
#' around S3 fetching. The user should specify the configuration option
#' \code{options(s3mpi.cache = 'some/dir')}. The recommended cache
#' directory (where files will be stored) is \code{"~/.s3cache"}.
#'
#' @param s3key character. The full S3 key to attempt to read or write
#'    to the cache.
#' @param value ANY. The R object to save in the cache. If missing,
#'    a cache read will be performed instead.
s3cache <- function(s3key, value) {
  # Refuse to run until a cache directory has been configured.
  if (!cache_enabled()) {
    stop("Cannot use s3mpi::s3cache until you set options(s3mpi.cache) ",
         "to a directory in which to place cache contents.")
  }

  # Make sure the cache root and its two sub-directories exist.
  cache_root <- cache_directory()
  for (needed in c(cache_root, file.path(cache_root, c('info', 'data')))) {
    dir.create(needed, showWarnings = FALSE, recursive = TRUE)
  }

  # No value supplied means a cache read; otherwise a cache write.
  if (missing(value)) {
    fetch_from_cache(s3key, cache_root)
  } else {
    save_to_cache(s3key, value, cache_root)
  }
}

#' Helper function for fetching a file from a cache directory.
#'
#' This function will also test to determine whether the file has been
#' modified on S3 since the last cache save. If the file has never been
#' cached or the cache is invalidated, it will return \code{s3mpi::not_cached}.
#'
#' @param key character. The key under which the cache entry is stored.
#' @param cache_dir character. The cache directory. The default is 
#'    \code{cache_directory()}.
#' @return the cached object if the cache has not been invalidated. Otherwise,
#'   return \code{s3mpi::not_cached}.
fetch_from_cache <- function(key, cache_dir = cache_directory()) {
  # Entries are stored under a digest of the key, once in the 'data'
  # sub-directory (the object) and once in 'info' (its meta-data).
  cache_key <- digest::digest(key)
  cache_file <- function(dir) file.path(cache_dir, dir, cache_key)

  if (!file.exists(cache_file('data'))) {
    return(not_cached)
  }

  if (!file.exists(cache_file('info'))) {
    # Somehow the cache became corrupt: data exists without accompanying
    # meta-data. In this case, simply wipe the cache.
    file.remove(cache_file('data'))
    return(not_cached)
  }

  info <- readRDS(cache_file('info'))

  # Without a connection the entry cannot be validated against S3, so it is
  # served as-is after warning the user.
  connected <- has_internet()
  if (!connected) {
    warning("Your network connection seems to be unavailable. s3mpi will ",
            "use the latest cache entries instead of pulling from S3.",
            call. = FALSE, immediate. = FALSE)
    return(readRDS(cache_file('data')))
  }

  # The entry is valid only if the modification time recorded at save time
  # still equals the one S3 reports now.
  if (identical(info$mtime, last_modified(key))) {
    readRDS(cache_file('data'))
  } else {
    not_cached
  }
}

#' Helper function for saving a file to a cache directory.
#'
#' @param key character. The key under which the cache entry is stored.
#' @param value ANY. The R object to save in the cache.
#' @param cache_dir character. The cache directory. The default is 
#'    \code{cache_directory()}.
save_to_cache <- function(key, value, cache_dir = cache_directory()) {
  # digest is called through its namespace, so the package does not need to
  # be attached to the search path here.
  cache_key <- digest::digest(key)
  cache_file <- function(dir) file.path(cache_dir, dir, cache_key)

  # Store the object itself ...
  saveRDS(value, cache_file('data'))

  # ... and, next to it, the modification time S3 reports right now; a later
  # cache read compares against this to decide whether the entry is stale.
  info <- list(mtime = last_modified(key), key = key)
  saveRDS(info, cache_file('info'))

  invisible(NULL)
}

#' Determine the last modified time of an S3 object.
#'
#' @param key character. The s3 key of the object.
#' @return the last modified time or \code{NULL} if it does not exist on S3.
last_modified <- function(key) {
  # Offline: S3 cannot be queried, so a fixed placeholder date is returned.
  if (!has_internet()) {
    return(as.POSIXct(as.Date("2000-01-01")))
  }

  # Only the first line of the listing is considered.
  listing <- system(paste0('s3cmd ls ', key), intern = TRUE)
  first_line <- listing[1]

  has_entry <- is.character(first_line) && !is.na(first_line) &&
    nzchar(first_line)
  if (!has_entry) {
    return(invisible(NULL))
  }

  # The listing line starts with a 16-character "YYYY-MM-DD HH:MM" stamp.
  # NOTE(review): this branch yields a POSIXlt while the offline branch
  # yields a POSIXct -- confirm callers only compare like with like.
  strptime(substring(first_line, 1, 16), '%Y-%m-%d %H:%M')
}

# Sentinel returned by the cache layer when no usable entry exists: an empty
# list carrying the class 'not_cached'.
not_cached <- structure(list(), class = 'not_cached')

# TRUE only for the sentinel itself.
is.not_cached <- function(x) {
  identical(x, not_cached)
}

            

s3exists.R

              
            
              #' Determine whether object exists on S3
#' 
#' Test whether or not the given object exists at the
#' given S3 path.
#'
#' @param name string. Name of file to look for
#' @param .path string. S3 prefix that is prepended to \code{name}. Defaults to
#'    \code{s3path()}; pass \code{''} if \code{name} is already a full s3 path.
#' @param ... ignored.
#' @export
s3exists <- function(name, .path = s3path(), ...) {
  if (is.null(name)) return(FALSE)  # issue #22
  s3key <- paste(.path, name, sep = '')
  s3key <- gsub('/$', '', s3key) # strip terminal /
  if (!grepl('^s3://', s3key)) stop("s3 paths must begin with \"s3://\"")

  results <- system(paste('s3cmd ls', s3key), intern = TRUE)

  # The key is about to become part of a regular expression, so every regex
  # metacharacter in it is backslash-escaped first. Otherwise a "." in the
  # key would match any character and a "+" or "(" could break or falsify
  # the match.
  escaped_key <- gsub('([.\\\\|()\\[\\]{}^$*+?])', '\\\\\\1', s3key,
                      perl = TRUE)

  # A listing line counts as a hit when it ends in the key, optionally
  # followed by further alphanumeric path segments and a trailing slash.
  any(grepl(paste(escaped_key, '(/[0-9A-Za-z]+)*/?$', sep = ''), results))
}


            

s3mpi-package.r

              
            
              #' s3mpi
#'
#' @name s3mpi
#' @docType package
#' @import AWS.tools crayon cacher digest stringr
NULL
            

s3normalize.R

              
            
              s3normalize <- function(object, read = TRUE) {
  # Apply the object's own "s3mpi.serialize" hook, if it carries one:
  # the `read` hook when read is TRUE, the `write` hook otherwise. Objects
  # without the matching hook are returned unchanged.
  if (object.size(object) == 0) {
    warning("Size-0 object is being normalized", call. = TRUE)
    return(NULL)
  }

  hooks <- attr(object, "s3mpi.serialize")
  hook <- if (read) hooks$read else hooks$write
  if (is.null(hook)) {
    hook <- identity
  }
  hook(object)
}
            

s3path.r

              
            
              s3path <- function() {
  # Return the S3 prefix configured in options(s3mpi.path), or fail with
  # setup instructions when the option has not been set.
  configured <- getOption('s3mpi.path')
  if (!is.null(configured)) {
    return(configured)
  }
  stop("Please set your s3 path using ",
       "options(s3mpi.path = 's3://your_bucket/your/path/'). ",
       "This is where all of your uploaded R objects will be stored.")
}
            

s3read.r

              
            
              #' Read an R object in S3 by key
#' 
#' Any type of object that can be serialized as an RDS file
#' is capable of being stored using this interface.
#'
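#' @param name character. The key to grab from S3. If \code{NULL}, the most
#'    recently modified file under \code{.path} is read.
#' @param .path character. The S3 prefix under which the key is stored.
#' @param cache logical. If true, use the local s3cache if available.  If false, do not use cache.
#' @param ... additional arguments passed to \code{s3.get}.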
#'
#' @export
#' @examples
#' \dontrun{
#' s3store(c(1,2,3), 'test123')
#' print(s3read('test123'))
#' # [1] 1 2 3
#' } 
s3read <- function(name = NULL, .path = s3path(), cache = TRUE, ...) {
  # Make sure the prefix ends in "/" before it is used anywhere, including
  # the latest-file lookup below: that lookup returns the key relative to the
  # prefix it was given, so a prefix without the slash would hand back a key
  # starting with "/" and the final key would contain "//". An empty prefix
  # is left alone so that `name` may hold a complete s3:// key.
  if (nzchar(.path) && substr(.path, nchar(.path), nchar(.path)) != "/") {
    .path <- paste0(.path, "/")
  }

  if (is.null(name)) name <- grab_latest_file_in_s3_dir(.path)
  s3key <- paste0(.path, name)

  # The same predicate that s3cache() enforces decides whether the cache is
  # usable, so an empty options(s3mpi.cache) bypasses the cache instead of
  # raising an error.
  if (!isTRUE(cache) || !cache_enabled()) {
    value <- s3.get(s3key, ...)
  } else if (is.not_cached(value <- s3cache(s3key))) {
    # Cache miss or stale entry: download, then refresh the cache.
    value <- s3.get(s3key, ...)
    s3cache(s3key, value)
  }

  s3normalize(value, TRUE)
}
            

s3store.r

              
            
              #' Store an R object in S3 by key
#'
#' Any type of object that can be serialized as an RDS file
#' is capable of being retrieved using this interface.
#'
#' @export
#' @param obj ANY. An R object to save to S3.
#' @param name character. The S3 key to save to.
#' @param .path character. The S3 prefix, e.g., "s3://yourbucket/some/path/".
#' @param safe logical. If \code{TRUE}, raise an error instead of overwriting
#'    when the key already exists. The default, \code{FALSE}, overwrites
#'    existing files.
#' @param ... additional arguments to \code{s3mpi:::s3.put}.
#' @examples
#' \dontrun{
#' s3store(c(1,2,3), 'test123')
#' print(s3read('test123'))
#' # [1] 1 2 3
#' }
s3store <- function(obj, name = NULL, .path = s3path(), safe = FALSE, ...) {
  # Default the key to the expression passed as `obj`. deparse() splits long
  # expressions over several strings, so they are joined back into a single
  # key.
  if (is.null(name)) name <- paste(deparse(substitute(obj)), collapse = '')
  s3key <- paste(.path, name, sep = '')

  if (isTRUE(safe) && s3exists(name, .path = .path, ...)) {
    # using cat prints to stdout as opposed to messages, so it can be seen from syberia::run_model()
    cat("An object with name", name, "on path", .path,
        "already exists. Use `safe = FALSE` to overwrite\n", sep = " ")
    stop("-------------------------^")
  }

  obj4save <- s3normalize(obj, FALSE)
  s3mpi:::s3.put(obj4save, s3key, ...)

  # Same predicate that s3cache() enforces: with an empty options(s3mpi.cache)
  # the cache is skipped instead of raising an error after the upload has
  # already happened.
  if (cache_enabled()) s3cache(s3key, obj4save)

  if (is.environment(obj4save)) s3normalize(obj4save) # Revert side effects
  invisible(s3key)
}

#' @export
#' @rdname s3store
#' @note \code{s3put} is equivalent to \code{s3store} except that
#'    it will fail by default if you try to overwrite an existing key.
s3put <- function(..., safe = TRUE) {
  # Thin wrapper: forwards everything to s3store, with `safe` defaulting to
  # TRUE here.
  s3store(..., safe = safe)
}
            

utils.R

              
            

A standard helper: if `x` is `NULL`, `y` is returned instead; otherwise `x` is returned.

              `%||%` <- function(x, y) {
  # Null-default operator: the left-hand side unless it is NULL, in which
  # case the right-hand side (which is only evaluated when needed).
  if (!is.null(x)) x else y
}

            

We use the memoise package to ensure this check only gets run once in a given R session. This means a user will have to restart R if they install s3cmd during a session, but we are comfortable with that!

              ensure_s3cmd_present <- memoise::memoise(function() {
  # Stop with installation instructions when the s3cmd command-line utility
  # cannot be run. Returns NULL invisibly when s3cmd is available.
  check <- try(system("s3cmd --help", intern = TRUE), silent = TRUE)

  # Depending on the platform, a command that cannot be run either raises an
  # R error (caught by try) or comes back with a "status" attribute of 127,
  # so both are treated as "s3cmd is missing".
  s3cmd_missing <- inherits(check, "try-error") ||
    isTRUE(attr(check, "status") == 127)

  if (s3cmd_missing) {
    # It is always preferable to make life as easy as possible for the user!
    # If they have the homebrew package manager, we give them the fastest
    # installation instructions. Sys.which() returns "" when brew is not on
    # the PATH.
    if (is.mac() && nzchar(Sys.which("brew"))) {
      stop("Please install the ", crayon::yellow("s3cmd"), " command-line ",
           "utility using by running ", crayon::green("brew install s3cmd"),
           " from your terminal and then configuring your S3 credentials ",
           "using ", crayon::yellow("s3cmd --configure"), call. = FALSE)
    } else {
      # Otherwise, manual it is!
      stop("Please install s3cmd, the S3 command line utility: ",
           "http://s3tools.org/kb/item14.htm\nand then setup your S3 ",
           "credentials using ", crayon::yellow("s3cmd --configure"),
           call. = FALSE)
    }
  }
})

            
              # Session-wide cache object used by s3.get to keep downloaded objects in
# memory. It is built with the value of options(s3mpi.cache_size), or 10 if
# that option is unset -- presumably the maximum number of entries kept;
# confirm against cacher::LRUcache.
              s3LRUcache <- cacher::LRUcache(getOption("s3mpi.cache_size", 10))

cache_enabled <- function() {
  # The cache is enabled only when a non-empty cache directory is configured.
  configured <- cache_directory()
  !(is.null(configured) || !nzchar(configured))
}

cache_directory <- function() {
  # Return options(s3mpi.cache): NULL when unset, otherwise the configured
  # value after checking that it is a single non-NA string.
  configured <- getOption('s3mpi.cache')
  if (is.null(configured)) {
    return(NULL)
  }

  is_single_string <- is.character(configured) && length(configured) == 1 &&
    !is.na(configured)
  if (!is_single_string) {
    stop("Please set the ", sQuote("s3mpi.cache"), " to a character ",
         "vector of length 1 giving a directory path.")
  }
  configured
}

# Report whether an internet connection appears to be available.
#
# The probe runs at most once per R session; its outcome is remembered in
# `has_internet_flag`. Setting options(s3mpi.skip_connection_check) to any
# non-NULL value skips the probe and makes the function return FALSE.
has_internet <- local({
  has_internet_flag <- NULL
  function() {
    if (!is.null(getOption('s3mpi.skip_connection_check'))) return(FALSE)
    if (!is.null(has_internet_flag)) { return(has_internet_flag) }

    # Try to open a connection to a well-known host. The attempt is silent:
    # a failure is reported through the return value, not printed.
    internet_check <- suppressWarnings(
      try(file('http://google.com', 'r'), silent = TRUE)
    )
    if (inherits(internet_check, 'connection')) {
      on.exit(close(internet_check), add = TRUE)
    }

    # A "try-error" is a character string that carries the original condition
    # as an attribute; `$message` cannot be used on it, so the message is
    # read through conditionMessage(). Only a "cannot open" failure counts as
    # being offline; any other outcome is treated as connected.
    # NOTE(review): the match is on the English message text -- confirm
    # behaviour under translated locales.
    offline <- inherits(internet_check, 'try-error') &&
      grepl('cannot open',
            conditionMessage(attr(internet_check, 'condition')))

    has_internet_flag <<- !offline
    has_internet_flag
  }
})