openeo: automate workflow and job start
respecting the maximum number of allowed concurrent jobs (2 for free-tier usage)

to do:
figure out what advantages a paid subscription could have
mrustl committed May 28, 2024
1 parent 7c85af1 commit f0cfff6
Showing 9 changed files with 178 additions and 23 deletions.
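For context, here is a condensed, hedged sketch of the workflow this commit automates, assembled from the test script and the exported functions shown below. 'lakes_bb_selected' is a hypothetical sf object with one lake per row; the calls illustrate the intended usage rather than a canonical recipe.

openeo_con <- openeo::connect(host = "https://openeo.dataspace.copernicus.eu")
openeo::login(openeo_con)

# Create one batch job per lake without starting it (start_job = FALSE is the default)
results <- lapply(seq_len(nrow(lakes_bb_selected)), function(i) {
  kwb.satellite::openeo_get_data(lakes = lakes_bb_selected[i, ])
})
job_ids <- vapply(results, function(res) res$job$id, character(1))

# Start the jobs while never exceeding two concurrent jobs (free-tier limit)
kwb.satellite::openeo_start_max_jobs(job_ids = job_ids, max_jobs = 2)
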
6 changes: 6 additions & 0 deletions NAMESPACE
@@ -1,6 +1,7 @@
# Generated by roxygen2: do not edit by hand

export("%>%")
export(check_job_status)
export(copernicus_cds)
export(copernicus_cds_parallel)
export(create_ad4gd_env)
@@ -10,9 +11,11 @@ export(gee_get_data_for_years)
export(gee_get_data_for_years_parallel)
export(gee_get_metadata)
export(get_metadata_era5)
export(get_number_of_active_jobs)
export(import_rds)
export(openeo_get_bands_meta)
export(openeo_get_data)
export(openeo_start_max_jobs)
export(sf_to_geojson)
import(foreach)
importFrom(doParallel,registerDoParallel)
@@ -22,6 +25,7 @@ importFrom(dplyr,bind_cols)
importFrom(dplyr,bind_rows)
importFrom(dplyr,case_when)
importFrom(dplyr,mutate)
importFrom(dplyr,pull)
importFrom(dplyr,select)
importFrom(ecmwfr,wf_request)
importFrom(fs,path_join)
@@ -36,8 +40,10 @@ importFrom(lubridate,ymd_hms)
importFrom(magrittr,"%>%")
importFrom(openeo,create_job)
importFrom(openeo,describe_collection)
importFrom(openeo,describe_job)
importFrom(openeo,list_collections)
importFrom(openeo,list_file_formats)
importFrom(openeo,list_jobs)
importFrom(openeo,processes)
importFrom(openeo,start_job)
importFrom(parallel,clusterEvalQ)
16 changes: 7 additions & 9 deletions R/.import_script.R
@@ -1,14 +1,12 @@
data_dirs <- fs::dir_ls("vignettes/gee/current")
remotes::install_github("kwb-r/kwb.satellite@dev")

data <- stats::setNames(lapply(data_dirs[3], function(dir) {
sat_dir <- "//medusa/projekte$/SUW_Department/Projects/AD4GD/Exchange/01_data/01_input/satellite_data/google-earth-engine"

sat_dirs <- fs::dir_ls(sat_dir)

sat_dat <- stats::setNames(lapply(data_dirs, function(dir) {
kwb.satellite::import_rds(rds_dir = dir, flatten = TRUE) #%>%
#dplyr::bind_rows()
}),
nm = basename(data_dirs[3]))

length(tmp_list)

tmp <- kwb.satellite::flatten_results(tmp_list)

nm = basename(data_dirs))

kwb.satellite::flatten_results(tmp_list$`Bückwitzer See_point-on-surface_mean_scale-10m_2017-2023`)
10 changes: 10 additions & 0 deletions R/.openeo_test.R
@@ -0,0 +1,10 @@
openeo_con <- openeo::connect(host = "https://openeo.dataspace.copernicus.eu")
openeo::login(openeo_con)

test <- lapply(6:10, function(i) {
kwb.satellite::openeo_get_data(lakes = lakes_bb_selected[i,])
})

job_ids <- sapply(1:5, function(i) test[[i]]$job$id)

kwb.satellite::openeo_start_max_jobs(job_ids = job_ids)
24 changes: 12 additions & 12 deletions R/openeo_get-data.R
@@ -52,7 +52,7 @@ openeo_get_data <- function(lakes,
point_on_surface = FALSE,
spatial_fun = "mean",
bands = NULL,
output_format = "JSON",
output_format = "CSV",
col_lakename = "SEE_NAME",
col_lakeid = "SEE_KZ",
start_job = FALSE
@@ -87,7 +87,7 @@ shape_type <- if(all(geos == "point")) {
"unclear"
}

if(is.null(spatial_fun) & output_format != "netCDF") {
if(is.null(spatial_fun) && output_format != "netCDF") {
message("Setting 'output_format' to 'netCDF, because '%s' is not supported by OpenEO",
output_format)
output_format <- "netCDF"
@@ -102,16 +102,6 @@ lake_id <- if(!is.null(lakes[[col_lakeid]])) {
""
}

batch_name <- sprintf("%s_%s%s_%s_%s-%s_%s",
lake_name,
lake_id,
shape_type,
spatial_fun,
date_start,
date_end,
output_format) %>% paste0(collapse = "_")


lakes <- if(point_on_surface) {
lakes %>%
sf::st_transform(crs = 25833) %>%
@@ -145,6 +135,16 @@ cube <- p$aggregate_spatial(data = cube,
# save result as JSON
res <- p$save_result(data = cube, format = output_format)

batch_name <- sprintf("%s_%s%s_%s_%s_%s-%s_%s",
lake_name,
lake_id,
shape_type,
collection_id,
if(is.null(spatial_fun)) {"raw"} else {spatial_fun},
date_start,
date_end,
output_format) %>% paste0(collapse = "_")

# send job to back-end
job <- openeo::create_job(graph = res, title = batch_name)

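For illustration, the relocated batch-name construction (now including the collection id and a "raw" fallback when spatial_fun is NULL) produces names like the one below. All input values are hypothetical and only demonstrate the pattern.

sprintf("%s_%s%s_%s_%s_%s-%s_%s",
        "Bückwitzer See",   # lake_name
        "",                 # lake_id ("" when col_lakeid is missing)
        "point-on-surface", # shape_type
        "SENTINEL2_L2A",    # collection_id (hypothetical)
        "mean",             # spatial_fun ("raw" if NULL)
        "2017-01-01",       # date_start
        "2023-12-31",       # date_end
        "CSV")              # output_format (new default)
#> [1] "Bückwitzer See_point-on-surface_SENTINEL2_L2A_mean_2017-01-01-2023-12-31_CSV"
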
86 changes: 86 additions & 0 deletions R/openeo_start-jobs.R
@@ -0,0 +1,86 @@
#' Helper function OpenEO: check job status
#'
#' @param job_id job id
#' @return status string of job
#' @export
#' @importFrom openeo describe_job
#'
check_job_status <- function(job_id) {
job_info <- openeo::describe_job(job = job_id)
return(job_info$status)
}

#' Helper function OpenEO: get number of active jobs
#'
#' @return number of active jobs
#' @export
#'
#' @importFrom openeo list_jobs
#' @importFrom tibble as_tibble
#' @importFrom dplyr pull
get_number_of_active_jobs <- function() {
sum(openeo::list_jobs() %>%
tibble::as_tibble() %>%
dplyr::pull(status) %in% c("running", "queued"))
}


#' OpenEO: start maximum number of jobs
#'
#' @param job_ids character vector of job ids
#' @param max_jobs maximum number of concurrent jobs (default: 2)
#' @param check_interval check interval in seconds for polling whether jobs have finished
#' @param debug print debug messages (default: TRUE)
#'
#' @return no return value; called for its side effect of starting all jobs while respecting the max_jobs limit
#' @export
#' @importFrom kwb.utils catAndRun
openeo_start_max_jobs <- function(job_ids, max_jobs = 2, check_interval = 30, debug = TRUE) {
idx <- 0

while(idx < length(job_ids)) {
active_jobs <- get_number_of_active_jobs()

# Start as many new jobs as possible
while(active_jobs < max_jobs && idx < length(job_ids)) {
idx <- idx + 1
job_status <- check_job_status(job_id = job_ids[idx])

if(job_status == "created") {
kwb.utils::catAndRun(messageText = sprintf("Start job '%s' (%d/%d), active jobs: %d",
job_ids[idx],
idx,
length(job_ids),
active_jobs),
expr = {
openeo::start_job(job = job_ids[idx])
Sys.sleep(1)
active_jobs <- get_number_of_active_jobs()

Sys.sleep(1) # Short pause to avoid overloading the server
},
dbg = debug)
} else {
message(sprintf("Skipping job '%s'. It is already '%s'",
job_ids[idx],
job_status))
}
}

if(active_jobs == max_jobs) {
message(sprintf("We have to wait. There are already %d jobs 'running/queued' (%s)",
active_jobs,
Sys.time()))
}

# Wait between checks
Sys.sleep(check_interval)

# Re-check the number of active jobs
active_jobs <- get_number_of_active_jobs()
}
}
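
As a follow-up sketch that is not part of the commit: once openeo_start_max_jobs() has submitted everything, one could keep polling until no job is 'running' or 'queued' any more and then fetch the finished results. The helper name below is made up, and openeo::download_results() from the openeo R client is assumed to be available.

wait_and_download <- function(job_ids, out_dir = "openeo_results", poll = 60) {
  dir.create(out_dir, showWarnings = FALSE, recursive = TRUE)

  # Poll until no job is active any more
  repeat {
    statuses <- vapply(job_ids, kwb.satellite::check_job_status, character(1))
    if (!any(statuses %in% c("running", "queued"))) break
    Sys.sleep(poll)
  }

  # Download the results of all successfully finished jobs
  for (id in job_ids[statuses == "finished"]) {
    openeo::download_results(job = id, folder = file.path(out_dir, id))
  }

  invisible(statuses)
}
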
17 changes: 17 additions & 0 deletions man/check_job_status.Rd

14 changes: 14 additions & 0 deletions man/get_number_of_active_jobs.Rd

5 changes: 3 additions & 2 deletions man/openeo_get_data.Rd

23 changes: 23 additions & 0 deletions man/openeo_start_max_jobs.Rd
