Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poc(compat): squeue output parsing using header #20

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions src/job_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::Lines;
use std::path::PathBuf;
use std::{io::BufRead, process::Command, thread, time::Duration};

Expand All @@ -15,6 +16,22 @@ struct JobWatcher {

pub struct JobWatcherHandle {}

fn addr_of(s: &str) -> usize {
s.as_ptr() as usize
}

fn split_whitespace_indices(s: &str) -> impl Iterator<Item = (usize, &str)> {
s.split_whitespace()
.map(move |sub| (addr_of(sub) - addr_of(s), sub))
}

fn offsets_of(s: String) -> Vec<usize> {
let iter = split_whitespace_indices(&s);
let mut indices: Vec<usize> = iter.map(|split| split.0).collect();
indices.push(s.len());
return indices;
}

impl JobWatcher {
fn new(app: Sender<AppMessage>, interval: Duration, squeue_args: Vec<String>) -> Self {
Self {
Expand All @@ -24,45 +41,51 @@ impl JobWatcher {
}
}

/// Run Slurm squeue compatible with legacy Slurm versions
///
/// API compatible with `run`. Uses column header index offsets instead of a magic
/// string splitter.
fn run(&mut self) -> Self {
let output_separator = "###turm###";
let fields = [
"jobid",
"name",
"state",
"username",
"timeused",
"tres-alloc",
"tres-alloc:80",
"partition",
"nodelist",
"stdout",
"stderr",
"command",
"stdout:80",
"stderr:80",
"command:80",
"statecompact",
"reason",
"ArrayJobID", // %A
"ArrayTaskID", // %a
"NodeList", // %N
"WorkDir", // for fallback
];
let output_format = fields
.map(|s| s.to_owned() + ":" + output_separator)
.join(",");
let output_format = fields.map(|s| s.to_owned()).join(",");

loop {
let jobs: Vec<Job> = Command::new("squeue")
let results = Command::new("squeue")
.args(&self.squeue_args)
.arg("--array")
.arg("--noheader")
.arg("--Format")
.arg(&output_format)
.output()
.expect("failed to execute process")
.stdout
.lines()
.expect("failed to execute process");
let mut lines: Lines<&[u8]> = results.stdout.lines();
let first_line = lines.next().unwrap().unwrap();
let offsets = offsets_of(first_line);
let jobs: Vec<Job> = lines
.map(|l| l.unwrap().trim().to_string())
.filter_map(|l| {
let parts: Vec<_> = l.split(output_separator).collect();
let mut parts: Vec<_> = (0..offsets.len() - 1)
.map(|i| l[offsets[i]..offsets[i + 1]].trim())
.collect();
let last_offset: usize = offsets[offsets.len() - 1];
parts.push(&l[last_offset..l.len() - 1]);

if parts.len() != fields.len() + 1 {
return None;
Expand Down Expand Up @@ -208,3 +231,20 @@ impl JobWatcherHandle {
Self {}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_offsets_of() {
let s = b"JOBID NAME STATE USER TIME TRES_ALLOC PARTITION NODELIST STDOUT STDERR COMMAND ST REASON ARRAY_JOB_ID ARRAY_TASK_ID NODELIST WORK_DIR";
let mut lines = s.lines();
let first_line = lines.next().unwrap().unwrap();
let results = offsets_of(first_line);
assert_eq!(
results,
[0, 20, 40, 60, 80, 100, 120, 140, 160, 260, 280, 300, 320, 340, 360, 380, 400, 408]
);
}
}