Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ connection_wait_for_notify <- function(con, timeout_secs) {
.Call(`_RPostgres_connection_wait_for_notify`, con, timeout_secs)
}

connection_get_temp_schema <- function(con) {
.Call(`_RPostgres_connection_get_temp_schema`, con)
}

connection_set_temp_schema <- function(con, temp_schema) {
invisible(.Call(`_RPostgres_connection_set_temp_schema`, con, temp_schema))
}

encode_vector <- function(x) {
.Call(`_RPostgres_encode_vector`, x)
}
Expand Down
47 changes: 45 additions & 2 deletions R/tables.R
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,21 @@ find_table <- function(conn, id, inf_table = "tables", only_first = FALSE) {
" AS table_schema) t"
)
} else if (is_redshift) {
query <- "(SELECT 1 AS nr, current_schema() AS table_schema) ttt"
# A variant of the Postgres version that uses CTEs and generate_series()
# instead of generate_subscripts(), the latter is not supported on Redshift
query <- paste0(
"(WITH ",
" n_schemas AS (",
" SELECT max(regexp_count(setting, '[,]')) + 2 AS max_num ",
" FROM pg_settings WHERE name='search_path'",
" ),",
" tt AS (",
" SELECT generate_series(1, max_num) AS nr, current_schemas(true)::text[] ",
" FROM n_schemas",
" )",
" SELECT nr, current_schemas[nr] AS table_schema FROM tt WHERE current_schemas[nr] <> 'pg_catalog'",
") ttt"
)
only_first <- FALSE
} else {
# https://stackoverflow.com/a/8767450/946850
Expand Down Expand Up @@ -349,6 +363,33 @@ find_table <- function(conn, id, inf_table = "tables", only_first = FALSE) {
query
}

find_temp_schema <- function(conn, fail_if_missing = TRUE) {
if(!is.na(connection_get_temp_schema(conn@ptr)))
return(connection_get_temp_schema(conn@ptr))
if (is(conn, "RedshiftConnection")) {
temp_schema <- dbGetQuery(
conn,
paste0(
"SELECT current_schemas[1] as schema ",
"FROM (SELECT current_schemas(true)) ",
"WHERE current_schemas[1] LIKE 'pg_temp_%'"
)
)

if (nrow(temp_schema) == 1 && is.character(temp_schema[[1]])) {
connection_set_temp_schema(conn@ptr, temp_schema[[1]])
return(connection_get_temp_schema(conn@ptr))
} else {
# Temporary schema do not exist yet.
if (fail_if_missing) stopc("temporary schema does not exist")
return(NULL)
}
} else {
connection_set_temp_schema(conn@ptr, "pg_temp")
return(connection_get_temp_schema(conn@ptr))
}
}

#' @export
#' @rdname postgres-tables
#' @param temporary If `TRUE`, only temporary tables are considered.
Expand All @@ -363,7 +404,9 @@ setMethod("dbRemoveTable", c("PqConnection", "character"),
extra <- "IF EXISTS "
}
if (temporary) {
extra <- paste0(extra, "pg_temp.")
temp_schema <- find_temp_schema(conn, fail_if_missing)
if (is.null(temp_schema)) return(invisible(TRUE))
extra <- paste0(extra, temp_schema, ".")
}
dbExecute(conn, paste0("DROP TABLE ", extra, name))
invisible(TRUE)
Expand Down
11 changes: 10 additions & 1 deletion src/DbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ DbConnection::DbConnection(std::vector<std::string> keys, std::vector<std::strin
bool check_interrupts) :
pCurrentResult_(NULL),
transacting_(false),
check_interrupts_(check_interrupts)
check_interrupts_(check_interrupts),
temp_schema_(CharacterVector::create(NA_STRING))
{
size_t n = keys.size();
std::vector<const char*> c_keys(n + 1), c_values(n + 1);
Expand Down Expand Up @@ -245,6 +246,14 @@ void DbConnection::set_transacting(bool transacting) {
transacting_ = transacting;
}

CharacterVector DbConnection::get_temp_schema() const {
return temp_schema_;
}

void DbConnection::set_temp_schema(CharacterVector temp_schema) {
temp_schema_ = temp_schema;
}

void DbConnection::conn_stop(const char* msg) {
conn_stop(conn(), msg);
}
Expand Down
4 changes: 4 additions & 0 deletions src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class DbConnection : boost::noncopyable {
const DbResult* pCurrentResult_;
bool transacting_;
bool check_interrupts_;
CharacterVector temp_schema_;

public:
DbConnection(std::vector<std::string> keys, std::vector<std::string> values,
Expand Down Expand Up @@ -47,6 +48,9 @@ class DbConnection : boost::noncopyable {
bool is_transacting() const;
void set_transacting(bool transacting);

CharacterVector get_temp_schema() const;
void set_temp_schema(CharacterVector temp_schema);

void conn_stop(const char* msg);
static void conn_stop(PGconn* conn, const char* msg);

Expand Down
24 changes: 24 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// connection_get_temp_schema
CharacterVector connection_get_temp_schema(DbConnection* con);
RcppExport SEXP _RPostgres_connection_get_temp_schema(SEXP conSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP);
rcpp_result_gen = Rcpp::wrap(connection_get_temp_schema(con));
return rcpp_result_gen;
END_RCPP
}
// connection_set_temp_schema
void connection_set_temp_schema(DbConnection* con, CharacterVector temp_schema);
RcppExport SEXP _RPostgres_connection_set_temp_schema(SEXP conSEXP, SEXP temp_schemaSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP);
Rcpp::traits::input_parameter< CharacterVector >::type temp_schema(temp_schemaSEXP);
connection_set_temp_schema(con, temp_schema);
return R_NilValue;
END_RCPP
}
// encode_vector
std::string encode_vector(RObject x);
RcppExport SEXP _RPostgres_encode_vector(SEXP xSEXP) {
Expand Down Expand Up @@ -294,6 +316,8 @@ static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
Expand Down
9 changes: 9 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ List connection_wait_for_notify(DbConnection* con, int timeout_secs) {
return con->wait_for_notify(timeout_secs);
}

// Temporary Schema
// [[Rcpp::export]]
CharacterVector connection_get_temp_schema(DbConnection* con) {
return con->get_temp_schema();
}
// [[Rcpp::export]]
void connection_set_temp_schema(DbConnection* con, CharacterVector temp_schema) {
con->set_temp_schema(temp_schema);
}

// as() override

Expand Down
7 changes: 0 additions & 7 deletions tests/testthat/helper-DBItest.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ DBItest::make_context(

if (getRversion() < "3.6") "compliance",

# Redshift:
# "exists_table_temporary",
# "remove_table_temporary",
# "remove_table_temporary_arg",
# "list_objects_temporary",
# "list_fields_temporary",

NULL
)
)