Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Safe) Implementation of the token-safe retry logic for gfal #12218

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
7 changes: 5 additions & 2 deletions src/python/WMCore/Storage/Backends/AWSS3Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import division

import os
import logging
from WMCore.Storage.StageOutImpl import StageOutImpl
from WMCore.Storage.Registry import registerStageOutImpl

Expand All @@ -28,11 +29,13 @@ def createSourceName(self, protocol, pfn):
"""
return "%s" % pfn

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
_createStageOutCommand_
Build an aws s3 copy command
"""
logging.info("Warning! AWSS3Impl does not support authMethod handling")

result = "#!/bin/sh\n"

copyCommand = "aws s3 cp"
Expand All @@ -57,7 +60,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
Debug a failed aws s3 copy command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand Down
7 changes: 5 additions & 2 deletions src/python/WMCore/Storage/Backends/CPImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import os
import errno
import logging

from WMCore.Storage.Registry import registerStageOutImpl
from WMCore.Storage.StageOutImpl import StageOutImpl
Expand Down Expand Up @@ -49,13 +50,15 @@ def createOutputDirectory(self, targetPFN):
raise
return

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
_createStageOutCommand_

Build the actual cp stageout command

"""
logging.info("Warning! CPImpl does not support authMethod handling")

copyCommand = ""

if self.stageIn:
Expand All @@ -82,7 +85,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No

return copyCommand

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
Debug a failed cp command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand Down
6 changes: 4 additions & 2 deletions src/python/WMCore/Storage/Backends/FNALImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import print_function

import os
import logging

from WMCore.Storage.Backends.LCGImpl import LCGImpl
from WMCore.Storage.Registry import registerStageOutImpl
Expand Down Expand Up @@ -84,12 +85,13 @@ def createSourceName(self, protocol, pfn):
return self.srmImpl.createSourceName(protocol, pfn)
return pfn

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
_createStageOutCommand_

Build a mkdir to generate the directory
"""
logging.info("Warning! FNALImpl does not support authMethod handling")

if getattr(self, 'stageIn', False):
return self.buildStageInCommand(sourcePFN, targetPFN, options)
Expand Down Expand Up @@ -137,7 +139,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
"""
return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
Debug a failed fnal-flavor copy command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand Down
113 changes: 79 additions & 34 deletions src/python/WMCore/Storage/Backends/GFAL2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from WMCore.Storage.Registry import registerStageOutImpl
from WMCore.Storage.StageOutImpl import StageOutImpl

_CheckExitCodeOption = True


class GFAL2Impl(StageOutImpl):
"""
Expand All @@ -24,10 +22,20 @@ def __init__(self, stagein=False):
# Next commands after separation are executed without env -i and this leads us with
# mixed environment with COMP and system python.
# GFAL2 is not build under COMP environment and it had failures with mixed environment.
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}')

self.setAuthX509 = "X509_USER_PROXY=$X509_USER_PROXY"
self.setAuthToken = "BEARER_TOKEN_FILE=$BEARER_TOKEN_FILE BEARER_TOKEN=$(cat $BEARER_TOKEN_FILE)"
self.unsetX509 = "unset X509_USER_PROXY;"
self.unsetToken = "unset BEARER_TOKEN;"

# These commands are parameterized according to:
# 1. authentication method (set_auth)
# 2. forced authentication method (unset_auth)
# 3. finally, debug mode or not (dry_run)
self.setups = "env -i {{set_auth}} JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}'
self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)
self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; {unset_auth} date; {dry_run} gfal-copy ' + self.copyOpts)
self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; {unset_auth} date; {dry_run} gfal-rm -t 600 {}')

def createFinalPFN(self, pfn):
"""
Expand Down Expand Up @@ -64,32 +72,51 @@ def createOutputDirectory(self, targetPFN):
"""
return

def createRemoveFileCommand(self, pfn):
def createRemoveFileCommand(self, pfn, authMethod=None, forceMethod=False, dryRun=False):
"""
_createRemoveFileCommand_
handle file remove using gfal-rm

gfal-rm options used:
-t global timeout for the execution of the command.
Command is interrupted if time expires before it finishes
"""
if os.path.isfile(pfn):
return "/bin/rm -f {}".format(os.path.abspath(pfn))
:pfn: str, destination PFN
:forceMethod: bool to isolate and force a given authentication method
:dryRun: bool, dry run mode (to enable debug mode)
"""
if authMethod == 'X509':
set_auth = self.setAuthX509
unset_auth = self.unsetToken if forceMethod else ""
elif authMethod == 'TOKEN':
set_auth = self.setAuthToken
unset_auth = self.unsetToken if forceMethod else ""
else:
return self.removeCommand.format(self.createFinalPFN(pfn))
set_auth = ''
unset_auth = ''
dryRun = 'echo ' if dryRun else ''

if os.path.isfile(pfn):
return "{} /bin/rm -f {}".format(dryRun, os.path.abspath(pfn))
else:
#return self.removeCommand.format(self.createFinalPFN(pfn), unset_auth=forceMethod, dry_run=dryRun)
return self.removeCommand.format(self.createFinalPFN(pfn), set_auth=set_auth, unset_auth=unset_auth, dry_run=dryRun)

def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=None):
def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=None,
authMethod='X509', forceMethod=False, dryRun=False):
"""
Build the gfal-cp command for stageOut
Build the gfal-copy command for stageOut

:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
:authMethod: string with the authentication method to use (either 'X509' or 'TOKEN')
:forceMethod: bool to isolate and force a given authentication method
:dryRun: bool, dry run mode (to enable debug mode)
:returns: a dictionary with specific parameters to be formatted in the commands
"""

copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''}
copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': '',
'set_auth': '', 'unset_auth': '', 'dry_run': ''}

useChecksum = (checksums is not None and 'adler32' in checksums and not self.stageIn)

Expand All @@ -111,37 +138,55 @@ def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=Non
copyCommandDict['source'] = self.createFinalPFN(sourcePFN)
copyCommandDict['destination'] = self.createFinalPFN(targetPFN)

if authMethod is None:
copyCommandDict['set_auth'] = ""
copyCommandDict['unset_auth'] = ""
elif authMethod.upper() == 'X509':
copyCommandDict['set_auth'] = self.setAuthX509
copyCommandDict['unset_auth'] = self.unsetToken if forceMethod else ""
elif authMethod.upper() == 'TOKEN':
copyCommandDict['set_auth'] = self.setAuthToken
copyCommandDict['unset_auth'] = self.unsetToken if forceMethod else ""

copyCommandDict['dry_run'] = 'echo ' if dryRun else ''

return copyCommandDict

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None,
authMethod=None, forceMethod=False):
"""
Create gfal-cp command for stageOut

:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
:authMethod: string with the authentication method to use (either 'X509' or 'TOKEN')
:forceMethod: bool to isolate and force a given authentication method
returns: a string with the full stage out script
"""

copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
#raise ValueError(self.copyCommand)
copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums,
authMethod, forceMethod)
copyCommand = self.copyCommand.format_map(copyCommandDict)
result = "#!/bin/bash\n" + copyCommand

if _CheckExitCodeOption:
result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""".format(remove_command=self.createRemoveFileCommand(targetPFN))
# add check for exit code
result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
if [[ $EXIT_STATUS != 0 ]]; then
echo "ERROR: gfal-copy exited with $EXIT_STATUS"
echo "Cleaning up failed file:"
{remove_command}
fi
exit $EXIT_STATUS
""".format(remove_command=self.createRemoveFileCommand(targetPFN, authMethod=authMethod, forceMethod=forceMethod))

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None,
authMethod=None, forceMethod=False):
"""
Debug a failed gfal-cp command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand All @@ -152,7 +197,9 @@ def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=N
:checksums: dict, collect checksums according to the algorithms saved as keys
"""

copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums,
authMethod, forceMethod, dryRun=True)

copyCommand = self.copyCommand.format_map(copyCommandDict)

result = "#!/bin/bash\n"
Expand Down Expand Up @@ -195,11 +242,9 @@ def removeFile(self, pfnToRemove):
_removeFile_
CleanUp pfn provided
"""
if os.path.isfile(pfnToRemove):
command = "/bin/rm -f {}".format(os.path.abspath(pfnToRemove))
else:
command = self.removeCommand.format(self.createFinalPFN(pfnToRemove))
command = self.createRemoveFileCommand(pfnToRemove)
self.executeCommand(command)


registerStageOutImpl("gfal2", GFAL2Impl)

7 changes: 5 additions & 2 deletions src/python/WMCore/Storage/Backends/LCGImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

"""
import os
import logging

from future.utils import viewitems

Expand Down Expand Up @@ -89,13 +90,15 @@ def createRemoveFileCommand(self, pfn):
else:
return StageOutImpl.createRemoveFileCommand(self, pfn)

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
_createStageOutCommand_

Build an srmcp command

"""
logging.info("Warning! LCGImpl does not support authMethod handling")

result = "#!/bin/sh\n"

# check if we should use the grid UI from CVMFS
Expand Down Expand Up @@ -199,7 +202,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
Debug a failed lcg-via smrv copy command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand Down
7 changes: 5 additions & 2 deletions src/python/WMCore/Storage/Backends/SRMV2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import os
import re
import logging

from WMCore.Storage.Execute import runCommandWithOutput
from WMCore.Storage.Registry import registerStageOutImpl
Expand Down Expand Up @@ -113,7 +114,7 @@ def createRemoveFileCommand(self, pfn):
else:
return StageOutImpl.createRemoveFileCommand(self, pfn)

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
_createStageOutCommand_

Expand All @@ -125,6 +126,8 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
-retry_num number of retries before before client gives up
-request_lifetime request lifetime in seconds
"""
logging.info("Warning! SRMV2Impl does not support authMethod handling")

result = "#!/bin/sh\n"
result += "REPORT_FILE=`pwd`/srm.report.$$\n"
result += "srmcp -2 -report=$REPORT_FILE -retry_num=0 -request_lifetime=2400"
Expand Down Expand Up @@ -191,7 +194,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
Debug a failed smrv2 copy command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def createOutputDirectory(self, targetPFN):
else:
print("=> dir already exists... do nothing.")

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
_createStageOutCommand_

Expand All @@ -96,7 +96,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
print(result)
return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, authMethod=None, forceMethod=False):
"""
Debug a failed copy command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand Down
Loading