Skip to content

Commit

Permalink
First parallel version of twdtwApply for raster time series using the…
Browse files Browse the repository at this point in the history
… package snow
  • Loading branch information
vwmaus committed Jun 2, 2017
1 parent 50f9ec4 commit 904022a
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 165 deletions.
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ Imports:
lubridate,
caret,
mgcv,
xtable
xtable,
snow
Suggests:
testthat,
knitr,
Expand Down
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ importFrom(reshape2,melt)
importFrom(scales,date_format)
importFrom(scales,percent)
importFrom(scales,pretty_breaks)
importFrom(snow,clusterExport)
importFrom(snow,recvOneData)
importFrom(snow,sendCall)
importFrom(sp,CRS)
importFrom(sp,Polygon)
importFrom(sp,Polygons)
Expand All @@ -117,6 +120,8 @@ importFrom(stats,qnorm)
importFrom(stats,sd)
importFrom(stats,window)
importFrom(stats,xtabs)
importFrom(utils,flush.console)
importFrom(utils,packageDescription)
importFrom(xtable,print.xtable)
importFrom(xtable,xtable)
useDynLib(dtwSat, .registration = TRUE)
215 changes: 61 additions & 154 deletions R/twdtwApplyParallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
#'
#' @export
setGeneric(name = "twdtwApplyParallel",
def = function(x, y, resample=TRUE, length=NULL, weight.fun=NULL,
dist.method="Euclidean", step.matrix = symmetric1, n=NULL,
span=NULL, min.length=0, theta = 0.5, ...) standardGeneric("twdtwApplyParallel"))
def = function(x, y, resample = TRUE, length = NULL, weight.fun = NULL,
dist.method = "Euclidean", step.matrix = symmetric1, n = NULL,
span = NULL, min.length = 0, theta = 0.5, ...) standardGeneric("twdtwApplyParallel"))



Expand All @@ -48,7 +48,7 @@ setGeneric(name = "twdtwApplyParallel",
#' @examples
#' \dontrun{
#' # Run TWDTW analysis for raster time series
#' patt = MOD13Q1.MT.yearly.patterns
#' temporal_patterns = MOD13Q1.MT.yearly.patterns
#' evi = brick(system.file("lucc_MT/data/evi.tif", package="dtwSat"))
#' ndvi = brick(system.file("lucc_MT/data/ndvi.tif", package="dtwSat"))
#' red = brick(system.file("lucc_MT/data/red.tif", package="dtwSat"))
Expand All @@ -63,8 +63,11 @@ setGeneric(name = "twdtwApplyParallel",
#' by="12 month")
#' log_fun = weight.fun=logisticWeight(-0.1,50)
#'
#' r_twdtw = twdtwApply(x=rts, y=patt, weight.fun=log_fun, breaks=time_interval,
#' filepath="~/test_twdtw", overwrite=TRUE, format="GTiff")
#' library(snow)
#' beginCluster()
#' r_twdtw = twdtwApplyParallel(x = rts, y = temporal_patterns,
#' weight.fun = log_fun, breaks = time_interval)
#' endCluster()
#'
#' plot(r_twdtw, type="distance")
#'
Expand All @@ -78,7 +81,7 @@ setGeneric(name = "twdtwApplyParallel",
#' @export
setMethod(f = "twdtwApplyParallel", "twdtwRaster",
def = function(x, y, resample, length, weight.fun, dist.method, step.matrix, n, span, min.length, theta,
breaks=NULL, from=NULL, to=NULL, by=NULL, overlap=0.5, chunk.size=1000, filepath=NULL, silent=FALSE, ...){
breaks=NULL, from=NULL, to=NULL, by=NULL, overlap=0.5, filepath="", silent=FALSE, ...){
if(!is(step.matrix, "stepPattern"))
stop("step.matrix is not of class stepPattern")
if(is.null(weight.fun))
Expand Down Expand Up @@ -107,106 +110,17 @@ setMethod(f = "twdtwApplyParallel", "twdtwRaster",
if(resample)
y = resampleTimeSeries(object=y, length=length)
twdtwApplyParallel.twdtwRaster(x, y, weight.fun, dist.method, step.matrix, n, span, min.length, theta,
breaks, overlap, chunk.size, filepath, silent, ...)
breaks, overlap, filepath, silent, ...)
})


twdtwApplyParallel.twdtwRaster = function(x, y, weight.fun, dist.method, step.matrix, n, span, min.length, theta,
breaks, overlap, chunk.size, filepath, silent, ...){

# Set blocks Multi-thread parameters
minblocks = round(nrow(x)*ncol(x) / chunk.size)
blocks = blockSize(x@timeseries[[1]], minblocks = minblocks)
threads = seq(1, blocks$n)

# Match raster bands to pattern bands
raster_bands = coverages(x)
pattern_names = names(y@timeseries[[1]])
matching_bands = pattern_names[pattern_names %in% raster_bands]
if(length(matching_bands)<1)
stop(paste0("Attributes (bands) of the raster and patterns do not match"))
if("doy"%in%coverages(x) & !"doy"%in%matching_bands)
matching_bands = c("doy", matching_bands)
x = subset(x, layers=matching_bands)
raster_bands = coverages(x)

# Set raster levels and labels
levels = levels(y)
names(levels) = levels

# Open raster fiels for results
if(is.null(filepath)) filepath = paste0(getwd(), "/twdtw_results")
r_template = brick(x@timeseries[[1]], nl=length(breaks)-1)
dir.create(filepath, showWarnings = FALSE, recursive = TRUE)
filename = paste0(filepath,"/twdtw_distance_from_",levels)
names(filename) = levels
# b_files = lapply(filename, function(i) writeStart(r_template, filename=i, format="GTiff", overwrite=TRUE))
b_files = lapply(filename, function(i) writeStart(r_template, filename=i, ...))

# Get time line
timeline = as.Date(index(x))

# Raster info
proj_str = projection(x)
coord = data.frame(coordinates(x))
names(coord) = c("longitude","latitude")

fun = function(i){

if(!silent) print(paste0("Procesing chunk ",i,"/",threads[length(threads)]))

# Get twdtwTimeSeries from raster
cells = cellFromRow(x@timeseries[[1]], blocks$row[i]:blocks$nrows[i])
ts = getTimeSeries(x, y = coord[cells,], proj4string = proj_str)

# Apply TWDTW analysis
twdtw_results = twdtwApply(ts, y=y, weight.fun=weight.fun, dist.method=dist.method,
step.matrix=step.matrix, n=n, span=span,
min.length=min.length, theta=theta, keep=FALSE)

# Get best mathces for each point, period, and pattern
m = length(levels)
h = length(breaks)-1
A = lapply(as.list(twdtw_results), FUN=.lowestDistances, m=m, n=h, levels=levels, breaks=breaks, overlap=overlap, fill=9999)

# Reshape list to array
A = sapply(A, matrix, nrow=h, ncol=m, simplify = 'array')

# Write raster files
lapply(seq_along(levels), function(l) writeValues(b_files[[levels[l]]], matrix(t(A[,l,]),ncol=h), blocks$row[i]))
}

# Apply TWDTW analysis
out.list = lapply(threads, FUN = fun)

# Close raster files
b_files = lapply(b_files, writeStop)

# Create brick list for output
out = lapply(sapply(b_files, filename), brick)

# Save output timeline
timeline = breaks[-1]
write(as.character(timeline), file = paste(filepath, "timeline", sep="/"))

new("twdtwRaster", timeseries = out, timeline=timeline, layers = names(out))
}




# Prallel raster processing
apply_raster_parallel <- function(x, fun, A, filepath="", ...) {

# Set blocks Multi-thread parameters
# minblocks <- round(nrow(x) * ncol(x) / chunk.size)
# blocks <- blockSize(x@timeseries[[1]], minblocks = minblocks)
# threads <- seq(1, blocks$n)
breaks, overlap, filepath, silent, ...){

# Match raster bands to pattern bands
raster_bands <- coverages(x)
pattern_names <- names(y@timeseries[[1]])
matching_bands <- pattern_names[pattern_names %in% raster_bands]
pattern_bands <- names(y@timeseries[[1]])
matching_bands <- pattern_bands[pattern_bands %in% raster_bands]

if(length(matching_bands) < 1)
stop(paste0("Bands from twdtwRaster do not match the bands from patterns"))
Expand All @@ -222,66 +136,61 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) {
names(levels) <- levels

# Create output raster objects
# if(is.null(filepath))
# filepath = paste0(getwd(), "/twdtw_results")
r_template <- brick(x@timeseries[[1]], nl = length(breaks) - 1, values = FALSE)
out <- rep(list(r_template), length(levels))
names(out) <- names(levels)
# dir.create(filepath, showWarnings = FALSE, recursive = TRUE)
# filename = paste0(filepath,"/twdtw_distance_from_",levels)
# names(filename) = levels
# b_files = lapply(filename, function(i) writeStart(r_template, filename=i, format="GTiff", overwrite=TRUE))
# b_files = lapply(filename, function(i) writeStart(r_template, filename=i, ...))

# out <- brick(x@timeseries[[1]], nl=length(breaks)-1, values=FALSE)
# names(out) <- names(x)

cl <- getCluster()
on.exit( returnCluster() )

nodes <- length(cl)

bs <- blockSize(x, minblocks = nodes*4)
bs$array_rows <- cumsum(c(1, bs$nrows*out@ncols))
bs$array_rows <- cumsum(c(1, bs$nrows*out[[1]]@ncols))
pb <- pbCreate(bs$n)

clusterExport(cl = cl, list = c("y", "weight.fun", "dist.method", "step.matrix", "n", "span",
"min.length", "theta", "breaks", "overlap"), envir = environment())

clFun <- function(k){

# Get raster data
# v <- lapply(x@timeseries, getValues, row = 1, nrows = 2)
v <- lapply(x, getValues, row = bs$row[k], nrows = bs$nrows[k])
# Get raster data
v <- lapply(x@timeseries, getValues, row = bs$row[k], nrows = bs$nrows[k])

# Create twdtwTimeSeries
ts <- twdtwTimeSeries(lapply(1:nrow(v[[1]]), function(i){
# Get time series dates
# Create twdtwTimeSeries
ts <- dtwSat::twdtwTimeSeries(lapply(1:nrow(v[[1]]), function(i){
# Get time series dates
if(any(names(v) %in% "doy")){
timeline <- getDatesFromDOY(year = format(index(x), "%Y"), doy = v[["doy"]][i,])
timeline <- dtwSat::getDatesFromDOY(year = format(dtwSat::index(x), "%Y"), doy = v[["doy"]][i,])
} else {
timeline <- index(x)
timeline <- dtwSat::index(x)
}
# Tag duplicate dates for removal
k = !duplicated(timeline)
# Create multi band time series
zoo(sapply(v[!(names(v) %in% "doy")], function(v) v[i, k, drop = FALSE]), timeline[k])
# Tag duplicate dates for removal
s = !duplicated(timeline)
# Create multi band time series
zoo::zoo(sapply(v[!(names(v) %in% "doy")], function(v) v[i, s, drop = FALSE]), timeline[s])
}))

# Apply TWDTW analysis
twdtw_results = twdtwApply(ts, y=y, weight.fun=weight.fun, dist.method=dist.method,
step.matrix=step.matrix, n=n, span=span,
min.length=min.length, theta=theta, keep=FALSE)

# Get best mathces for each point, period, and pattern
m = length(levels)
h = length(breaks)-1

A <- lapply(as.list(twdtw_results), FUN=.lowestDistances, m=m, n=h, levels=levels, breaks=breaks, overlap=overlap, fill=9999)

# Apply TWDTW analysis
twdtw_results <- dtwSat::twdtwApply(x = ts, y = y, weight.fun = weight.fun, dist.method = dist.method,
step.matrix = step.matrix, n = n, span = span,
min.length = min.length, theta = theta, keep = FALSE)

# Get best mathces for each point, period, and pattern
levels <- dtwSat::levels(y)
m <- length(levels)
h <- length(breaks)-1

A <- lapply(as.list(twdtw_results), FUN = .lowestDistances, m = m, n = h,
levels = levels, breaks = breaks, overlap = overlap, fill = 9999)
B <- as.list(data.frame(do.call("rbind", A)))
names(B) <- levels
lapply(B, function(m) matrix(as.numeric(m), nrow = length(A), ncol = nrow(A[[1]]), byrow = TRUE))
}

# get all nodes going
for (k in 1:nodes) {
sendCall(cl[[k]], clFun, list(k, A), tag = k)
sendCall(cl[[k]], clFun, list(k), tag = k)
}

filepath <- trim(filepath)
Expand All @@ -290,23 +199,22 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) {
filename <- paste0(filepath, "/", names(out), ".grd")
} else if (!canProcessInMemory(r_template, n = length(breaks))) {
filename <- sapply(names(out), rasterTmpFile)
}
}
# filename <- sapply(names(out), rasterTmpFile)

if (!is.null(filename)) {
# out <- writeStart(out, filename = filename, ... )
out <- lapply(names(out), function(i) writeStart(out[[i]], filename = filename[i], ...))
} else {
# vv <- matrix(ncol=nrow(out), nrow=ncol(out))
# vv <- matrix(out, ncol=nlayers(out))
vv <- lapply(names(out), function(i) matrix(out[[i]], ncol = nlayers(out[[i]])))
names(vv) <- names(out)
}

for (k in 1:bs$n) {
# receive results from a node
d <- recvOneData(cl)

# error?
if (! d$value$success) {
if (!d$value$success) {
stop('cluster error')
}

Expand All @@ -315,34 +223,33 @@ apply_raster_parallel <- function(x, fun, A, filepath="", ...) {
cat('received block: ',b," / ",bs$n,'\n'); flush.console();

if (!is.null(filename)) {
# out <- writeValues(out, d$value$value, bs$row[b])
out <- lapply(seq_along(levels), function(l) writeValues(out[[l]], d$value$value[[l]], bs$row[b]))
} else {
# cols <- bs$row[b]:(bs$row[b] + bs$nrows[b]-1)
# vv[,cols] <- matrix(d$value$value, nrow=out@ncols)
rows <- seq(from = bs$array_rows[b], by = 1, length.out = bs$nrows[b]*out@ncols)
vv <- lapply(seq_along(levels), function(l) vv[[l]][rows,] <- d$value$value[[l]])
rows <- seq(from = bs$array_rows[b], by = 1, length.out = bs$nrows[b]*out[[1]]@ncols)
for(l in seq_along(levels)){
vv[[l]][rows,] <- d$value$value[[l]]
}
}

# need to send more data?
ni <- nodes + k
if (ni <= bs$n) {
sendCall(cl[[d$node]], clFun, list(ni, A), tag = ni)
}
sendCall(cl[[d$node]], clFun, list(ni), tag = ni)
}
pbStep(pb)
}
if (!is.null(filename)) {
# out <- writeStop(out)
out <- lapply(out, writeStop)
} else {
# out <- setValues(out, as.vector(vv))
out <- lapply(names(out), function(i) setValues(out[[i]], values = as.vector(vv[[i]])))
out <- lapply(seq_along(levels), function(i) setValues(out[[i]], values = vv[[i]]))
names(out) <- names(vv)
}
pbClose(pb)

return(out)
new("twdtwRaster", timeseries = out, timeline = breaks[-1], layers = levels)

}

.lowestDistances2 = function(x, m, n, levels, breaks, overlap, fill){
t(.bestmatches(x, m, n, levels, breaks, overlap, fill)$AM)
}



2 changes: 1 addition & 1 deletion R/twdtwAssess.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ NULL
#'
#' # Assess classification
#' twdtw_assess = twdtwAssess(object = r_lucc, y = validation_samples,
#' proj4string = proj_str, conf.int = .95, rm.nosample=TRUE)
#' proj4string = proj_str, conf.int = .95, rm.nosample = TRUE)
#' twdtw_assess
#'
#' # Plot assessment
Expand Down
2 changes: 2 additions & 0 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#' @importFrom lubridate month month<- day day<- year year<-
#' @importFrom caret createDataPartition
#' @importFrom xtable xtable print.xtable
#' @importFrom utils packageDescription flush.console
#' @importFrom snow clusterExport sendCall recvOneData
#' @useDynLib dtwSat, .registration = TRUE
#'
NULL
Expand Down
Loading

0 comments on commit 904022a

Please sign in to comment.