From 904022a558ad215575e3273928d9966dde8f4945 Mon Sep 17 00:00:00 2001 From: vwmaus Date: Fri, 2 Jun 2017 16:04:19 +0200 Subject: [PATCH] First parallel version of twdtwApply for raster time series using the package snow --- DESCRIPTION | 3 +- NAMESPACE | 5 + R/twdtwApplyParallel.R | 215 +++++++++++--------------------------- R/twdtwAssess.R | 2 +- R/zzz.R | 2 + man/twdtwApplyParallel.Rd | 16 +-- man/twdtwAssess.Rd | 2 +- 7 files changed, 80 insertions(+), 165 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index dc8c867..1a7dba7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -37,7 +37,8 @@ Imports: lubridate, caret, mgcv, - xtable + xtable, + snow Suggests: testthat, knitr, diff --git a/NAMESPACE b/NAMESPACE index 2fcbd71..e07aa89 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -101,6 +101,9 @@ importFrom(reshape2,melt) importFrom(scales,date_format) importFrom(scales,percent) importFrom(scales,pretty_breaks) +importFrom(snow,clusterExport) +importFrom(snow,recvOneData) +importFrom(snow,sendCall) importFrom(sp,CRS) importFrom(sp,Polygon) importFrom(sp,Polygons) @@ -117,6 +120,8 @@ importFrom(stats,qnorm) importFrom(stats,sd) importFrom(stats,window) importFrom(stats,xtabs) +importFrom(utils,flush.console) +importFrom(utils,packageDescription) importFrom(xtable,print.xtable) importFrom(xtable,xtable) useDynLib(dtwSat, .registration = TRUE) diff --git a/R/twdtwApplyParallel.R b/R/twdtwApplyParallel.R index 5993f47..d332699 100644 --- a/R/twdtwApplyParallel.R +++ b/R/twdtwApplyParallel.R @@ -37,9 +37,9 @@ #' #' @export setGeneric(name = "twdtwApplyParallel", - def = function(x, y, resample=TRUE, length=NULL, weight.fun=NULL, - dist.method="Euclidean", step.matrix = symmetric1, n=NULL, - span=NULL, min.length=0, theta = 0.5, ...) standardGeneric("twdtwApplyParallel")) + def = function(x, y, resample = TRUE, length = NULL, weight.fun = NULL, + dist.method = "Euclidean", step.matrix = symmetric1, n = NULL, + span = NULL, min.length = 0, theta = 0.5, ...) standardGeneric("twdtwApplyParallel")) @@ -48,7 +48,7 @@ setGeneric(name = "twdtwApplyParallel", #' @examples #' \dontrun{ #' # Run TWDTW analysis for raster time series -#' patt = MOD13Q1.MT.yearly.patterns +#' temporal_patterns = MOD13Q1.MT.yearly.patterns #' evi = brick(system.file("lucc_MT/data/evi.tif", package="dtwSat")) #' ndvi = brick(system.file("lucc_MT/data/ndvi.tif", package="dtwSat")) #' red = brick(system.file("lucc_MT/data/red.tif", package="dtwSat")) @@ -63,8 +63,11 @@ setGeneric(name = "twdtwApplyParallel", #' by="12 month") #' log_fun = weight.fun=logisticWeight(-0.1,50) #' -#' r_twdtw = twdtwApply(x=rts, y=patt, weight.fun=log_fun, breaks=time_interval, -#' filepath="~/test_twdtw", overwrite=TRUE, format="GTiff") +#' library(snow) +#' beginCluster() +#' r_twdtw = twdtwApplyParallel(x = rts, y = temporal_patterns, +#' weight.fun = log_fun, breaks = time_interval) +#' endCluster() #' #' plot(r_twdtw, type="distance") #' @@ -78,7 +81,7 @@ setGeneric(name = "twdtwApplyParallel", #' @export setMethod(f = "twdtwApplyParallel", "twdtwRaster", def = function(x, y, resample, length, weight.fun, dist.method, step.matrix, n, span, min.length, theta, - breaks=NULL, from=NULL, to=NULL, by=NULL, overlap=0.5, chunk.size=1000, filepath=NULL, silent=FALSE, ...){ + breaks=NULL, from=NULL, to=NULL, by=NULL, overlap=0.5, filepath="", silent=FALSE, ...){ if(!is(step.matrix, "stepPattern")) stop("step.matrix is not of class stepPattern") if(is.null(weight.fun)) @@ -107,106 +110,17 @@ setMethod(f = "twdtwApplyParallel", "twdtwRaster", if(resample) y = resampleTimeSeries(object=y, length=length) twdtwApplyParallel.twdtwRaster(x, y, weight.fun, dist.method, step.matrix, n, span, min.length, theta, - breaks, overlap, chunk.size, filepath, silent, ...) + breaks, overlap, filepath, silent, ...) }) twdtwApplyParallel.twdtwRaster = function(x, y, weight.fun, dist.method, step.matrix, n, span, min.length, theta, - breaks, overlap, chunk.size, filepath, silent, ...){ - - # Set blocks Multi-thread parameters - minblocks = round(nrow(x)*ncol(x) / chunk.size) - blocks = blockSize(x@timeseries[[1]], minblocks = minblocks) - threads = seq(1, blocks$n) - - # Match raster bands to pattern bands - raster_bands = coverages(x) - pattern_names = names(y@timeseries[[1]]) - matching_bands = pattern_names[pattern_names %in% raster_bands] - if(length(matching_bands)<1) - stop(paste0("Attributes (bands) of the raster and patterns do not match")) - if("doy"%in%coverages(x) & !"doy"%in%matching_bands) - matching_bands = c("doy", matching_bands) - x = subset(x, layers=matching_bands) - raster_bands = coverages(x) - - # Set raster levels and labels - levels = levels(y) - names(levels) = levels - - # Open raster fiels for results - if(is.null(filepath)) filepath = paste0(getwd(), "/twdtw_results") - r_template = brick(x@timeseries[[1]], nl=length(breaks)-1) - dir.create(filepath, showWarnings = FALSE, recursive = TRUE) - filename = paste0(filepath,"/twdtw_distance_from_",levels) - names(filename) = levels - # b_files = lapply(filename, function(i) writeStart(r_template, filename=i, format="GTiff", overwrite=TRUE)) - b_files = lapply(filename, function(i) writeStart(r_template, filename=i, ...)) - - # Get time line - timeline = as.Date(index(x)) - - # Raster info - proj_str = projection(x) - coord = data.frame(coordinates(x)) - names(coord) = c("longitude","latitude") - - fun = function(i){ - - if(!silent) print(paste0("Procesing chunk ",i,"/",threads[length(threads)])) - - # Get twdtwTimeSeries from raster - cells = cellFromRow(x@timeseries[[1]], blocks$row[i]:blocks$nrows[i]) - ts = getTimeSeries(x, y = coord[cells,], proj4string = proj_str) - - # Apply TWDTW analysis - twdtw_results = twdtwApply(ts, y=y, weight.fun=weight.fun, dist.method=dist.method, - step.matrix=step.matrix, n=n, span=span, - min.length=min.length, theta=theta, keep=FALSE) - - # Get best mathces for each point, period, and pattern - m = length(levels) - h = length(breaks)-1 - A = lapply(as.list(twdtw_results), FUN=.lowestDistances, m=m, n=h, levels=levels, breaks=breaks, overlap=overlap, fill=9999) - - # Reshape list to array - A = sapply(A, matrix, nrow=h, ncol=m, simplify = 'array') - - # Write raster files - lapply(seq_along(levels), function(l) writeValues(b_files[[levels[l]]], matrix(t(A[,l,]),ncol=h), blocks$row[i])) - } - - # Apply TWDTW analysis - out.list = lapply(threads, FUN = fun) - - # Close raster files - b_files = lapply(b_files, writeStop) - - # Create brick list for output - out = lapply(sapply(b_files, filename), brick) - - # Save output timeline - timeline = breaks[-1] - write(as.character(timeline), file = paste(filepath, "timeline", sep="/")) - - new("twdtwRaster", timeseries = out, timeline=timeline, layers = names(out)) -} - - - - -# Prallel raster processing -apply_raster_parallel <- function(x, fun, A, filepath="", ...) { - - # Set blocks Multi-thread parameters - # minblocks <- round(nrow(x) * ncol(x) / chunk.size) - # blocks <- blockSize(x@timeseries[[1]], minblocks = minblocks) - # threads <- seq(1, blocks$n) + breaks, overlap, filepath, silent, ...){ # Match raster bands to pattern bands raster_bands <- coverages(x) - pattern_names <- names(y@timeseries[[1]]) - matching_bands <- pattern_names[pattern_names %in% raster_bands] + pattern_bands <- names(y@timeseries[[1]]) + matching_bands <- pattern_bands[pattern_bands %in% raster_bands] if(length(matching_bands) < 1) stop(paste0("Bands from twdtwRaster do not match the bands from patterns")) @@ -222,19 +136,9 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) { names(levels) <- levels # Create output raster objects - # if(is.null(filepath)) - # filepath = paste0(getwd(), "/twdtw_results") r_template <- brick(x@timeseries[[1]], nl = length(breaks) - 1, values = FALSE) out <- rep(list(r_template), length(levels)) names(out) <- names(levels) - # dir.create(filepath, showWarnings = FALSE, recursive = TRUE) - # filename = paste0(filepath,"/twdtw_distance_from_",levels) - # names(filename) = levels - # b_files = lapply(filename, function(i) writeStart(r_template, filename=i, format="GTiff", overwrite=TRUE)) - # b_files = lapply(filename, function(i) writeStart(r_template, filename=i, ...)) - - # out <- brick(x@timeseries[[1]], nl=length(breaks)-1, values=FALSE) - # names(out) <- names(x) cl <- getCluster() on.exit( returnCluster() ) @@ -242,46 +146,51 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) { nodes <- length(cl) bs <- blockSize(x, minblocks = nodes*4) - bs$array_rows <- cumsum(c(1, bs$nrows*out@ncols)) + bs$array_rows <- cumsum(c(1, bs$nrows*out[[1]]@ncols)) pb <- pbCreate(bs$n) + + clusterExport(cl = cl, list = c("y", "weight.fun", "dist.method", "step.matrix", "n", "span", + "min.length", "theta", "breaks", "overlap"), envir = environment()) clFun <- function(k){ - # Get raster data - # v <- lapply(x@timeseries, getValues, row = 1, nrows = 2) - v <- lapply(x, getValues, row = bs$row[k], nrows = bs$nrows[k]) + # Get raster data + v <- lapply(x@timeseries, getValues, row = bs$row[k], nrows = bs$nrows[k]) - # Create twdtwTimeSeries - ts <- twdtwTimeSeries(lapply(1:nrow(v[[1]]), function(i){ - # Get time series dates + # Create twdtwTimeSeries + ts <- dtwSat::twdtwTimeSeries(lapply(1:nrow(v[[1]]), function(i){ + # Get time series dates if(any(names(v) %in% "doy")){ - timeline <- getDatesFromDOY(year = format(index(x), "%Y"), doy = v[["doy"]][i,]) + timeline <- dtwSat::getDatesFromDOY(year = format(dtwSat::index(x), "%Y"), doy = v[["doy"]][i,]) } else { - timeline <- index(x) + timeline <- dtwSat::index(x) } - # Tag duplicate dates for removal - k = !duplicated(timeline) - # Create multi band time series - zoo(sapply(v[!(names(v) %in% "doy")], function(v) v[i, k, drop = FALSE]), timeline[k]) + # Tag duplicate dates for removal + s = !duplicated(timeline) + # Create multi band time series + zoo::zoo(sapply(v[!(names(v) %in% "doy")], function(v) v[i, s, drop = FALSE]), timeline[s]) })) - - # Apply TWDTW analysis - twdtw_results = twdtwApply(ts, y=y, weight.fun=weight.fun, dist.method=dist.method, - step.matrix=step.matrix, n=n, span=span, - min.length=min.length, theta=theta, keep=FALSE) - - # Get best mathces for each point, period, and pattern - m = length(levels) - h = length(breaks)-1 - - A <- lapply(as.list(twdtw_results), FUN=.lowestDistances, m=m, n=h, levels=levels, breaks=breaks, overlap=overlap, fill=9999) + + # Apply TWDTW analysis + twdtw_results <- dtwSat::twdtwApply(x = ts, y = y, weight.fun = weight.fun, dist.method = dist.method, + step.matrix = step.matrix, n = n, span = span, + min.length = min.length, theta = theta, keep = FALSE) + + # Get best mathces for each point, period, and pattern + levels <- dtwSat::levels(y) + m <- length(levels) + h <- length(breaks)-1 + + A <- lapply(as.list(twdtw_results), FUN = .lowestDistances, m = m, n = h, + levels = levels, breaks = breaks, overlap = overlap, fill = 9999) B <- as.list(data.frame(do.call("rbind", A))) + names(B) <- levels lapply(B, function(m) matrix(as.numeric(m), nrow = length(A), ncol = nrow(A[[1]]), byrow = TRUE)) } # get all nodes going for (k in 1:nodes) { - sendCall(cl[[k]], clFun, list(k, A), tag = k) + sendCall(cl[[k]], clFun, list(k), tag = k) } filepath <- trim(filepath) @@ -290,15 +199,14 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) { filename <- paste0(filepath, "/", names(out), ".grd") } else if (!canProcessInMemory(r_template, n = length(breaks))) { filename <- sapply(names(out), rasterTmpFile) - } + } + # filename <- sapply(names(out), rasterTmpFile) if (!is.null(filename)) { - # out <- writeStart(out, filename = filename, ... ) out <- lapply(names(out), function(i) writeStart(out[[i]], filename = filename[i], ...)) } else { - # vv <- matrix(ncol=nrow(out), nrow=ncol(out)) - # vv <- matrix(out, ncol=nlayers(out)) vv <- lapply(names(out), function(i) matrix(out[[i]], ncol = nlayers(out[[i]]))) + names(vv) <- names(out) } for (k in 1:bs$n) { @@ -306,7 +214,7 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) { d <- recvOneData(cl) # error? - if (! d$value$success) { + if (!d$value$success) { stop('cluster error') } @@ -315,34 +223,33 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) { cat('received block: ',b," / ",bs$n,'\n'); flush.console(); if (!is.null(filename)) { - # out <- writeValues(out, d$value$value, bs$row[b]) out <- lapply(seq_along(levels), function(l) writeValues(out[[l]], d$value$value[[l]], bs$row[b])) } else { - # cols <- bs$row[b]:(bs$row[b] + bs$nrows[b]-1) - # vv[,cols] <- matrix(d$value$value, nrow=out@ncols) - rows <- seq(from = bs$array_rows[b], by = 1, length.out = bs$nrows[b]*out@ncols) - vv <- lapply(seq_along(levels), function(l) vv[[l]][rows,] <- d$value$value[[l]]) + rows <- seq(from = bs$array_rows[b], by = 1, length.out = bs$nrows[b]*out[[1]]@ncols) + for(l in seq_along(levels)){ + vv[[l]][rows,] <- d$value$value[[l]] + } } # need to send more data? ni <- nodes + k if (ni <= bs$n) { - sendCall(cl[[d$node]], clFun, list(ni, A), tag = ni) - } + sendCall(cl[[d$node]], clFun, list(ni), tag = ni) + } pbStep(pb) } if (!is.null(filename)) { - # out <- writeStop(out) out <- lapply(out, writeStop) } else { - # out <- setValues(out, as.vector(vv)) - out <- lapply(names(out), function(i) setValues(out[[i]], values = as.vector(vv[[i]]))) + out <- lapply(seq_along(levels), function(i) setValues(out[[i]], values = vv[[i]])) + names(out) <- names(vv) } pbClose(pb) - return(out) + new("twdtwRaster", timeseries = out, timeline = breaks[-1], layers = levels) + } -.lowestDistances2 = function(x, m, n, levels, breaks, overlap, fill){ - t(.bestmatches(x, m, n, levels, breaks, overlap, fill)$AM) -} + + + diff --git a/R/twdtwAssess.R b/R/twdtwAssess.R index d675234..6783e35 100644 --- a/R/twdtwAssess.R +++ b/R/twdtwAssess.R @@ -112,7 +112,7 @@ NULL #' #' # Assess classification #' twdtw_assess = twdtwAssess(object = r_lucc, y = validation_samples, -#' proj4string = proj_str, conf.int = .95, rm.nosample=TRUE) +#' proj4string = proj_str, conf.int = .95, rm.nosample = TRUE) #' twdtw_assess #' #' # Plot assessment diff --git a/R/zzz.R b/R/zzz.R index 6236346..3b4ab99 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -45,6 +45,8 @@ #' @importFrom lubridate month month<- day day<- year year<- #' @importFrom caret createDataPartition #' @importFrom xtable xtable print.xtable +#' @importFrom utils packageDescription flush.console +#' @importFrom snow clusterExport sendCall recvOneData #' @useDynLib dtwSat, .registration = TRUE #' NULL diff --git a/man/twdtwApplyParallel.Rd b/man/twdtwApplyParallel.Rd index 740d720..189c2e6 100644 --- a/man/twdtwApplyParallel.Rd +++ b/man/twdtwApplyParallel.Rd @@ -13,8 +13,8 @@ twdtwApplyParallel(x, y, resample = TRUE, length = NULL, \S4method{twdtwApplyParallel}{twdtwRaster}(x, y, resample, length, weight.fun, dist.method, step.matrix, n, span, min.length, theta, breaks = NULL, - from = NULL, to = NULL, by = NULL, overlap = 0.5, chunk.size = 1000, - filepath = NULL, silent = FALSE, ...) + from = NULL, to = NULL, by = NULL, overlap = 0.5, filepath = "", + silent = FALSE, ...) } \arguments{ \item{x}{an object of class twdtw*. This is the target time series. @@ -74,9 +74,6 @@ the time has the same weight as the curve shape in the TWDTW analysis.} between one match and the interval of classification. Default is 0.5, \emph{i.e.} an overlap minimum of 50\%.} -\item{chunk.size}{An integer. Set the number of cells for each block, -see \code{\link[raster]{blockSize}} for details.} - \item{filepath}{A character. The path to save the raster with results. If not informed the function saves in the current work directory.} @@ -102,7 +99,7 @@ See [1] for details about the method. \examples{ \dontrun{ # Run TWDTW analysis for raster time series -patt = MOD13Q1.MT.yearly.patterns +temporal_patterns = MOD13Q1.MT.yearly.patterns evi = brick(system.file("lucc_MT/data/evi.tif", package="dtwSat")) ndvi = brick(system.file("lucc_MT/data/ndvi.tif", package="dtwSat")) red = brick(system.file("lucc_MT/data/red.tif", package="dtwSat")) @@ -117,8 +114,11 @@ time_interval = seq(from=as.Date("2007-09-01"), to=as.Date("2013-09-01"), by="12 month") log_fun = weight.fun=logisticWeight(-0.1,50) -r_twdtw = twdtwApply(x=rts, y=patt, weight.fun=log_fun, breaks=time_interval, - filepath="~/test_twdtw", overwrite=TRUE, format="GTiff") +library(snow) +beginCluster() +r_twdtw = twdtwApplyParallel(x = rts, y = temporal_patterns, + weight.fun = log_fun, breaks = time_interval) +endCluster() plot(r_twdtw, type="distance") diff --git a/man/twdtwAssess.Rd b/man/twdtwAssess.Rd index 4cad96d..a9f7ce8 100644 --- a/man/twdtwAssess.Rd +++ b/man/twdtwAssess.Rd @@ -116,7 +116,7 @@ plot(r_lucc) # Assess classification twdtw_assess = twdtwAssess(object = r_lucc, y = validation_samples, - proj4string = proj_str, conf.int = .95, rm.nosample=TRUE) + proj4string = proj_str, conf.int = .95, rm.nosample = TRUE) twdtw_assess # Plot assessment