Skip to content

Commit

Permalink
also report the duration and user of running jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
nova committed Jul 30, 2024
1 parent 7a31e88 commit 1b398aa
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions net/process_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,8 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,

subresults = []
jobresults = []
# jobid -> (user, time started)
jobinfo = {}

#
me = ProcessSubmissions(pid=os.getpid())
Expand Down Expand Up @@ -879,16 +881,16 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
#all_user_images = UserImage.objects.annotate(job_count=Count('jobs'))
#newuis = all_user_images.filter(job_count=0)
t0 = time.time()
newuis = UserImage.objects.filter(has_job=False)
newuis = UserImage.objects.filter(has_job=False).select_related('user')
t1 = time.time()
print('Selecting UserImages without Jobs: took %.1f seconds' % (t1-t0))
nuc = newuis.count()
n_new_ui = newuis.count()
t2 = time.time()
print('Counting: %.1f sec' % (t2-t1))
if nuc:
if n_new_ui:
#print('Found', len(newuis), 'UserImages without Jobs:', [u.id for u in newuis])
#print('Found', len(newuis), 'UserImages without Jobs.')
print('Jobs need to be started for', nuc, 'UserImages')
print('Jobs need to be started for', n_new_ui, 'UserImages')

runsubs = me.subs.filter(finished=False)
if subresults != lastsubs:
Expand All @@ -913,12 +915,17 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
print('Jobs running:', len(jobresults))
lastjobs = jobresults
any_jobs_finished = False
tnow = time.time()
for jid,res in jobresults[:]:
print(' Job id', jid, 'ready:', res.ready(), end=' ')
u,t0 = jobinfo[jid]
print(' Job id', jid, 'ready:', res.ready(),
'running for %.1f sec' % (tnow - t0), 'user', u,
end=' ')
if res.ready():
any_jobs_finished = True
jobresults.remove((jid,res))
print('success:', res.successful())
del jobinfo[jid]

qj = runjobs.get(job__id=jid)
qj.finished = True
Expand Down Expand Up @@ -958,7 +965,7 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
continue

# No new work to do?
if (len(newsubs) + len(newuis)) == 0:
if (len(newsubs) + n_new_ui) == 0:
time.sleep(refresh_rate)
continue

Expand Down Expand Up @@ -1006,7 +1013,10 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
while n_add > 0:
if len(newuis) == 0:
break
t0 = time.time()
cusers = Counter([u.user for u in newuis])
t1 = time.time()
print('Counting newui users: %.1f sec' % (t1-t0))
print('Jobs queued:', len(newuis), 'by', len(cusers), 'users; top:')
for user,n in cusers.most_common(5):
try:
Expand Down Expand Up @@ -1061,6 +1071,7 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
res = dojob_pool.apply_async(try_dojob, (job, userimage, solve_command, solve_locally),
callback=job_callback)
jobresults.append((job.id, res))
jobinfo[job.id] = (userimage.user, time.time())
else:
try_dojob(job, userimage, solve_command=solve_command, solve_locally=solve_locally)

Expand Down

0 comments on commit 1b398aa

Please sign in to comment.