From 1a2d1a8f0dd346436534f0885005028ec640bf00 Mon Sep 17 00:00:00 2001
From: Juraj Smiesko
Date: Wed, 8 Jan 2025 15:09:29 +0100
Subject: [PATCH] Batch related functions separated to a module

---
 bin/fccanalysis                               |  35 +-
 .../mH-recoil/mumu/analysis_stage1_batch.py   |   6 +-
 python/batch.py                               | 283 ++++++++++++++
 python/parsers.py                             |   2 +-
 python/process.py                             |  51 ++-
 python/run_analysis.py                        | 324 +---------------
 python/run_fccanalysis.py                     | 347 +-----------------
 python/run_final_analysis.py                  |   2 +-
 python/{frame.py => utils.py}                 |  23 +-
 python/{frame.pyi => utils.pyi}               |   0
 10 files changed, 421 insertions(+), 652 deletions(-)
 create mode 100644 python/batch.py
 rename python/{frame.py => utils.py} (75%)
 rename python/{frame.pyi => utils.pyi} (100%)

diff --git a/bin/fccanalysis b/bin/fccanalysis
index 731a17fdcd9..0fb9af34177 100755
--- a/bin/fccanalysis
+++ b/bin/fccanalysis
@@ -5,6 +5,7 @@ Starting point (executable) for fccanalysis command

 import argparse
 import logging
+import sys

 from parsers import setup_subparsers
 from init_analysis import init_analysis
@@ -17,6 +18,7 @@ from do_plots import do_plots
 from do_combine import do_combine


+# _____________________________________________________________________________
 class MultiLineFormatter(logging.Formatter):
     '''
     Multi-line formatter.
@@ -42,6 +44,24 @@ class MultiLineFormatter(logging.Formatter):
         return head + ''.join(indent + line for line in trailing)


+# _____________________________________________________________________________
+class MaxLevelFilter(logging.Filter):
+    '''
+    Filters (lets through) all messages with level < LEVEL
+
+    Found here:
+    https://stackoverflow.com/questions/1383254/
+    '''
+    def __init__(self, level):
+        self.level = level
+
+    def filter(self, record):
+        return record.levelno < self.level  # "<" instead of "<=": since
+                                            # logger.setLevel is inclusive,
+                                            # this should be exclusive
+
+
+# _____________________________________________________________________________
 def main():
     '''
     Starting point for fccanalysis command
@@ -78,10 +98,16 @@ def main():
     logger = logging.getLogger('FCCAnalyses')
     logger.setLevel(verbosity_level)
     formatter = MultiLineFormatter(fmt='----> %(levelname)s: %(message)s')
-    stream_handler = logging.StreamHandler()
-    stream_handler.setLevel(verbosity_level)
-    stream_handler.setFormatter(formatter)
-    logger.addHandler(stream_handler)
+    stdout_stream_handler = logging.StreamHandler(sys.stdout)
+    stderr_stream_handler = logging.StreamHandler(sys.stderr)
+    lower_than_warning_filter = MaxLevelFilter(logging.WARNING)
+    stdout_stream_handler.addFilter(lower_than_warning_filter)
+    stdout_stream_handler.setLevel(verbosity_level)
+    stderr_stream_handler.setLevel(logging.WARNING)
+    stdout_stream_handler.setFormatter(formatter)
+    stderr_stream_handler.setFormatter(formatter)
+    logger.addHandler(stdout_stream_handler)
+    logger.addHandler(stderr_stream_handler)

     if args.command == 'init':
         init_analysis(parser)
@@ -101,5 +127,6 @@ def main():
         run(parser)


+# _____________________________________________________________________________
 if __name__ == "__main__":
     main()

diff --git a/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py b/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py
index 25a29c0ea87..6f4a870c0a6 100644
--- a/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py
+++ b/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py
@@ -57,12 +57,12 @@ def __init__(self, cmdline_args):

         # Optional: output directory on eos, if specified files will be copied
         # there once the batch job is done, default is empty
-        self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \
-                              f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}'
+        # self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \
+        #     f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}'

         # Optional: type for eos, needed when is specified. The
         # default is FCC EOS, which is eospublic
-        self.eos_type = 'eospublic'
+        # self.eos_type = 'eospublic'

         # Optional: test file
         self.test_file = 'root://eospublic.cern.ch//eos/experiment/fcc/ee/' \
diff --git a/python/batch.py b/python/batch.py
new file mode 100644
index 00000000000..5a38f19eda2
--- /dev/null
+++ b/python/batch.py
@@ -0,0 +1,283 @@
+'''
+Submitting to the HTCondor batch system.
+'''
+
+import os
+import sys
+import time
+import shutil
+import json
+import logging
+import subprocess
+import datetime
+import numpy as np
+
+import ROOT  # type: ignore
+from anascript import get_element, get_element_dict, get_attribute
+from process import get_process_info, get_entries_sow
+
+
+ROOT.gROOT.SetBatch(True)
+
+LOGGER = logging.getLogger('FCCAnalyses.batch')
+
+
+# _____________________________________________________________________________
+def determine_os(local_dir: str) -> str | None:
+    '''
+    Determines platform on which FCCAnalyses was compiled
+    '''
+    cmake_config_path = local_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml'
+    if not os.path.isfile(cmake_config_path):
+        LOGGER.warning('CMake configuration file was not found!\n'
+                       'Was FCCAnalyses properly built?')
+        return None
+
+    with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file:
+        cmake_config = cmake_config_file.read()
+        if 'centos7' in cmake_config:
+            return 'centos7'
+        if 'almalinux9' in cmake_config:
+            return 'almalinux9'
+
+    return None
+
+
+# _____________________________________________________________________________
+def create_condor_config(log_dir: str,
+                         process_name: str,
+                         build_os: str | None,
+                         rdf_module,
+                         subjob_scripts: list[str]) -> str:
+    '''
+    Creates contents of HTCondor submit description file.
+    '''
+    cfg = 'executable = $(scriptfile)\n'
+
+    cfg += f'log = {log_dir}/condor_job.{process_name}.'
+    cfg += '$(ClusterId).log\n'
+
+    cfg += f'output = {log_dir}/condor_job.{process_name}.'
+    cfg += '$(ClusterId).$(ProcId).out\n'
+
+    cfg += f'error = {log_dir}/condor_job.{process_name}.'
+    cfg += '$(ClusterId).$(ProcId).error\n'
+
+    cfg += 'getenv = False\n'
+
+    cfg += f'environment = "LS_SUBCWD={log_dir}"\n'  # not sure
+
+    cfg += 'requirements = ( '
+    if build_os == 'centos7':
+        cfg += '(OpSysAndVer =?= "CentOS7") && '
+    if build_os == 'almalinux9':
+        cfg += '(OpSysAndVer =?= "AlmaLinux9") && '
+    if build_os is None:
+        LOGGER.warning('Submitting jobs to default operating system. There '
+                       'may be compatibility issues.')
+    cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n'
+
+    cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n'
+
+    cfg += 'max_retries = 3\n'
+
+    cfg += '+JobFlavour = "%s"\n' % get_element(rdf_module, 'batch_queue')
+
+    cfg += '+AccountingGroup = "%s"\n' % get_element(rdf_module, 'comp_group')
+
+    cfg += 'RequestCpus = %i\n' % get_element(rdf_module, "n_threads")
+
+    cfg += 'should_transfer_files = yes\n'
+    cfg += 'when_to_transfer_output = on_exit\n'
+
+    cfg += f'transfer_output_files = $(outfile)\n\n'
+
+    # add user batch configuration if any
+    user_batch_config = get_attribute(rdf_module, 'user_batch_config', None)
+    if user_batch_config is not None:
+        if not os.path.isfile(user_batch_config):
+            LOGGER.warning('userBatchConfig file can\'t be found! Will not '
+                           'add it to the default config.')
+        else:
+            with open(user_batch_config, 'r', encoding='utf-8') as cfgfile:
+                for line in cfgfile:
+                    cfg += line + '\n'
+            cfg += '\n\n'
+
+    output_dir = get_attribute(rdf_module, 'output_dir', None)
+    output_path = os.path.join(output_dir, process_name)
+    cfg += 'queue scriptfile, outfile from (\n'
+    for idx, scriptfile in enumerate(subjob_scripts):
+        cfg += f' {scriptfile}, {output_path}/chunk_{idx}.root\n'
+    cfg += ')\n'
+
+    return cfg
+
+
+# _____________________________________________________________________________
+def create_subjob_script(local_dir: str,
+                         analysis,
+                         process_name: str,
+                         chunk_num: int,
+                         chunk_list: list[list[str]],
+                         anapath: str,
+                         cmd_args) -> str:
+    '''
+    Creates sub-job script to be run.
+    '''
+
+    output_dir = get_attribute(analysis, 'output_dir', None)
+
+    scr = '#!/bin/bash\n\n'
+    scr += 'source ' + local_dir + '/setup.sh\n\n'
+
+    # scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n'
+    # scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n'
+
+    output_path = os.path.join(output_dir, process_name,
+                               f'chunk_{chunk_num}.root')
+
+    scr += 'which fccanalysis\n'
+    #scr += local_dir
+    scr += f'fccanalysis run {anapath} --batch'
+    scr += f' --output {output_path}'
+    if cmd_args.ncpus > 0:
+        scr += f' --ncpus {cmd_args.ncpus}'
+    if len(cmd_args.unknown) > 0:
+        scr += ' ' + ' '.join(cmd_args.unknown)
+    scr += ' --files-list'
+    for file_path in chunk_list[chunk_num]:
+        scr += f' {file_path}'
+    scr += '\n\n'
+
+    # output_dir_eos = get_attribute(analysis, 'output_dir_eos', None)
+    # if not os.path.isabs(output_dir) and output_dir_eos is None:
+    #     final_dest = os.path.join(local_dir, output_dir, process_name,
+    #                               f'chunk_{chunk_num}.root')
+    #     scr += f'cp {output_path} {final_dest}\n'

+    # if output_dir_eos is not None:
+    #     eos_type = get_attribute(analysis, 'eos_type', 'eospublic')

+    #     final_dest = os.path.join(output_dir_eos,
+    #                               process_name,
+    #                               f'chunk_{chunk_num}.root')
+    #     final_dest = f'root://{eos_type}.cern.ch/' + final_dest
+    #     scr += f'xrdcp {output_path} {final_dest}\n'
+
+    scr += f'ls -alh {output_path}\n'
+    scr += 'pwd\n'
+    scr += f'find "$PWD" -name *.root\n'
+
+    return scr
+
+
+# _____________________________________________________________________________
+def submit_job(cmd: str, max_trials: int) -> bool:
+    '''
+    Submit job to condor, retry `max_trials` times.
+    '''
+    for i in range(max_trials):
+        with subprocess.Popen(cmd, shell=True,
+                              stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                              universal_newlines=True) as proc:
+            (stdout, stderr) = proc.communicate()
+
+            if proc.returncode == 0 and len(stderr) == 0:
+                LOGGER.info(stdout)
+                LOGGER.info('GOOD SUBMISSION')
+                return True
+
+            LOGGER.warning('Error while submitting, retrying...\n '
+                           'Trial: %i / %i\n Error: %s',
+                           i, max_trials, stderr)
+            time.sleep(10)
+
+    LOGGER.error('Failed submitting after: %i trials!', max_trials)
+    return False
+
+
+# _____________________________________________________________________________
+def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str):
+    '''
+    Send jobs to HTCondor batch system.
+    '''
+    local_dir = os.environ['LOCAL_DIR']
+    current_date = datetime.datetime.fromtimestamp(
+        datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S')
+    # log_dir = os.path.join(local_dir, 'BatchOutputs', current_date,
+    #                        sample_name)
+    log_dir = os.path.join('BatchOutputs', current_date,
+                           sample_name)
+    if not os.path.exists(log_dir):
+        os.system(f'mkdir -p {log_dir}')
+
+    # Making sure the FCCAnalyses libraries are compiled and installed
+    try:
+        subprocess.check_output(['make', 'install'],
+                                cwd=local_dir+'/build',
+                                stderr=subprocess.DEVNULL
+                                )
+    except subprocess.CalledProcessError:
+        LOGGER.error('The FCCAnalyses libraries are not properly built and '
+                     'installed!\nAborting job submission...')
+        sys.exit(3)
+
+    subjob_scripts = []
+    for ch_num in range(len(chunk_list)):
+        subjob_script_path = os.path.join(
+            log_dir,
+            f'job_{sample_name}_chunk_{ch_num}.sh')
+        subjob_scripts.append(subjob_script_path)
+
+        for i in range(3):
+            try:
+                with open(subjob_script_path, 'w', encoding='utf-8') as ofile:
+                    subjob_script = create_subjob_script(local_dir,
+                                                         analysis,
+                                                         sample_name,
+                                                         ch_num,
+                                                         chunk_list,
+                                                         anapath,
+                                                         args)
+                    ofile.write(subjob_script)
+            except IOError as err:
+                if i < 2:
+                    LOGGER.warning('I/O error(%i): %s',
+                                   err.errno, err.strerror)
+                else:
+                    LOGGER.error('I/O error(%i): %s', err.errno, err.strerror)
+                    sys.exit(3)
+            else:
+                break
+            time.sleep(10)
+        subprocess.getstatusoutput(f'chmod u+x {subjob_script_path}')
+
+    LOGGER.debug('Sub-job scripts to be run:\n - %s',
+                 '\n - '.join(subjob_scripts))
+
+    condor_config_path = f'{log_dir}/job_desc_{sample_name}.cfg'
+
+    for i in range(3):
+        try:
+            with open(condor_config_path, 'w', encoding='utf-8') as cfgfile:
+                condor_config = create_condor_config(log_dir,
+                                                     sample_name,
+                                                     determine_os(local_dir),
+                                                     analysis,
+                                                     subjob_scripts)
+                cfgfile.write(condor_config)
+        except IOError as err:
+            LOGGER.warning('I/O error(%i): %s', err.errno, err.strerror)
+            if i == 2:
+                sys.exit(3)
+        else:
+            break
+        time.sleep(10)
+    # subprocess.getstatusoutput(f'chmod u+x {condor_config_path}')
+
+    batch_cmd = f'condor_submit -spool {condor_config_path}'
+    LOGGER.info('Batch command:\n %s', batch_cmd)
+    success = submit_job(batch_cmd, 10)
+    if not success:
+        sys.exit(3)
diff --git a/python/parsers.py b/python/parsers.py
index 75db7975afc..6bcc38d5109 100644
--- a/python/parsers.py
+++ b/python/parsers.py
@@ -111,7 +111,7 @@ def setup_run_parser(parser):
     parser.add_argument('--files-list', default=[], nargs='+',
                         help='specify input file(s) to bypass the processList')
     parser.add_argument(
-        '--output',
+        '-o', '--output',
         type=str,
         default='output.root',
         help='specify output file name to bypass the processList and or '
diff --git a/python/process.py b/python/process.py
index c510eb440eb..2a69455d2a8 100644
--- a/python/process.py
+++ b/python/process.py
@@ -12,13 +12,15 @@
 import yaml  # type: ignore
 import ROOT  # type: ignore
 import cppyy
+import numpy as np

-ROOT.gROOT.SetBatch(True)
+ROOT.gROOT.SetBatch(True)

 LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.process_info')


+# _____________________________________________________________________________
 def get_entries(inpath: str) -> int | None:
     '''
     Retrieves number of entries in the "events" TTree from the provided ROOT
@@ -196,6 +198,7 @@ def get_process_info_yaml(process_name: str,
     return filelist, eventlist


+# _____________________________________________________________________________
 def get_process_dict(proc_dict_location: str) -> dict:
     '''
     Pick up the dictionary with process information
@@ -241,6 +244,7 @@ def get_process_dict(proc_dict_location: str) -> dict:
     return proc_dict


+# _____________________________________________________________________________
 def get_process_dict_dirs() -> list[str]:
     '''
     Get search directories for the process dictionaries
@@ -255,3 +259,48 @@ def get_process_dict_dirs() -> list[str]:
     dirs[:] = [d for d in dirs if d]

     return dirs
+
+
+# _____________________________________________________________________________
+def get_subfile_list(in_file_list: list[str],
+                     event_list: list[int],
+                     fraction: float) -> list[str]:
+    '''
+    Obtain list of files roughly containing the requested fraction of events.
+    '''
+    nevts_total: int = sum(event_list)
+    nevts_target: int = int(nevts_total * fraction)
+
+    if nevts_target <= 0:
+        LOGGER.error('The reduction fraction %f is too stringent, no events '
+                     'left!\nAborting...', fraction)
+        sys.exit(3)
+
+    nevts_real: int = 0
+    out_file_list: list[str] = []
+    for i, nevts in enumerate(event_list):
+        if nevts_real >= nevts_target:
+            break
+        nevts_real += nevts
+        out_file_list.append(in_file_list[i])
+
+    info_msg = f'Reducing the input file list by fraction "{fraction}" of '
+    info_msg += 'total events:\n\t'
+    info_msg += f'- total number of events: {nevts_total:,}\n\t'
+    info_msg += f'- targeted number of events: {nevts_target:,}\n\t'
+    info_msg += '- number of events in the resulting file list: '
+    info_msg += f'{nevts_real:,}\n\t'
+    info_msg += '- number of files after reduction: '
+    info_msg += str((len(out_file_list)))
+    LOGGER.info(info_msg)
+
+    return out_file_list
+
+
+# _____________________________________________________________________________
+def get_chunk_list(file_list: list[str], chunks: int):
+    '''
+    Get list of input file paths arranged into chunks.
+ ''' + chunk_list = list(np.array_split(file_list, chunks)) + return [chunk for chunk in chunk_list if chunk.size > 0] diff --git a/python/run_analysis.py b/python/run_analysis.py index 52e3a9b6605..4be1f455225 100644 --- a/python/run_analysis.py +++ b/python/run_analysis.py @@ -11,241 +11,19 @@ import subprocess import importlib.util import datetime -import numpy as np import ROOT # type: ignore import cppyy from anascript import get_element, get_element_dict from process import get_process_info, get_process_dict -from frame import generate_graph +from process import get_subfile_list, get_chunk_list +from batch import send_to_batch +from utils import generate_graph, save_benchmark -LOGGER = logging.getLogger('FCCAnalyses.run') ROOT.gROOT.SetBatch(True) - -# _____________________________________________________________________________ -def determine_os(local_dir: str) -> str | None: - ''' - Determines platform on which FCCAnalyses was compiled - ''' - cmake_config_path = local_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml' - if not os.path.isfile(cmake_config_path): - LOGGER.warning('CMake configuration file was not found!\n' - 'Was FCCAnalyses properly build?') - return None - - with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file: - cmake_config = cmake_config_file.read() - if 'centos7' in cmake_config: - return 'centos7' - if 'almalinux9' in cmake_config: - return 'almalinux9' - - return None - - -# _____________________________________________________________________________ -def create_condor_config(log_dir: str, - process_name: str, - build_os: str | None, - rdf_module, - subjob_scripts: list[str]) -> str: - ''' - Creates contents of condor configuration file. - ''' - cfg = 'executable = $(filename)\n' - - cfg += f'Log = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).log\n' - - cfg += f'Output = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).out\n' - - cfg += f'Error = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).error\n' - - cfg += 'getenv = False\n' - - cfg += 'environment = "LS_SUBCWD={log_dir}"\n' # not sure - - cfg += 'requirements = ( ' - if build_os == 'centos7': - cfg += '(OpSysAndVer =?= "CentOS7") && ' - if build_os == 'almalinux9': - cfg += '(OpSysAndVer =?= "AlmaLinux9") && ' - if build_os is None: - LOGGER.warning('Submitting jobs to default operating system. There ' - 'may be compatibility issues.') - cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n' - - cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n' - - cfg += 'max_retries = 3\n' - - cfg += '+JobFlavour = "%s"\n' % get_element(rdf_module, 'batchQueue') - - cfg += '+AccountingGroup = "%s"\n' % get_element(rdf_module, 'compGroup') - - cfg += 'RequestCpus = %i\n' % get_element(rdf_module, "nCPUS") - - cfg += 'queue filename matching files' - for script in subjob_scripts: - cfg += ' ' + script - cfg += '\n' - - return cfg - - -# _____________________________________________________________________________ -def create_subjob_script(local_dir: str, - rdf_module, - process_name: str, - chunk_num: int, - chunk_list: list[list[str]], - anapath: str) -> str: - ''' - Creates sub-job script to be run. 
- ''' - - output_dir = get_element(rdf_module, "outputDir") - output_dir_eos = get_element(rdf_module, "outputDirEos") - eos_type = get_element(rdf_module, "eosType") - user_batch_config = get_element(rdf_module, "userBatchConfig") - - scr = '#!/bin/bash\n\n' - scr += 'source ' + local_dir + '/setup.sh\n\n' - - # add userBatchConfig if any - if user_batch_config != '': - if not os.path.isfile(user_batch_config): - LOGGER.warning('userBatchConfig file can\'t be found! Will not ' - 'add it to the default config.') - else: - with open(user_batch_config, 'r', encoding='utf-8') as cfgfile: - for line in cfgfile: - scr += line + '\n' - scr += '\n\n' - - scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n' - scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n' - - if not os.path.isabs(output_dir): - output_path = os.path.join(output_dir, f'chunk_{chunk_num}.root') - else: - output_path = os.path.join(output_dir, process_name, - f'chunk_{chunk_num}.root') - - scr += local_dir - scr += f'/bin/fccanalysis run {anapath} --batch ' - scr += f'--output {output_path} ' - scr += '--files-list' - for file_path in chunk_list[chunk_num]: - scr += f' {file_path}' - scr += '\n\n' - - if not os.path.isabs(output_dir) and output_dir_eos == '': - final_dest = os.path.join(local_dir, output_dir, process_name, - f'chunk_{chunk_num}.root') - scr += f'cp {output_path} {final_dest}\n' - - if output_dir_eos != '': - final_dest = os.path.join(output_dir_eos, - process_name, - f'chunk_{chunk_num}.root') - final_dest = f'root://{eos_type}.cern.ch/' + final_dest - scr += f'xrdcp {output_path} {final_dest}\n' - - return scr - - -# _____________________________________________________________________________ -def get_subfile_list(in_file_list: list[str], - event_list: list[int], - fraction: float) -> list[str]: - ''' - Obtain list of files roughly containing the requested fraction of events. - ''' - nevts_total: int = sum(event_list) - nevts_target: int = int(nevts_total * fraction) - - if nevts_target <= 0: - LOGGER.error('The reduction fraction %f too stringent, no events ' - 'left!\nAborting...', fraction) - sys.exit(3) - - nevts_real: int = 0 - out_file_list: list[str] = [] - for i, nevts in enumerate(event_list): - if nevts_real >= nevts_target: - break - nevts_real += nevts - out_file_list.append(in_file_list[i]) - - info_msg = f'Reducing the input file list by fraction "{fraction}" of ' - info_msg += 'total events:\n\t' - info_msg += f'- total number of events: {nevts_total:,}\n\t' - info_msg += f'- targeted number of events: {nevts_target:,}\n\t' - info_msg += '- number of events in the resulting file list: ' - info_msg += f'{nevts_real:,}\n\t' - info_msg += '- number of files after reduction: ' - info_msg += str((len(out_file_list))) - LOGGER.info(info_msg) - - return out_file_list - - -# _____________________________________________________________________________ -def get_chunk_list(file_list: str, chunks: int): - ''' - Get list of input file paths arranged into chunks. - ''' - chunk_list = list(np.array_split(file_list, chunks)) - return [chunk for chunk in chunk_list if chunk.size > 0] - - -# _____________________________________________________________________________ -def save_benchmark(outfile, benchmark): - ''' - Save benchmark results to a JSON file. 
- ''' - benchmarks = [] - try: - with open(outfile, 'r', encoding='utf-8') as benchin: - benchmarks = json.load(benchin) - except OSError: - pass - - benchmarks = [b for b in benchmarks if b['name'] != benchmark['name']] - benchmarks.append(benchmark) - - with open(outfile, 'w', encoding='utf-8') as benchout: - json.dump(benchmarks, benchout, indent=2) - - -# _____________________________________________________________________________ -def submit_job(cmd: str, max_trials: int) -> bool: - ''' - Submit job to condor, retry `max_trials` times. - ''' - for i in range(max_trials): - with subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) as proc: - (stdout, stderr) = proc.communicate() - - if proc.returncode == 0 and len(stderr) == 0: - LOGGER.info(stdout) - LOGGER.info('GOOD SUBMISSION') - return True - - LOGGER.warning('Error while submitting, retrying...\n ' - 'Trial: %i / %i\n Error: %s', - i, max_trials, stderr) - time.sleep(10) - - LOGGER.error('Failed submitting after: %i trials!', max_trials) - return False +LOGGER = logging.getLogger('FCCAnalyses.run') # _____________________________________________________________________________ @@ -314,7 +92,7 @@ def initialize(args, rdf_module, anapath: str): # _____________________________________________________________________________ def run_rdf(rdf_module, input_list: list[str], - out_file: str, + outfile_path: str, args) -> int: ''' Create RDataFrame and snapshot it. @@ -343,7 +121,7 @@ def run_rdf(rdf_module, if args.graph: generate_graph(dframe, args) - dframe3.Snapshot("events", out_file, branch_list) + dframe3.Snapshot("events", outfile_path, branch_list) except cppyy.gbl.std.runtime_error as err: LOGGER.error('%s\nDuring the execution of the analysis script an ' 'exception occurred!\nAborting...', err) @@ -352,86 +130,6 @@ def run_rdf(rdf_module, return evtcount_init.GetValue(), evtcount_final.GetValue() -# _____________________________________________________________________________ -def send_to_batch(rdf_module, chunk_list, process, anapath: str): - ''' - Send jobs to HTCondor batch system. 
- ''' - local_dir = os.environ['LOCAL_DIR'] - current_date = datetime.datetime.fromtimestamp( - datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S') - log_dir = os.path.join(local_dir, 'BatchOutputs', current_date, process) - if not os.path.exists(log_dir): - os.system(f'mkdir -p {log_dir}') - - # Making sure the FCCAnalyses libraries are compiled and installed - try: - subprocess.check_output(['make', 'install'], - cwd=local_dir+'/build', - stderr=subprocess.DEVNULL - ) - except subprocess.CalledProcessError: - LOGGER.error('The FCCAnalyses libraries are not properly build and ' - 'installed!\nAborting job submission...') - sys.exit(3) - - subjob_scripts = [] - for ch in range(len(chunk_list)): - subjob_script_path = os.path.join(log_dir, - f'job_{process}_chunk_{ch}.sh') - subjob_scripts.append(subjob_script_path) - - for i in range(3): - try: - with open(subjob_script_path, 'w', encoding='utf-8') as ofile: - subjob_script = create_subjob_script(local_dir, - rdf_module, - process, - ch, - chunk_list, - anapath) - ofile.write(subjob_script) - except IOError as e: - if i < 2: - LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror) - else: - LOGGER.error('I/O error(%i): %s', e.errno, e.strerror) - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {subjob_script_path}') - - LOGGER.debug('Sub-job scripts to be run:\n - %s', - '\n - '.join(subjob_scripts)) - - condor_config_path = f'{log_dir}/job_desc_{process}.cfg' - - for i in range(3): - try: - with open(condor_config_path, 'w', encoding='utf-8') as cfgfile: - condor_config = create_condor_config(log_dir, - process, - determine_os(local_dir), - rdf_module, - subjob_scripts) - cfgfile.write(condor_config) - except IOError as e: - LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror) - if i == 2: - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {condor_config_path}') - - batch_cmd = f'condor_submit {condor_config_path}' - LOGGER.info('Batch command:\n %s', batch_cmd) - success = submit_job(batch_cmd, 10) - if not success: - sys.exit(3) - - # _____________________________________________________________________________ def apply_filepath_rewrites(filepath: str) -> str: ''' @@ -515,9 +213,10 @@ def run_local(rdf_module, infile_list, args): output_dir = get_element(rdf_module, "outputDir") if not args.batch: if os.path.isabs(args.output): - LOGGER.warning('Provided output path is absolute, "outputDir" ' + LOGGER.warning('Provided output file path is absolute, "outputDir" ' 'from analysis script will be ignored!') - outfile_path = os.path.join(output_dir, args.output) + else: + outfile_path = os.path.join(output_dir, args.output) else: outfile_path = args.output LOGGER.info('Output file path:\n%s', outfile_path) @@ -594,7 +293,7 @@ def run_stages(args, rdf_module, anapath): if not os.path.exists(output_dir) and output_dir: os.system(f'mkdir -p {output_dir}') - # Check if outputDir exist and if not create it + # Check if EOS outputDir exist and if not create it output_dir_eos = get_element(rdf_module, "outputDirEos") if not os.path.exists(output_dir_eos) and output_dir_eos: os.system(f'mkdir -p {output_dir_eos}') @@ -667,6 +366,9 @@ def run_stages(args, rdf_module, anapath): chunk_list = [file_list] if chunks > 1: chunk_list = get_chunk_list(file_list, chunks) + args.output = f'{output_stem}/chunk{index}.root' + else: + args.output = f'{output_stem}.root' LOGGER.info('Number of the output files: %s', f'{len(chunk_list):,}') # Create directory if more 
than 1 chunk diff --git a/python/run_fccanalysis.py b/python/run_fccanalysis.py index 97dd0d73ac8..4996525c92e 100644 --- a/python/run_fccanalysis.py +++ b/python/run_fccanalysis.py @@ -10,246 +10,18 @@ import logging import subprocess import datetime -import numpy as np import ROOT # type: ignore from anascript import get_element, get_element_dict, get_attribute -from process import get_process_info, get_entries_sow -from frame import generate_graph +from process import get_process_info, get_entries_sow, get_subfile_list +from process import get_subfile_list, get_chunk_list +from batch import send_to_batch +from utils import generate_graph, save_benchmark -LOGGER = logging.getLogger('FCCAnalyses.run') ROOT.gROOT.SetBatch(True) - -# _____________________________________________________________________________ -def determine_os(local_dir: str) -> str | None: - ''' - Determines platform on which FCCAnalyses was compiled - ''' - cmake_config_path = local_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml' - if not os.path.isfile(cmake_config_path): - LOGGER.warning('CMake configuration file was not found!\n' - 'Was FCCAnalyses properly build?') - return None - - with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file: - cmake_config = cmake_config_file.read() - if 'centos7' in cmake_config: - return 'centos7' - if 'almalinux9' in cmake_config: - return 'almalinux9' - - return None - - -# _____________________________________________________________________________ -def create_condor_config(log_dir: str, - process_name: str, - build_os: str | None, - rdf_module, - subjob_scripts: list[str]) -> str: - ''' - Creates contents of condor configuration file. - ''' - cfg = 'executable = $(filename)\n' - - cfg += f'Log = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).log\n' - - cfg += f'Output = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).out\n' - - cfg += f'Error = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).error\n' - - cfg += 'getenv = False\n' - - cfg += 'environment = "LS_SUBCWD={log_dir}"\n' # not sure - - cfg += 'requirements = ( ' - if build_os == 'centos7': - cfg += '(OpSysAndVer =?= "CentOS7") && ' - if build_os == 'almalinux9': - cfg += '(OpSysAndVer =?= "AlmaLinux9") && ' - if build_os is None: - LOGGER.warning('Submitting jobs to default operating system. There ' - 'may be compatibility issues.') - cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n' - - cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n' - - cfg += 'max_retries = 3\n' - - cfg += '+JobFlavour = "%s"\n' % get_element(rdf_module, 'batchQueue') - - cfg += '+AccountingGroup = "%s"\n' % get_element(rdf_module, 'compGroup') - - cfg += 'RequestCpus = %i\n' % get_element(rdf_module, "nCPUS") - - cfg += 'queue filename matching files' - for script in subjob_scripts: - cfg += ' ' + script - cfg += '\n' - - return cfg - - -# _____________________________________________________________________________ -def create_subjob_script(local_dir: str, - analysis, - process_name: str, - chunk_num: int, - chunk_list: list[list[str]], - anapath: str, - cmd_args) -> str: - ''' - Creates sub-job script to be run. 
- ''' - - output_dir = get_attribute(analysis, 'output_dir', None) - - scr = '#!/bin/bash\n\n' - scr += 'source ' + local_dir + '/setup.sh\n\n' - - # add user batch configuration if any - user_batch_config = get_attribute(analysis, 'user_batch_config', None) - if user_batch_config is not None: - if not os.path.isfile(user_batch_config): - LOGGER.warning('userBatchConfig file can\'t be found! Will not ' - 'add it to the default config.') - else: - with open(user_batch_config, 'r', encoding='utf-8') as cfgfile: - for line in cfgfile: - scr += line + '\n' - scr += '\n\n' - - scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n' - scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n' - - if not os.path.isabs(output_dir): - output_path = os.path.join(output_dir, f'chunk_{chunk_num}.root') - else: - output_path = os.path.join(output_dir, process_name, - f'chunk_{chunk_num}.root') - - scr += local_dir - scr += f'/bin/fccanalysis run {anapath} --batch' - scr += f' --output {output_path}' - if cmd_args.ncpus > 0: - scr += f' --ncpus {cmd_args.ncpus}' - if len(cmd_args.unknown) > 0: - scr += ' ' + ' '.join(cmd_args.unknown) - scr += ' --files-list' - for file_path in chunk_list[chunk_num]: - scr += f' {file_path}' - scr += '\n\n' - - output_dir_eos = get_attribute(analysis, 'output_dir_eos', None) - if not os.path.isabs(output_dir) and output_dir_eos is None: - final_dest = os.path.join(local_dir, output_dir, process_name, - f'chunk_{chunk_num}.root') - scr += f'cp {output_path} {final_dest}\n' - - if output_dir_eos is not None: - eos_type = get_attribute(analysis, 'eos_type', 'eospublic') - - final_dest = os.path.join(output_dir_eos, - process_name, - f'chunk_{chunk_num}.root') - final_dest = f'root://{eos_type}.cern.ch/' + final_dest - scr += f'xrdcp {output_path} {final_dest}\n' - - return scr - - -# _____________________________________________________________________________ -def get_subfile_list(in_file_list: list[str], - event_list: list[int], - fraction: float) -> list[str]: - ''' - Obtain list of files roughly containing the requested fraction of events. - ''' - nevts_total: int = sum(event_list) - nevts_target: int = int(nevts_total * fraction) - - if nevts_target <= 0: - LOGGER.error('The reduction fraction %f too stringent, no events ' - 'left!\nAborting...', fraction) - sys.exit(3) - - nevts_real: int = 0 - out_file_list: list[str] = [] - for i, nevts in enumerate(event_list): - if nevts_real >= nevts_target: - break - nevts_real += nevts - out_file_list.append(in_file_list[i]) - - info_msg = f'Reducing the input file list by fraction "{fraction}" of ' - info_msg += 'total events:\n\t' - info_msg += f'- total number of events: {nevts_total:,}\n\t' - info_msg += f'- targeted number of events: {nevts_target:,}\n\t' - info_msg += '- number of events in the resulting file list: ' - info_msg += f'{nevts_real:,}\n\t' - info_msg += '- number of files after reduction: ' - info_msg += str((len(out_file_list))) - LOGGER.info(info_msg) - - return out_file_list - - -# _____________________________________________________________________________ -def get_chunk_list(file_list: str, chunks: int): - ''' - Get list of input file paths arranged into chunks. - ''' - chunk_list = list(np.array_split(file_list, chunks)) - return [chunk for chunk in chunk_list if chunk.size > 0] - - -# _____________________________________________________________________________ -def save_benchmark(outfile, benchmark): - ''' - Save benchmark results to a JSON file. 
- ''' - benchmarks = [] - try: - with open(outfile, 'r', encoding='utf-8') as benchin: - benchmarks = json.load(benchin) - except OSError: - pass - - benchmarks = [b for b in benchmarks if b['name'] != benchmark['name']] - benchmarks.append(benchmark) - - with open(outfile, 'w', encoding='utf-8') as benchout: - json.dump(benchmarks, benchout, indent=2) - - -# _____________________________________________________________________________ -def submit_job(cmd: str, max_trials: int) -> bool: - ''' - Submit job to condor, retry `max_trials` times. - ''' - for i in range(max_trials): - with subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) as proc: - (stdout, stderr) = proc.communicate() - - if proc.returncode == 0 and len(stderr) == 0: - LOGGER.info(stdout) - LOGGER.info('GOOD SUBMISSION') - return True - - LOGGER.warning('Error while submitting, retrying...\n ' - 'Trial: %i / %i\n Error: %s', - i, max_trials, stderr) - time.sleep(10) - - LOGGER.error('Failed submitting after: %i trials!', max_trials) - return False +LOGGER = logging.getLogger('FCCAnalyses.run') # _____________________________________________________________________________ @@ -270,6 +42,10 @@ def merge_config(args: object, analysis: object) -> dict[str, any]: if get_attribute(analysis, 'do_weighted', False): config['do_weighted'] = True + # Check the output path + # config['output_file_path'] = None + # if args.output + return config @@ -388,90 +164,6 @@ def run_rdf(config: dict[str, any], return evtcount_init.GetValue(), evtcount_final.GetValue(), sow_init.GetValue(), sow_final.GetValue() -# _____________________________________________________________________________ -def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str): - ''' - Send jobs to HTCondor batch system. 
- ''' - local_dir = os.environ['LOCAL_DIR'] - current_date = datetime.datetime.fromtimestamp( - datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S') - log_dir = os.path.join(local_dir, 'BatchOutputs', current_date, - sample_name) - if not os.path.exists(log_dir): - os.system(f'mkdir -p {log_dir}') - - # Making sure the FCCAnalyses libraries are compiled and installed - try: - subprocess.check_output(['make', 'install'], - cwd=local_dir+'/build', - stderr=subprocess.DEVNULL - ) - except subprocess.CalledProcessError: - LOGGER.error('The FCCanalyses libraries are not properly build and ' - 'installed!\nAborting job submission...') - sys.exit(3) - - subjob_scripts = [] - for ch_num in range(len(chunk_list)): - subjob_script_path = os.path.join( - log_dir, - f'job_{sample_name}_chunk_{ch_num}.sh') - subjob_scripts.append(subjob_script_path) - - for i in range(3): - try: - with open(subjob_script_path, 'w', encoding='utf-8') as ofile: - subjob_script = create_subjob_script(local_dir, - analysis, - sample_name, - ch_num, - chunk_list, - anapath, - args) - ofile.write(subjob_script) - except IOError as err: - if i < 2: - LOGGER.warning('I/O error(%i): %s', - err.errno, err.strerror) - else: - LOGGER.error('I/O error(%i): %s', err.errno, err.strerror) - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {subjob_script_path}') - - LOGGER.debug('Sub-job scripts to be run:\n - %s', - '\n - '.join(subjob_scripts)) - - condor_config_path = f'{log_dir}/job_desc_{sample_name}.cfg' - - for i in range(3): - try: - with open(condor_config_path, 'w', encoding='utf-8') as cfgfile: - condor_config = create_condor_config(log_dir, - sample_name, - determine_os(local_dir), - analysis, - subjob_scripts) - cfgfile.write(condor_config) - except IOError as err: - LOGGER.warning('I/O error(%i): %s', err.errno, err.strerror) - if i == 2: - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {condor_config_path}') - - batch_cmd = f'condor_submit {condor_config_path}' - LOGGER.info('Batch command:\n %s', batch_cmd) - success = submit_job(batch_cmd, 10) - if not success: - sys.exit(3) - - # _____________________________________________________________________________ def apply_filepath_rewrites(filepath: str) -> str: ''' @@ -565,7 +257,6 @@ def run_local(config: dict[str, any], LOGGER.info(info_msg) - if nevents_orig > 0: LOGGER.info('Number of events:\n\t- original: %s\n\t- local: %s', f'{nevents_orig:,}', f'{nevents_local:,}') @@ -681,7 +372,7 @@ def run_fccanalysis(args, analysis_module): if output_dir is not None and not os.path.exists(output_dir): os.system(f'mkdir -p {output_dir}') - # Check if eos output directory exist and if not create it + # Check if EOS output directory exist and if not create it output_dir_eos = get_attribute(analysis, 'output_dir_eos', None) if output_dir_eos is not None and not os.path.exists(output_dir_eos): os.system(f'mkdir -p {output_dir_eos}') @@ -728,8 +419,6 @@ def run_fccanalysis(args, analysis_module): 'analysis script!\nAborting...') sys.exit(3) - - for process_name in process_list: LOGGER.info('Started processing sample "%s" ...', process_name) file_list, event_list = get_process_info(process_name, @@ -770,14 +459,6 @@ def run_fccanalysis(args, analysis_module): chunk_list = get_chunk_list(file_list, chunks) LOGGER.info('Number of the output files: %s', f'{len(chunk_list):,}') - # Create directory if more than 1 chunk - if len(chunk_list) > 1: - output_directory = os.path.join(output_dir if 
output_dir else '', - output_stem) - - if not os.path.exists(output_directory): - os.system(f'mkdir -p {output_directory}') - if run_batch: # Sending to the batch system LOGGER.info('Running on the batch...') @@ -790,6 +471,14 @@ def run_fccanalysis(args, analysis_module): send_to_batch(args, analysis, chunk_list, process_name, anapath) else: + # Create directory if more than 1 chunk + if len(chunk_list) > 1: + output_directory = os.path.join(output_dir if output_dir else '', + output_stem) + + if not os.path.exists(output_directory): + os.system(f'mkdir -p {output_directory}') + # Running locally LOGGER.info('Running locally...') if len(chunk_list) == 1: diff --git a/python/run_final_analysis.py b/python/run_final_analysis.py index 8b8a90c40bf..ce6727df014 100644 --- a/python/run_final_analysis.py +++ b/python/run_final_analysis.py @@ -16,7 +16,7 @@ import cppyy from anascript import get_element, get_attribute from process import get_process_dict, get_entries_sow -from frame import generate_graph +from utils import generate_graph LOGGER = logging.getLogger('FCCAnalyses.run_final') diff --git a/python/frame.py b/python/utils.py similarity index 75% rename from python/frame.py rename to python/utils.py index 78a58cfa8ab..08760905c47 100644 --- a/python/frame.py +++ b/python/utils.py @@ -1,5 +1,5 @@ ''' -RDataFrame helpers. +RDataFrame or other helpers. ''' import os @@ -11,7 +11,7 @@ ROOT.gROOT.SetBatch(True) -LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.frame') +LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.utils') # _____________________________________________________________________________ @@ -57,3 +57,22 @@ def generate_graph(dframe, args, suffix: str | None = None) -> None: # Convert .dot file into .png os.system(f'dot -Tpng {graph_path.with_suffix(".dot")} ' f'-o {graph_path.with_suffix(".png")}') + + +# _____________________________________________________________________________ +def save_benchmark(outfile, benchmark): + ''' + Save benchmark results to a JSON file. + ''' + benchmarks = [] + try: + with open(outfile, 'r', encoding='utf-8') as benchin: + benchmarks = json.load(benchin) + except OSError: + pass + + benchmarks = [b for b in benchmarks if b['name'] != benchmark['name']] + benchmarks.append(benchmark) + + with open(outfile, 'w', encoding='utf-8') as benchout: + json.dump(benchmarks, benchout, indent=2) diff --git a/python/frame.pyi b/python/utils.pyi similarity index 100% rename from python/frame.pyi rename to python/utils.pyi
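Below is a minimal usage sketch, not part of the patch itself, of the two helpers this change moves into python/process.py. It assumes an FCCAnalyses environment where the python directory is importable; the file names and event counts are placeholders.

# Sketch only: reduce a sample's file list to roughly half of its events, then
# split the surviving paths into two chunks, as the run scripts do before
# creating one sub-job per chunk.
from process import get_subfile_list, get_chunk_list

files = ['sample_0.root', 'sample_1.root', 'sample_2.root', 'sample_3.root']
events = [1000, 2000, 1500, 500]

# Keeps files until ~50% of the 5000 total events are covered.
subset = get_subfile_list(files, events, 0.5)
for i, chunk in enumerate(get_chunk_list(subset, 2)):
    print(f'chunk {i}:', list(chunk))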