-
Notifications
You must be signed in to change notification settings - Fork 5
/
slurm_job_waste.py
127 lines (109 loc) · 4.25 KB
/
slurm_job_waste.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/python
"""
slurm_job_waste.py
Who is requesting more resources than they need?
"""
import re
import subprocess
from datetime import datetime
import diamond.collector
class SlurmJobWasteCollector(diamond.collector.Collector):
    """Publish, per user, the resources that completed Slurm jobs
    requested but did not use.

    Each call to collect() runs ``sacct`` for completed jobs (``--state=CD``)
    from ``<current hour>:00`` until now and publishes three metrics per
    user, named ``<account>.<user>.cpuwaste``, ``<account>.<user>.memwaste``
    and ``<account>.<user>.totalwaste``.
    """

    # Weights applied to wasted core-seconds and wasted GB-seconds before
    # they are summed into ``totalwaste``.
    CPU_TRES_WEIGHT = 1.0
    MEM_TRES_WEIGHT = 0.25

    # Columns requested from sacct, in the order collect() unpacks them.
    SACCT_FORMAT = 'user,Account,ReqCPUS,NNodes,ReqMEM,MaxRSS,Elapsed,TotalCPU'

    def get_default_config(self):
        """
        Returns the default collector settings
        """
        config = super(SlurmJobWasteCollector, self).get_default_config()
        config.update({
            'path': 'waste'
        })
        return config

    def convert2sec(self, t):
        """Convert a ``[DD-][HH:][MM:]SS[.fff]`` duration string to seconds.

        Args:
            t: Duration text such as ``"1-02:03:04"``, ``"02:03:04"``,
                ``"03:04.500"`` or ``"42"``.

        Returns:
            The duration in seconds as a float.

        Raises:
            ValueError: If any component is not numeric or there are more
                than three ``:``-separated components.
        """
        if '-' in t:
            days, clock = t.split('-')
        else:
            days, clock = 0, t

        parts = clock.split(':')
        if len(parts) > 3:
            raise ValueError('unrecognised duration: {!r}'.format(t))
        # Left-pad the missing leading components (hours, minutes) with 0.
        hours, mins, secs = [0] * (3 - len(parts)) + parts

        return (86400.0 * float(days) + 3600.0 * float(hours)
                + 60.0 * float(mins) + float(secs))

    def _req_mem_per_node(self, req_mem, req_cpus, nnodes):
        """Return the requested memory per node, in the unit sacct printed.

        Args:
            req_mem: The ReqMem column, e.g. ``"4Gn"`` (per node), ``"4Gc"``
                (per CPU), or a bare ``"4G"`` / ``"4"``.
            req_cpus: Number of requested CPUs (float).
            nnodes: Number of nodes (float).

        Raises:
            ValueError: If the numeric part cannot be parsed.
            ZeroDivisionError: If a per-CPU request has ``nnodes == 0``.
        """
        text = req_mem.strip()
        per_cpu = text.endswith('c')
        if text.endswith(('n', 'c')):
            text = text[:-1]
        # collect() asks sacct for --units=G, so the only unit suffix
        # expected here is "G".
        amount = float(text.rstrip('G'))
        if per_cpu:
            amount = amount * req_cpus / nnodes
        return amount

    def collect(self):
        """
        Collect job waste per user
        """
        command = [
            'sacct',
            '-S', '{hour}:00'.format(hour=datetime.now().hour),
            '-E', 'now',
            '--state=CD', '--units=G', '-n', '-P',
            '-o', self.SACCT_FORMAT,
        ]
        try:
            # Grab data from slurm
            proc = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                universal_newlines=True
            )
        except Exception:
            self.log.exception("error occured fetching job hash")
            return

        nfields = len(self.SACCT_FORMAT.split(','))
        cpustats = {}
        memstats = {}
        account = {}

        # State of the job whose rows are currently being read.  ``user`` is
        # None until a job row has been parsed successfully.
        user = None
        cpu_waste = 0.0
        mem_waste = 0.0
        req_mem = 0.0
        elapsed = 0.0
        max_rss = 0.0

        try:
            for line in proc.stdout:
                fields = line.strip().split('|')
                if len(fields) != nfields:
                    # Blank or unexpected row; nothing to unpack.
                    continue
                (l_user, l_account, l_req_cpus, l_nnodes,
                 l_req_mem, l_max_rss, l_elapsed, l_total_cpu) = fields

                if l_user:
                    # A row with a user starts a new job: bank the job
                    # that was being accumulated before parsing this one.
                    if user is not None:
                        cpustats[user] += cpu_waste
                        memstats[user] += mem_waste
                        user = None

                    try:
                        req_cpus = float(l_req_cpus)
                        req_mem = self._req_mem_per_node(
                            l_req_mem, req_cpus, float(l_nnodes))
                        elapsed = self.convert2sec(l_elapsed)
                        total_cpu = self.convert2sec(l_total_cpu)
                    except (ValueError, ZeroDivisionError):
                        # Leave ``user`` as None so this job's step rows
                        # are ignored too.
                        self.log.warning(
                            "skipping unparsable sacct job row: %r", line)
                        continue

                    user = l_user
                    cpustats.setdefault(user, 0.0)
                    memstats.setdefault(user, 0.0)
                    # Metrics are keyed by the first account seen for a user.
                    account.setdefault(user, l_account)

                    # Reset per-job state so nothing leaks in from the
                    # previous job.  Memory waste stays 0 unless a step row
                    # reports a MaxRSS.
                    max_rss = 0.0
                    mem_waste = 0.0
                    cpu_waste = max(
                        0.0,
                        self.CPU_TRES_WEIGHT
                        * (req_cpus * elapsed - total_cpu))
                elif user is not None:
                    # Row without a user: a step of the current job.  Track
                    # the largest MaxRSS reported by any step.
                    rss_text = l_max_rss.strip().strip('G')
                    if not rss_text:
                        continue
                    try:
                        max_rss = max(max_rss, float(rss_text))
                    except ValueError:
                        continue
                    mem_waste = max(
                        0.0,
                        self.MEM_TRES_WEIGHT * (req_mem - max_rss) * elapsed)
        finally:
            # Always release the pipe and reap the child process.
            proc.stdout.close()
            proc.wait()

        if proc.returncode:
            self.log.warning("sacct exited with status %s", proc.returncode)

        # Bank the last job read (there is none if sacct printed no jobs).
        if user is not None:
            cpustats[user] += cpu_waste
            memstats[user] += mem_waste

        # Publish data
        for user in account:
            totalwaste = cpustats[user] + memstats[user]
            self.publish('{}.{}.cpuwaste'.format(account[user], user), cpustats[user])
            self.publish('{}.{}.memwaste'.format(account[user], user), memstats[user])
            self.publish('{}.{}.totalwaste'.format(account[user], user), totalwaste)