Batch-related functions separated into a module
kjvbrt committed Jan 8, 2025
1 parent b9b8422 commit 1a2d1a8
Showing 10 changed files with 421 additions and 652 deletions.
35 changes: 31 additions & 4 deletions bin/fccanalysis
@@ -5,6 +5,7 @@ Starting point (executable) for fccanalysis command

import argparse
import logging
import sys

from parsers import setup_subparsers
from init_analysis import init_analysis
@@ -17,6 +18,7 @@ from do_plots import do_plots
from do_combine import do_combine


# _____________________________________________________________________________
class MultiLineFormatter(logging.Formatter):
'''
Multi-line formatter.
@@ -42,6 +44,24 @@ class MultiLineFormatter(logging.Formatter):
return head + ''.join(indent + line for line in trailing)


# _____________________________________________________________________________
class MaxLevelFilter(logging.Filter):
'''
Filters (lets through) all messages with level < LEVEL
Found here:
https://stackoverflow.com/questions/1383254/
'''
def __init__(self, level):
self.level = level

def filter(self, record):
return record.levelno < self.level # "<" instead of "<=": since
# logger.setLevel is inclusive,
# this should be exclusive


# _____________________________________________________________________________
def main():
'''
Starting point for fccanalysis command
@@ -78,10 +98,16 @@ def main():
logger = logging.getLogger('FCCAnalyses')
logger.setLevel(verbosity_level)
formatter = MultiLineFormatter(fmt='----> %(levelname)s: %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setLevel(verbosity_level)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
stdout_stream_handler = logging.StreamHandler(sys.stdout)
stderr_stream_handler = logging.StreamHandler(sys.stderr)
lower_than_warning_filter = MaxLevelFilter(logging.WARNING)
stdout_stream_handler.addFilter(lower_than_warning_filter)
stdout_stream_handler.setLevel(verbosity_level)
stderr_stream_handler.setLevel(logging.WARNING)
stdout_stream_handler.setFormatter(formatter)
stderr_stream_handler.setFormatter(formatter)
logger.addHandler(stdout_stream_handler)
logger.addHandler(stderr_stream_handler)

if args.command == 'init':
init_analysis(parser)
@@ -101,5 +127,6 @@ def main():
run(parser)


# _____________________________________________________________________________
if __name__ == "__main__":
main()
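
For reference, here is a minimal standalone sketch of the stream-splitting pattern the new handlers implement: records below WARNING go to stdout, WARNING and above go to stderr. The 'demo' logger name and the messages are illustrative only.

import logging
import sys


class MaxLevelFilter(logging.Filter):
    '''Lets through only records with a level below `level`.'''
    def __init__(self, level):
        super().__init__()
        self.level = level

    def filter(self, record):
        return record.levelno < self.level


logger = logging.getLogger('demo')
logger.setLevel(logging.DEBUG)

stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.addFilter(MaxLevelFilter(logging.WARNING))  # DEBUG and INFO only
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.WARNING)  # WARNING and above only
logger.addHandler(stdout_handler)
logger.addHandler(stderr_handler)

logger.info('lands on stdout')
logger.error('lands on stderr')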
6 changes: 3 additions & 3 deletions examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py
@@ -57,12 +57,12 @@ def __init__(self, cmdline_args):

# Optional: output directory on eos, if specified files will be copied
# there once the batch job is done, default is empty
self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \
f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}'
# self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \
# f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}'

# Optional: type for eos, needed when <outputDirEos> is specified. The
# default is FCC EOS, which is eospublic
self.eos_type = 'eospublic'
# self.eos_type = 'eospublic'

# Optional: test file
self.test_file = 'root://eospublic.cern.ch//eos/experiment/fcc/ee/' \
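
Note that re-enabling the EOS copy-back would require restoring both these attributes and the commented-out xrdcp block in python/batch.py below. A hypothetical user override of the attributes (path illustrative) would look like:

# Hypothetical override in the analysis class, mirroring the defaults above
self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \
    f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}'
self.eos_type = 'eospublic'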
283 changes: 283 additions & 0 deletions python/batch.py
@@ -0,0 +1,283 @@
'''
Submitting to the HTCondor batch system.
'''

import os
import sys
import time
import shutil
import json
import logging
import subprocess
import datetime
import numpy as np

import ROOT # type: ignore
from anascript import get_element, get_element_dict, get_attribute
from process import get_process_info, get_entries_sow


ROOT.gROOT.SetBatch(True)

LOGGER = logging.getLogger('FCCAnalyses.batch')


# _____________________________________________________________________________
def determine_os(local_dir: str) -> str | None:
'''
Determines platform on which FCCAnalyses was compiled
'''
cmake_config_path = local_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml'
if not os.path.isfile(cmake_config_path):
LOGGER.warning('CMake configuration file was not found!\n'
                       'Was FCCAnalyses properly built?')
return None

with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file:
cmake_config = cmake_config_file.read()
if 'centos7' in cmake_config:
return 'centos7'
if 'almalinux9' in cmake_config:
return 'almalinux9'

return None


# _____________________________________________________________________________
def create_condor_config(log_dir: str,
process_name: str,
build_os: str | None,
rdf_module,
subjob_scripts: list[str]) -> str:
'''
Creates contents of HTCondor submit description file.
'''
cfg = 'executable = $(scriptfile)\n'

cfg += f'log = {log_dir}/condor_job.{process_name}.'
cfg += '$(ClusterId).log\n'

cfg += f'output = {log_dir}/condor_job.{process_name}.'
cfg += '$(ClusterId).$(ProcId).out\n'

cfg += f'error = {log_dir}/condor_job.{process_name}.'
cfg += '$(ClusterId).$(ProcId).error\n'

cfg += 'getenv = False\n'

cfg += f'environment = "LS_SUBCWD={log_dir}"\n' # not sure

cfg += 'requirements = ( '
if build_os == 'centos7':
cfg += '(OpSysAndVer =?= "CentOS7") && '
if build_os == 'almalinux9':
cfg += '(OpSysAndVer =?= "AlmaLinux9") && '
if build_os is None:
LOGGER.warning('Submitting jobs to default operating system. There '
'may be compatibility issues.')
cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n'

cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n'

cfg += 'max_retries = 3\n'

cfg += '+JobFlavour = "%s"\n' % get_element(rdf_module, 'batch_queue')

cfg += '+AccountingGroup = "%s"\n' % get_element(rdf_module, 'comp_group')

cfg += 'RequestCpus = %i\n' % get_element(rdf_module, "n_threads")

cfg += 'should_transfer_files = yes\n'
cfg += 'when_to_transfer_output = on_exit\n'

    cfg += 'transfer_output_files = $(outfile)\n\n'

# add user batch configuration if any
user_batch_config = get_attribute(rdf_module, 'user_batch_config', None)
if user_batch_config is not None:
if not os.path.isfile(user_batch_config):
LOGGER.warning('userBatchConfig file can\'t be found! Will not '
'add it to the default config.')
else:
with open(user_batch_config, 'r', encoding='utf-8') as cfgfile:
for line in cfgfile:
cfg += line + '\n'
cfg += '\n\n'

output_dir = get_attribute(rdf_module, 'output_dir', None)
output_path = os.path.join(output_dir, process_name)
cfg += 'queue scriptfile, outfile from (\n'
for idx, scriptfile in enumerate(subjob_scripts):
cfg += f' {scriptfile}, {output_path}/chunk_{idx}.root\n'
cfg += ')\n'

return cfg
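
# For orientation, a rendered submit description for a two-chunk sample named
# 'sample' would look roughly like the following (all values illustrative,
# assuming batch_queue='longlunch' and n_threads=4 in the analysis script):
#
#   executable = $(scriptfile)
#   log = BatchOutputs/2025-01-08_12-00-00/sample/condor_job.sample.$(ClusterId).log
#   requirements = ( (OpSysAndVer =?= "AlmaLinux9") && (Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )
#   on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)
#   max_retries = 3
#   +JobFlavour = "longlunch"
#   RequestCpus = 4
#   should_transfer_files = yes
#   when_to_transfer_output = on_exit
#   transfer_output_files = $(outfile)
#   queue scriptfile, outfile from (
#       BatchOutputs/2025-01-08_12-00-00/sample/job_sample_chunk_0.sh, /out/sample/chunk_0.root
#       BatchOutputs/2025-01-08_12-00-00/sample/job_sample_chunk_1.sh, /out/sample/chunk_1.root
#   )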


# _____________________________________________________________________________
def create_subjob_script(local_dir: str,
analysis,
process_name: str,
chunk_num: int,
chunk_list: list[list[str]],
anapath: str,
cmd_args) -> str:
'''
Creates sub-job script to be run.
'''

output_dir = get_attribute(analysis, 'output_dir', None)

scr = '#!/bin/bash\n\n'
scr += 'source ' + local_dir + '/setup.sh\n\n'

# scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n'
# scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n'

output_path = os.path.join(output_dir, process_name,
f'chunk_{chunk_num}.root')

scr += 'which fccanalysis\n'
#scr += local_dir
scr += f'fccanalysis run {anapath} --batch'
scr += f' --output {output_path}'
if cmd_args.ncpus > 0:
        scr += f' --ncpus {get_element(analysis, "n_threads")}'
if len(cmd_args.unknown) > 0:
scr += ' ' + ' '.join(cmd_args.unknown)
scr += ' --files-list'
for file_path in chunk_list[chunk_num]:
scr += f' {file_path}'
scr += '\n\n'

# output_dir_eos = get_attribute(analysis, 'output_dir_eos', None)
# if not os.path.isabs(output_dir) and output_dir_eos is None:
# final_dest = os.path.join(local_dir, output_dir, process_name,
# f'chunk_{chunk_num}.root')
# scr += f'cp {output_path} {final_dest}\n'

# if output_dir_eos is not None:
# eos_type = get_attribute(analysis, 'eos_type', 'eospublic')

# final_dest = os.path.join(output_dir_eos,
# process_name,
# f'chunk_{chunk_num}.root')
# final_dest = f'root://{eos_type}.cern.ch/' + final_dest
# scr += f'xrdcp {output_path} {final_dest}\n'

scr += f'ls -alh {output_path}\n'
scr += 'pwd\n'
    scr += 'find "$PWD" -name "*.root"\n'

return scr


# _____________________________________________________________________________
def submit_job(cmd: str, max_trials: int) -> bool:
'''
Submit job to condor, retry `max_trials` times.
'''
for i in range(max_trials):
with subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True) as proc:
(stdout, stderr) = proc.communicate()

if proc.returncode == 0 and len(stderr) == 0:
LOGGER.info(stdout)
LOGGER.info('GOOD SUBMISSION')
return True

        LOGGER.warning('Error while submitting, retrying...\n  '
                       'Trial: %i / %i\n  Error: %s',
                       i + 1, max_trials, stderr)
time.sleep(10)

LOGGER.error('Failed submitting after: %i trials!', max_trials)
return False


# _____________________________________________________________________________
def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str):
'''
Send jobs to HTCondor batch system.
'''
local_dir = os.environ['LOCAL_DIR']
    current_date = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
# log_dir = os.path.join(local_dir, 'BatchOutputs', current_date,
# sample_name)
log_dir = os.path.join('BatchOutputs', current_date,
sample_name)
    os.makedirs(log_dir, exist_ok=True)

# Making sure the FCCAnalyses libraries are compiled and installed
try:
subprocess.check_output(['make', 'install'],
cwd=local_dir+'/build',
stderr=subprocess.DEVNULL
)
except subprocess.CalledProcessError:
        LOGGER.error('The FCCAnalyses libraries are not properly built and '
'installed!\nAborting job submission...')
sys.exit(3)

subjob_scripts = []
for ch_num in range(len(chunk_list)):
subjob_script_path = os.path.join(
log_dir,
f'job_{sample_name}_chunk_{ch_num}.sh')
subjob_scripts.append(subjob_script_path)

for i in range(3):
try:
with open(subjob_script_path, 'w', encoding='utf-8') as ofile:
subjob_script = create_subjob_script(local_dir,
analysis,
sample_name,
ch_num,
chunk_list,
anapath,
args)
ofile.write(subjob_script)
except IOError as err:
if i < 2:
LOGGER.warning('I/O error(%i): %s',
err.errno, err.strerror)
else:
LOGGER.error('I/O error(%i): %s', err.errno, err.strerror)
sys.exit(3)
else:
break
time.sleep(10)
subprocess.getstatusoutput(f'chmod u+x {subjob_script_path}')

LOGGER.debug('Sub-job scripts to be run:\n - %s',
'\n - '.join(subjob_scripts))

condor_config_path = f'{log_dir}/job_desc_{sample_name}.cfg'

for i in range(3):
try:
with open(condor_config_path, 'w', encoding='utf-8') as cfgfile:
condor_config = create_condor_config(log_dir,
sample_name,
determine_os(local_dir),
analysis,
subjob_scripts)
cfgfile.write(condor_config)
except IOError as err:
LOGGER.warning('I/O error(%i): %s', err.errno, err.strerror)
if i == 2:
sys.exit(3)
else:
break
time.sleep(10)
# subprocess.getstatusoutput(f'chmod u+x {condor_config_path}')

batch_cmd = f'condor_submit -spool {condor_config_path}'
LOGGER.info('Batch command:\n %s', batch_cmd)
success = submit_job(batch_cmd, 10)
if not success:
sys.exit(3)
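
The retry wrapper can also be exercised on its own. A hedged sketch, assuming python/batch.py is importable and using a stand-in path for the submit description file:

from batch import submit_job

# Retries `condor_submit` up to 3 times, sleeping 10 s between attempts;
# returns True on a clean submission (exit code 0 and empty stderr).
ok = submit_job('condor_submit -spool '
                'BatchOutputs/2025-01-08_12-00-00/sample/job_desc_sample.cfg', 3)
if not ok:
    raise SystemExit(3)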
2 changes: 1 addition & 1 deletion python/parsers.py
@@ -111,7 +111,7 @@ def setup_run_parser(parser):
parser.add_argument('--files-list', default=[], nargs='+',
help='specify input file(s) to bypass the processList')
parser.add_argument(
'--output',
'-o', '--output',
type=str,
default='output.root',
help='specify output file name to bypass the processList and or '
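
With the new alias, the two invocations below are equivalent (file names illustrative):

fccanalysis run analysis_stage1_batch.py --output out.root
fccanalysis run analysis_stage1_batch.py -o out.root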