Skip to content

Commit

Permalink
poc(compat): squeue output parsing using header
Browse files Browse the repository at this point in the history
  • Loading branch information
fleimgruber committed Nov 2, 2023
1 parent f104c7c commit 5142fa5
Showing 1 changed file with 53 additions and 13 deletions.
66 changes: 53 additions & 13 deletions src/job_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::Lines;
use std::path::PathBuf;
use std::{io::BufRead, process::Command, thread, time::Duration};

Expand All @@ -15,6 +16,22 @@ struct JobWatcher {

pub struct JobWatcherHandle {}

fn addr_of(s: &str) -> usize {
s.as_ptr() as usize
}

fn split_whitespace_indices(s: &str) -> impl Iterator<Item = (usize, &str)> {
s.split_whitespace()
.map(move |sub| (addr_of(sub) - addr_of(s), sub))
}

fn offsets_of(s: String) -> Vec<usize> {
let iter = split_whitespace_indices(&s);
let mut indices: Vec<usize> = iter.map(|split| split.0).collect();
indices.push(s.len());
return indices;
}

impl JobWatcher {
fn new(app: Sender<AppMessage>, interval: Duration, squeue_args: Vec<String>) -> Self {
Self {
Expand All @@ -24,45 +41,51 @@ impl JobWatcher {
}
}

/// Run Slurm squeue compatible with legacy Slurm versions
///
/// API compatible with `run`. Uses column header index offsets instead of a magic
/// string splitter.
fn run(&mut self) -> Self {
let output_separator = "###turm###";
let fields = [
"jobid",
"name",
"state",
"username",
"timeused",
"tres-alloc",
"tres-alloc:80",
"partition",
"nodelist",
"stdout",
"stdout:80",
"stderr",
"command",
"command:80",
"statecompact",
"reason",
"ArrayJobID", // %A
"ArrayTaskID", // %a
"NodeList", // %N
"WorkDir", // for fallback
];
let output_format = fields
.map(|s| s.to_owned() + ":" + output_separator)
.join(",");
let output_format = fields.map(|s| s.to_owned()).join(",");

loop {
let jobs: Vec<Job> = Command::new("squeue")
let results = Command::new("squeue")
.args(&self.squeue_args)
.arg("--array")
.arg("--noheader")
.arg("--Format")
.arg(&output_format)
.output()
.expect("failed to execute process")
.stdout
.lines()
.expect("failed to execute process");
let mut lines: Lines<&[u8]> = results.stdout.lines();
let first_line = lines.next().unwrap().unwrap();
let offsets = offsets_of(first_line);
let jobs: Vec<Job> = lines
.map(|l| l.unwrap().trim().to_string())
.filter_map(|l| {
let parts: Vec<_> = l.split(output_separator).collect();
let mut parts: Vec<_> = (0..offsets.len() - 1)
.map(|i| l[offsets[i]..offsets[i + 1]].trim())
.collect();
let last_offset: usize = offsets[offsets.len() - 1];
parts.push(&l[last_offset..l.len() - 1]);

if parts.len() != fields.len() + 1 {
return None;
Expand Down Expand Up @@ -208,3 +231,20 @@ impl JobWatcherHandle {
Self {}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_offsets_of() {
let s = b"JOBID NAME STATE USER TIME TRES_ALLOC PARTITION NODELIST STDOUT STDERR COMMAND ST REASON ARRAY_JOB_ID ARRAY_TASK_ID NODELIST WORK_DIR";
let mut lines = s.lines();
let first_line = lines.next().unwrap().unwrap();
let results = offsets_of(first_line);
assert_eq!(
results,
[0, 20, 40, 60, 80, 100, 120, 140, 160, 260, 280, 300, 320, 340, 360, 380, 400, 408]
);
}
}

0 comments on commit 5142fa5

Please sign in to comment.