Skip to content

Commit

Permalink
Include progress bar in parallel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
vwmaus committed Jun 4, 2017
1 parent 904022a commit c0d8029
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 49 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Depends:
R (>= 3.2.0),
zoo,
raster,
snow,
ggplot2
Imports:
methods,
Expand All @@ -37,8 +38,7 @@ Imports:
lubridate,
caret,
mgcv,
xtable,
snow
xtable
Suggests:
testthat,
knitr,
Expand Down
4 changes: 1 addition & 3 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import(ggplot2)
import(methods)
import(raster)
import(rgdal)
import(snow)
import(zoo)
importFrom(RColorBrewer,brewer.pal)
importFrom(caret,createDataPartition)
Expand All @@ -101,9 +102,6 @@ importFrom(reshape2,melt)
importFrom(scales,date_format)
importFrom(scales,percent)
importFrom(scales,pretty_breaks)
importFrom(snow,clusterExport)
importFrom(snow,recvOneData)
importFrom(snow,sendCall)
importFrom(sp,CRS)
importFrom(sp,Polygon)
importFrom(sp,Polygons)
Expand Down
86 changes: 55 additions & 31 deletions R/twdtwApplyParallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ setGeneric(name = "twdtwApplyParallel",
#' @aliases twdtwApplyParallel-twdtwRaster
#' @examples
#' \dontrun{
#' # Run TWDTW analysis for raster time series
#' temporal_patterns = MOD13Q1.MT.yearly.patterns
#' # Create raster time series
#' evi = brick(system.file("lucc_MT/data/evi.tif", package="dtwSat"))
#' ndvi = brick(system.file("lucc_MT/data/ndvi.tif", package="dtwSat"))
#' red = brick(system.file("lucc_MT/data/red.tif", package="dtwSat"))
Expand All @@ -59,23 +58,48 @@ setGeneric(name = "twdtwApplyParallel",
#' timeline = scan(system.file("lucc_MT/data/timeline", package="dtwSat"), what="date")
#' rts = twdtwRaster(evi, ndvi, red, blue, nir, mir, timeline = timeline, doy = doy)
#'
#' time_interval = seq(from=as.Date("2007-09-01"), to=as.Date("2013-09-01"),
#' by="12 month")
#' log_fun = weight.fun=logisticWeight(-0.1,50)
#' # Read fiels samples
#' field_samples = read.csv(system.file("lucc_MT/data/samples.csv", package="dtwSat"))
#' proj_str = scan(system.file("lucc_MT/data/samples_projection",
#' package="dtwSat"), what = "character")
#'
#' # Split samples for training (10%) and validation (90%) using stratified sampling
#' library(caret)
#' set.seed(1)
#' I = unlist(createDataPartition(field_samples$label, p = 0.1))
#' training_samples = field_samples[I,]
#' validation_samples = field_samples[-I,]
#'
#' # Get time series form raster
#' training_ts = getTimeSeries(rts, y = training_samples, proj4string = proj_str)
#' validation_ts = getTimeSeries(rts, y = validation_samples, proj4string = proj_str)
#'
#' library(snow)
#' # Create temporal patterns
#' temporal_patterns = createPatterns(training_ts, freq = 8, formula = y ~ s(x))
#'
#' # Run TWDTW analysis for raster time series
#' log_fun = weight.fun=logisticWeight(-0.1, 50)
#' time_s <- system.time(
#' r_twdtw <- twdtwApply(x = rts, y = temporal_patterns,
#' weight.fun = log_fun, overwrite = TRUE))
#' beginCluster()
#' r_twdtw = twdtwApplyParallel(x = rts, y = temporal_patterns,
#' weight.fun = log_fun, breaks = time_interval)
#' time_p <- system.time(
#' r_twdtw <- twdtwApplyParallel(x = rts, y = temporal_patterns,
#' weight.fun = log_fun, progress = 'text'))
#' endCluster()
#'
#' plot(r_twdtw, type="distance")
#'
#' # Classify raster based on the TWDTW analysis
#' r_lucc = twdtwClassify(r_twdtw, format="GTiff", overwrite=TRUE)
#'
#' plot(r_lucc)
#'
#' plot(r_lucc, type="distance")
#' # Assess classification
#' twdtw_assess = twdtwAssess(object = r_lucc, y = validation_samples,
#' proj4string = proj_str, conf.int = .95, rm.nosample = TRUE)
#' twdtw_assess
#'
#' # Plot assessment
#' plot(twdtw_assess, type="accuracy")
#' plot(twdtw_assess, type="area")
#'
#' }
#' @export
Expand Down Expand Up @@ -147,7 +171,23 @@ twdtwApplyParallel.twdtwRaster = function(x, y, weight.fun, dist.method, step.ma

bs <- blockSize(x, minblocks = nodes*4)
bs$array_rows <- cumsum(c(1, bs$nrows*out[[1]]@ncols))
pb <- pbCreate(bs$n)

filepath <- trim(filepath)
filename <- NULL
if (filepath != "") {
filename <- paste0(filepath, "/", names(out), ".grd")
} else if (!canProcessInMemory(r_template, n = length(breaks))) {
filename <- sapply(names(out), rasterTmpFile)
}

if (!is.null(filename)) {
out <- lapply(names(out), function(i) writeStart(out[[i]], filename = filename[i], ...))
} else {
vv <- lapply(names(out), function(i) matrix(out[[i]], ncol = nlayers(out[[i]])))
names(vv) <- names(out)
}

pb <- pbCreate(bs$n, ...)

clusterExport(cl = cl, list = c("y", "weight.fun", "dist.method", "step.matrix", "n", "span",
"min.length", "theta", "breaks", "overlap"), envir = environment())
Expand Down Expand Up @@ -193,22 +233,6 @@ twdtwApplyParallel.twdtwRaster = function(x, y, weight.fun, dist.method, step.ma
sendCall(cl[[k]], clFun, list(k), tag = k)
}

filepath <- trim(filepath)
filename <- NULL
if (filepath != "") {
filename <- paste0(filepath, "/", names(out), ".grd")
} else if (!canProcessInMemory(r_template, n = length(breaks))) {
filename <- sapply(names(out), rasterTmpFile)
}
# filename <- sapply(names(out), rasterTmpFile)

if (!is.null(filename)) {
out <- lapply(names(out), function(i) writeStart(out[[i]], filename = filename[i], ...))
} else {
vv <- lapply(names(out), function(i) matrix(out[[i]], ncol = nlayers(out[[i]])))
names(vv) <- names(out)
}

for (k in 1:bs$n) {
# receive results from a node
d <- recvOneData(cl)
Expand All @@ -220,7 +244,7 @@ twdtwApplyParallel.twdtwRaster = function(x, y, weight.fun, dist.method, step.ma

# which block is this?
b <- d$value$tag
cat('received block: ',b," / ",bs$n,'\n'); flush.console();
# cat('received block: ',b," / ",bs$n,'\n'); flush.console();

if (!is.null(filename)) {
out <- lapply(seq_along(levels), function(l) writeValues(out[[l]], d$value$value[[l]], bs$row[b]))
Expand All @@ -236,7 +260,7 @@ twdtwApplyParallel.twdtwRaster = function(x, y, weight.fun, dist.method, step.ma
if (ni <= bs$n) {
sendCall(cl[[d$node]], clFun, list(ni), tag = ni)
}
pbStep(pb)
pbStep(pb, k)
}
if (!is.null(filename)) {
out <- lapply(out, writeStop)
Expand Down
2 changes: 1 addition & 1 deletion R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#' @import ggplot2
#' @import methods
#' @import rgdal
#' @import snow
#' @importFrom proxy dist pr_DB
#' @importFrom reshape2 melt
#' @importFrom scales pretty_breaks date_format percent
Expand All @@ -46,7 +47,6 @@
#' @importFrom caret createDataPartition
#' @importFrom xtable xtable print.xtable
#' @importFrom utils packageDescription flush.console
#' @importFrom snow clusterExport sendCall recvOneData
#' @useDynLib dtwSat, .registration = TRUE
#'
NULL
Expand Down
48 changes: 36 additions & 12 deletions man/twdtwApplyParallel.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c0d8029

Please sign in to comment.