Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.executor

import nextflow.fusion.FusionHelper

import java.nio.file.Path

import groovy.transform.CompileStatic
Expand Down Expand Up @@ -128,6 +130,8 @@ abstract class Executor {
}

boolean isForeignFile(Path path) {
if ( isFusionEnabled() && path.scheme in FusionHelper.SUPPORTED_CLOUD_SCHEMES )
return false
path.scheme != getStageDir().scheme
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package nextflow.fusion

import java.nio.file.Path

/**
* Allow importing platform specific env variables in the Fusion context
*
Expand All @@ -25,4 +27,6 @@ package nextflow.fusion
interface FusionEnv {

Map<String,String> getEnvironment(String scheme, FusionConfig config)

Map<String,String> getEnvironmentFromPath(Path path, FusionConfig config)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.exception.ReportWarningException
import nextflow.plugin.Plugins

import java.nio.file.Path

/**
* Provider strategy for {@link FusionEnv}
*
Expand Down Expand Up @@ -49,13 +52,27 @@ class FusionEnvProvider {
result.FUSION_CACHE_SIZE = "${config.cacheSize().toMega()}M"
return result
}
Map<String,String> getEnvironmentFromPath(Path p) {
final config = FusionConfig.getConfig()
final list =Plugins.getExtensions(FusionEnv)
log.debug "Fusion environment extensions=$list"
final result = new HashMap<String,String>()
for( FusionEnv it : list ) {
final env = it.getEnvironmentFromPath(p,config)
log.debug "Env for $p: $env"
if( env )
result.putAll(env)
}
return result
}

protected Map<String,String> getFusionEnvironment(String scheme, FusionConfig config) {
final list = Plugins.getExtensions(FusionEnv)
log.debug "Fusion environment extensions=$list"
final result = new HashMap<String,String>()
for( FusionEnv it : list ) {
final env = it.getEnvironment(scheme,config)
log.debug "Env for $scheme: $env"
if( env )
result.putAll(env)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import nextflow.io.BucketParser
*/
@CompileStatic
class FusionHelper {
public static List<String> SUPPORTED_CLOUD_SCHEMES = ['s3', 'az']

@Memoized
static boolean isFusionEnabled(Session session) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class FusionScriptLauncher extends BashWrapperBuilder {
private String scheme
private Path remoteWorkDir
private Map<String,String> env
private List<Path> externalInputs

static FusionScriptLauncher create(TaskBean bean, String scheme) {

Expand All @@ -50,9 +51,16 @@ class FusionScriptLauncher extends BashWrapperBuilder {
bean.workDir = toContainerMount(bean.workDir, scheme)
bean.targetDir = toContainerMount(bean.targetDir, scheme)

final externalInputs = new LinkedList<Path>()
// remap input files to container mounted paths
for( Map.Entry<String,Path> entry : new HashMap<>(bean.inputFiles).entrySet() ) {
bean.inputFiles.put( entry.key, toContainerMount(entry.value, scheme) )
def inputScheme = scheme
if (entry.value.scheme && entry.value.scheme != scheme){
inputScheme = entry.value.scheme
externalInputs.add(entry.value)
}

bean.inputFiles.put( entry.key, toContainerMount(entry.value, inputScheme) )
}

// make it change to the task work dir
Expand All @@ -61,14 +69,15 @@ class FusionScriptLauncher extends BashWrapperBuilder {
if( bean.scratch==null )
bean.scratch = false

return new FusionScriptLauncher(bean, scheme, remoteWorkDir)
return new FusionScriptLauncher(bean, scheme, remoteWorkDir, externalInputs)
}

FusionScriptLauncher(TaskBean bean, String scheme, Path remoteWorkDir) {
FusionScriptLauncher(TaskBean bean, String scheme, Path remoteWorkDir, List<Path> external = List.of()) {
super(bean)
// keep track the google storage work dir
this.scheme = scheme
this.remoteWorkDir = remoteWorkDir
this.externalInputs = external
}

static protected String headerScript(TaskBean bean) {
Expand All @@ -87,6 +96,9 @@ class FusionScriptLauncher extends BashWrapperBuilder {
// foreign env
final provider = new FusionEnvProvider()
result.putAll(provider.getEnvironment(scheme))
for (Path p: this.externalInputs){
result.putAll(provider.getEnvironmentFromPath(p))
}
env = result
}
return env
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import nextflow.cloud.aws.config.AwsConfig
import nextflow.fusion.FusionConfig
import nextflow.fusion.FusionEnv
import org.pf4j.Extension

import java.nio.file.Path

/**
* Implements {@link FusionEnv} for AWS cloud
*
Expand Down Expand Up @@ -71,4 +74,8 @@ class AwsFusionEnv implements FusionEnv {
else
return List.<String>of()
}

Map<String, String> getEnvironmentFromPath(Path path, FusionConfig config) {
return getEnvironment(path.scheme,config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import nextflow.fusion.FusionConfig
import nextflow.fusion.FusionEnv
import org.pf4j.Extension

import java.nio.file.Path

/**
* Implement environment provider for Azure specific variables
*
Expand All @@ -47,7 +49,14 @@ class AzFusionEnv implements FusionEnv {
*/
@Override
Map<String, String> getEnvironment(String scheme, FusionConfig config) {
if (scheme != 'az') {
if (scheme != 'az') {
return Collections.<String, String> emptyMap()
}
return getEnvironmentFromPath(Global.session.workDir, config)
}

Map<String, String> getEnvironmentFromPath(Path workDir, FusionConfig config) {
if (workDir.scheme != 'az') {
return Collections.<String, String> emptyMap()
}

Expand Down Expand Up @@ -76,7 +85,7 @@ class AzFusionEnv implements FusionEnv {
}

// If no managed identity, use the standard environment with SAS token
result.AZURE_STORAGE_SAS_TOKEN = getOrCreateSasToken()
result.AZURE_STORAGE_SAS_TOKEN = getOrCreateSasToken(workDir)

return result
}
Expand All @@ -85,7 +94,7 @@ class AzFusionEnv implements FusionEnv {
* Return the SAS token if it is defined in the configuration, otherwise generate one based on the requested
* authentication method.
*/
synchronized String getOrCreateSasToken() {
synchronized String getOrCreateSasToken(Path workDir = Global.session.workDir) {
final cfg = AzConfig.config

// Check for incompatible configuration
Expand All @@ -101,10 +110,10 @@ class AzFusionEnv implements FusionEnv {
// For Active Directory and Managed Identity, we cannot generate an *account* SAS token, but we can generate
// a *container* SAS token for the work directory.
if (cfg.activeDirectory().isConfigured() || cfg.managedIdentity().isConfigured()) {
return AzHelper.generateContainerSasWithActiveDirectory(Global.session.workDir, cfg.storage().tokenDuration)
return AzHelper.generateContainerSasWithActiveDirectory(workDir, cfg.storage().tokenDuration)
}

// Shared Key authentication can use an account SAS token
return AzHelper.generateAccountSasWithAccountKey(Global.session.workDir, cfg.storage().tokenDuration)
return AzHelper.generateAccountSasWithAccountKey(workDir, cfg.storage().tokenDuration)
}
}
Loading