Skip to content

Commit

Permalink
Adding submit sub-command
Browse files Browse the repository at this point in the history
  • Loading branch information
kjvbrt committed Jan 9, 2025
1 parent 1a2d1a8 commit 93ffacf
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 46 deletions.
10 changes: 7 additions & 3 deletions bin/fccanalysis
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ from init_analysis import init_analysis
from build_analysis import build_analysis
from test_fccanalyses import test_fccanalyses
from pin_analysis import PinAnalysis
from submit import submit_analysis
from run_analysis import run
from run_final_analysis import run_final
from do_plots import do_plots
Expand Down Expand Up @@ -53,12 +54,13 @@ class MaxLevelFilter(logging.Filter):
https://stackoverflow.com/questions/1383254/
'''
def __init__(self, level):
    '''
    Initialize the filter and remember the threshold level.
    '''
    super().__init__()
    # Threshold compared against `record.levelno` in filter()
    self.level = level

def filter(self, record):
    '''
    Decide whether the log record passes the filter.

    Returns True only if the level of the record is strictly lower than
    the threshold level stored in the filter.
    '''
    # "<" instead of "<=": since logger.setLevel is inclusive,
    # this should be exclusive
    return record.levelno < self.level


# _____________________________________________________________________________
Expand Down Expand Up @@ -117,6 +119,8 @@ def main():
test_fccanalyses(parser)
elif args.command == 'pin':
PinAnalysis(parser)
elif args.command == 'submit':
submit_analysis(parser)
elif args.command == 'final':
run_final(parser)
elif args.command == 'plots':
Expand Down
12 changes: 6 additions & 6 deletions examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ class Analysis():
Higgs mass recoil analysis in Z(mumu)H.
'''
def __init__(self, cmdline_args):
# Parse additional arguments not known to the FCCAnalyses parsers.
# All command line arguments are provided in the `cmdline_args`
# dictionary; arguments after "--" are stored under its "remaining"
# key.
parser = ArgumentParser(
description='Additional analysis arguments',
usage='Provide additional arguments after analysis script path')
usage='Provided after "--"')
parser.add_argument('--muon-pt', default='10.', type=float,
help='Minimal pT of the mouns.')
# Parse additional arguments not known to the FCCAnalyses parsers.
# All command line arguments known to fccanalysis are provided in the
# `cmdline_args` dictionary.
self.ana_args, _ = parser.parse_known_args(cmdline_args['unknown'])
self.ana_args, _ = parser.parse_known_args(cmdline_args['remaining'])

# Mandatory: List of processes to run over
self.process_list = {
Expand All @@ -42,7 +42,7 @@ def __init__(self, cmdline_args):
# self.analysis_name = 'My Analysis'

# Optional: number of threads to run on, default is 'all available'
# self.n_threads = 4
self.n_threads = 4

# Optional: running on HTCondor, default is False
self.run_batch = True
Expand Down
62 changes: 43 additions & 19 deletions python/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@
import os
import sys
import time
import shutil
import json
import logging
import subprocess
import datetime
import numpy as np
from typing import Any

import ROOT # type: ignore
from anascript import get_element, get_element_dict, get_attribute
from process import get_process_info, get_entries_sow
from anascript import get_element, get_attribute
from process import get_process_info
from process import get_subfile_list, get_chunk_list


ROOT.gROOT.SetBatch(True)

LOGGER = logging.getLogger('FCCAnalyses.batch')


Expand Down Expand Up @@ -90,7 +86,7 @@ def create_condor_config(log_dir: str,
cfg += 'should_transfer_files = yes\n'
cfg += 'when_to_transfer_output = on_exit\n'

cfg += f'transfer_output_files = $(outfile)\n\n'
cfg += 'transfer_output_files = $(outfile)\n\n'

# add user batch configuration if any
user_batch_config = get_attribute(rdf_module, 'user_batch_config', None)
Expand Down Expand Up @@ -138,12 +134,12 @@ def create_subjob_script(local_dir: str,
f'chunk_{chunk_num}.root')

scr += 'which fccanalysis\n'
#scr += local_dir
# scr += local_dir
scr += f'fccanalysis run {anapath} --batch'
scr += f' --output {output_path}'
if cmd_args.ncpus > 0:
scr += ' --ncpus ' + get_element(rdf_module, "n_threads")
if len(cmd_args.unknown) > 0:
if hasattr(analysis, 'n_threads'):
scr += ' --ncpus ' + str(get_element(analysis, "n_threads"))
if len(cmd_args.remaining) > 0:
scr += ' ' + ' '.join(cmd_args.unknown)
scr += ' --files-list'
for file_path in chunk_list[chunk_num]:
Expand Down Expand Up @@ -198,17 +194,18 @@ def submit_job(cmd: str, max_trials: int) -> bool:


# _____________________________________________________________________________
def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str):
def send_to_batch(config: dict[str, Any],
args, analysis,
sample_name: str) -> None:
'''
Send jobs to HTCondor batch system.
'''
sample_dict = config['process-list'][sample_name]

local_dir = os.environ['LOCAL_DIR']
current_date = datetime.datetime.fromtimestamp(
datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S')
# log_dir = os.path.join(local_dir, 'BatchOutputs', current_date,
# sample_name)
log_dir = os.path.join('BatchOutputs', current_date,
sample_name)
log_dir = os.path.join('BatchOutputs', current_date, sample_name)
if not os.path.exists(log_dir):
os.system(f'mkdir -p {log_dir}')

Expand All @@ -223,6 +220,33 @@ def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str):
'installed!\nAborting job submission...')
sys.exit(3)

# Determine the fraction of the input to be processed
fraction = 1.
if 'fraction' in sample_dict:
fraction = sample_dict['fraction']

# Determine the number of chunks the output will be split into
chunks = 1
if 'chunks' in sample_dict:
chunks = sample_dict['chunks']

file_list, event_list = get_process_info(sample_name,
config['production-tag'],
config['input-directory'])

if len(file_list) <= 0:
LOGGER.error('No files to process!\nContinuing...')
return

# Adjust number of input files according to the fraction requirement
if fraction < 1.:
file_list = get_subfile_list(file_list, event_list, fraction)

# Adjust number of output files according to the chunk requirement
chunk_list = [file_list]
if chunks > 1:
chunk_list = get_chunk_list(file_list, chunks)

subjob_scripts = []
for ch_num in range(len(chunk_list)):
subjob_script_path = os.path.join(
Expand All @@ -238,7 +262,7 @@ def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str):
sample_name,
ch_num,
chunk_list,
anapath,
config['analysis-path'],
args)
ofile.write(subjob_script)
except IOError as err:
Expand Down
45 changes: 35 additions & 10 deletions python/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def setup_test_parser(parser):
)


# _____________________________________________________________________________
def setup_pin_parser(parser):
'''
Arguments for the pin sub-command
Expand All @@ -102,6 +103,23 @@ def setup_pin_parser(parser):
help='show pinned stack')


# _____________________________________________________________________________
def setup_submit_parser(parser):
    '''
    Define command line arguments for the submit sub-command.

    The following arguments are registered on the provided parser:
      * anascript_path: positional, path to the analysis script
      * -w/--where: where to submit the analysis, one of 'ht-condor',
        'slurm' or 'grid' (default: 'ht-condor')
      * remaining: all command line arguments left after the ones above
    '''
    parser.add_argument('anascript_path',
                        type=str,
                        help='path to the analysis script')
    parser.add_argument('-w', '--where',
                        type=str,
                        choices=['ht-condor', 'slurm', 'grid'],
                        default='ht-condor',
                        help='where to submit the analysis')
    # Gather everything that follows into a list, without interpreting it
    parser.add_argument('remaining', nargs=argparse.REMAINDER)


# _____________________________________________________________________________
def setup_run_parser(parser):
'''
Define command line arguments for the run sub-command.
Expand Down Expand Up @@ -138,6 +156,7 @@ def setup_run_parser(parser):
help=argparse.SUPPRESS)


# _____________________________________________________________________________
def setup_run_parser_final(parser):
'''
Define command line arguments for the final sub-command.
Expand All @@ -151,6 +170,7 @@ def setup_run_parser_final(parser):
'\'.dot\' or \'.png\'')


# _____________________________________________________________________________
def setup_run_parser_plots(parser):
'''
Define command line arguments for the plots sub-command.
Expand All @@ -172,6 +192,7 @@ def setup_run_parser_plots(parser):
help='maximal y position of the legend')


# _____________________________________________________________________________
def setup_run_parser_combine(parser):
'''
Define command line arguments for the combine sub-command.
Expand All @@ -180,34 +201,37 @@ def setup_run_parser_combine(parser):


# _____________________________________________________________________________
def setup_subparsers(subparsers):
def setup_subparsers(topparser):
'''
Sets all sub-parsers for all sub-commands
'''

# Create sub-parsers
parser_init = subparsers.add_parser(
# Instantiate sub-parsers
parser_init = topparser.add_parser(
'init',
help="generate a RDataFrame based FCC analysis")
parser_build = subparsers.add_parser(
parser_build = topparser.add_parser(
'build',
help='build and install local analysis')
parser_test = subparsers.add_parser(
parser_test = topparser.add_parser(
'test',
help='test whole or a part of the analysis framework')
parser_pin = subparsers.add_parser(
parser_pin = topparser.add_parser(
'pin',
help='pin fccanalyses to the current version of Key4hep stack')
parser_run = subparsers.add_parser(
parser_submit = topparser.add_parser(
'submit',
help="submit the analysis to be run on a remote machine(s)")
parser_run = topparser.add_parser(
'run',
help="run a RDataFrame based FCC analysis")
parser_run_final = subparsers.add_parser(
parser_run_final = topparser.add_parser(
'final',
help="run a RDataFrame based FCC analysis final configuration")
parser_run_plots = subparsers.add_parser(
parser_run_plots = topparser.add_parser(
'plots',
help="run a RDataFrame based FCC analysis plot configuration")
parser_run_combine = subparsers.add_parser(
parser_run_combine = topparser.add_parser(
'combine',
help="prepare combine cards to run basic template fits")

Expand All @@ -216,6 +240,7 @@ def setup_subparsers(subparsers):
setup_build_parser(parser_build)
setup_test_parser(parser_test)
setup_pin_parser(parser_pin)
setup_submit_parser(parser_submit)
setup_run_parser(parser_run)
setup_run_parser_final(parser_run_final)
setup_run_parser_plots(parser_run_plots)
Expand Down
1 change: 0 additions & 1 deletion python/run_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,6 @@ def run(parser):
anapath = os.path.abspath(args.anascript_path)

# Check that the analysis file exists
anapath = args.anascript_path
if not os.path.isfile(anapath):
LOGGER.error('Analysis script %s not found!\nAborting...',
anapath)
Expand Down
Loading

0 comments on commit 93ffacf

Please sign in to comment.