Skip to content

Commit

Permalink
misc process_submissions fixes
Browse files (browse the repository at this point in the history)
  • Loading branch information
dstndstn committed Apr 16, 2020
1 parent 9715347 commit 744be0e
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 59 deletions.
2 changes: 1 addition & 1 deletion __init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.78-33-gffadbe61'
__version__ = '0.79-26-g3e790b81'
4 changes: 4 additions & 0 deletions blind/engine-main.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static an_option_t myopts[] = {
"run the index files in parallel"},
{'D', "data-log file", required_argument, "file",
"log data to the given filename"},
{'j', "job-id", required_argument, "jobid",
"IGNORED; purely to allow process to contain the job id!"},
};

static void print_help(const char* progname, bl* opts) {
Expand Down Expand Up @@ -122,6 +124,8 @@ int main(int argc, char** args) {
if (c == -1)
break;
switch (c) {
case 'j':
break;
case 'D':
datalog = optarg;
break;
Expand Down
64 changes: 48 additions & 16 deletions net/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@
logging.basicConfig(format='%(message)s',
level=logging.DEBUG)

def bounce_try_dojob(jobid):
print('Trying Job ID', jobid)
job = Job.objects.filter(id=jobid)[0]
print('Found Job', job)
return try_dojob(job, job.user_image)

if __name__ == '__main__':
def bounce_try_dojob(X):
    """Look up a Job by id and run try_dojob on it, reporting any failure.

    X is a 3-tuple ``(jobid, solve_command, solve_locally)``.  The values
    arrive packed into one argument so the function can be used with a
    map()-style call that passes a single item per invocation.

    Returns the value returned by try_dojob.  If anything inside the guarded
    section raises (including the import or the Job lookup), the traceback
    is printed and None is returned instead.
    """
    # Unpacking happens outside the guarded section: a malformed X raises
    # to the caller rather than being swallowed.
    (jobid, solve_command, solve_locally) = X
    try:
        # Imported here, inside the try, so that an import problem is
        # reported through the same traceback path as any other error.
        from process_submissions import try_dojob
        print('Trying Job ID', jobid)
        job = Job.objects.filter(id=jobid)[0]
        print('Found Job', job)
        result = try_dojob(job, job.user_image, solve_command, solve_locally)
        print('Job result for', job, ':', result)
    except:
        # Deliberately catches everything, exactly as before: every failure
        # is printed and turned into a None result.
        import traceback
        traceback.print_exc()
        return None
    else:
        return result

def main():
import optparse
parser = optparse.OptionParser('%(prog)')
parser.add_option('-s', '--sub', type=int, dest='sub', help='Submission ID')
Expand Down Expand Up @@ -57,6 +65,9 @@ def bounce_try_dojob(jobid):
parser.add_option('--delete', action='store_true', default=False,
help='Delete everything associated with the given image')

parser.add_option('--delextra', action='store_true', default=False,
help='Delete extraneous duplicate jobs?')

parser.add_option('--hide', action='store_true', default=False,
help='For a UserImage, set publicly_visible=False')

Expand All @@ -67,7 +78,12 @@ def bounce_try_dojob(jobid):
parser.print_help()
sys.exit(-1)

if opt.ssh or opt.empty:
if opt.threads is not None:
mp = multiproc(opt.threads)
else:
mp = None

if opt.ssh or opt.empty or opt.delextra:
subs = Submission.objects.all()
if opt.minsub:
subs = subs.filter(id__gt=opt.minsub)
Expand Down Expand Up @@ -109,28 +125,41 @@ def bounce_try_dojob(jobid):
allfailed = False
break

if opt.delextra:
print('Delextra:', len(jobs), 'jobs', len(uis), 'uis; failedjob:', failedjob)
if len(jobs) > 1 and failedjob is not None:
print('Delextra: delete', failedjob)

if not allfailed:
continue
print('All jobs failed for sub', sub.id) #, 'via ssh failure')
failedsubs.append(sub)
#failedsubs.append(sub)
failedjobs.append(failedjob)

print('Found total of', len(failedsubs), 'failed Submissions')
print('Found total of', len(failedsubs), 'failed Submissions and', len(failedjobs), 'failed Jobs')
if opt.rerun:
from process_submissions import try_dosub, try_dojob

if opt.threads is not None:
mp = multiproc(opt.threads)
args = []
for j in failedjobs:
if j is None:
continue
args.append(j.id) #, j.user_image))
args.append((j.id, opt.solve_command, opt.solve_locally))
mp.map(bounce_try_dojob, args)

else:
for sub in failedsubs:
print('Re-trying sub', sub.id)
try_dosub(sub, 1)
for job in failedjobs:
if job is None:
continue
print('Re-trying job', job.id)
try_dojob(job, job.user_image, opt.solve_command, opt.solve_locally)

# FIXME -- failed subs??
#
# else:
# for sub in failedsubs:
# print('Re-trying sub', sub.id)
# try_dosub(sub, 1)


if opt.sub:
Expand Down Expand Up @@ -232,3 +261,6 @@ def bounce_try_dojob(jobid):
if im.display_image:
im.display_image.delete()

if __name__ == '__main__':
main()

3 changes: 0 additions & 3 deletions net/log.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import logging
logger = logging.getLogger(__name__)
#debug = logger.debug
#loginfo = logger.info
#logmsg = logger.info

def _getstr(args):
try:
Expand Down
106 changes: 68 additions & 38 deletions net/process_submissions.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,26 @@
#! /usr/bin/env python3



import os
import sys
from subprocess import check_output #nosec

# add .. to PYTHONPATH
path = os.path.realpath(__file__)
#print('My path', path)
basedir = os.path.dirname(os.path.dirname(path))

#print('PYTHONPATH is', os.environ['PYTHONPATH'])

#print('Adding basedir', basedir, 'to PYTHONPATH')
sys.path.append(basedir)

# add ../blind and ../util to PATH
os.environ['PATH'] += ':' + os.path.join(basedir, 'blind')
os.environ['PATH'] += ':' + os.path.join(basedir, 'util')

# print('sys.path is:')
# for x in sys.path:
# print(' ', x)
#
# print('PATH is:', os.environ['PATH'])

os.environ['DJANGO_SETTINGS_MODULE'] = 'astrometry.net.settings'

import django
django.setup()

try:
import pyfits
except ImportError:
try:
from astropy.io import fits as pyfits
except ImportError:
raise ImportError("Cannot import either pyfits or astropy.io.fits")


import tempfile
import traceback
from urllib.parse import urlparse
import logging
import urllib.request, urllib.parse, urllib.error
import shutil
import multiprocessing
Expand All @@ -54,10 +31,6 @@
import zipfile
import math

import logging
logging.basicConfig(format='%(message)s',
level=logging.DEBUG)

from astrometry.util import image2pnm
from astrometry.util.filetype import filetype_short
from astrometry.util.run_command import run_command
Expand All @@ -66,7 +39,6 @@
from astrometry.util import util as anutil
from astrometry.util.fits import *

#import astrometry.net.settings as settings
import settings
settings.LOGGING['loggers'][''] = {
'handlers': ['console'],
Expand All @@ -81,10 +53,10 @@
from django.db.models import Q

from logging.config import dictConfig

dictConfig(settings.LOGGING)


import logging
logging.basicConfig(format='%(message)s', level=logging.DEBUG)

def is_tarball(fn):
logmsg('is_tarball: %s' % fn)
Expand Down Expand Up @@ -181,18 +153,24 @@ def create_job_logger(job):
return MyLogger(logger)

def try_dojob(job, userimage, solve_command, solve_locally):
print('try_dojob', job)
print('try_dojob', job, '(sub', job.user_image.submission.id, ')')
try:
return dojob(job, userimage, solve_command=solve_command,
r = dojob(job, userimage, solve_command=solve_command,
solve_locally=solve_locally)
print('try_dojob', job, 'completed:', r)
return r
except OSError as e:
import os.errno
print('OSError processing job', job)
print(e)
import errno
# Too many open files
if e.errno == os.errno.EMFILE:
print('e.errno:', e.errno)
print('errno.EMFILE:', errno.EMFILE)
if e.errno == errno.EMFILE:
print('Too many open files -- exiting!')
sys.exit(-1)
except IOError as e:
import errno
# Too many open files
print('Caught IOError')
print('Errno:', e.errno)
if e.errno == errno.EMFILE:
Expand Down Expand Up @@ -623,6 +601,10 @@ def create_source_list(df):
img = None
fits = None
source_type = None

path = df.get_path()
print('path:', path, type(path))

try:
# see if disk file is a fits list
fits = fits_table(str(df.get_path()))
Expand All @@ -648,7 +630,7 @@ def create_source_list(df):
fits = fits_table(fitsfn)
source_type = 'text'
except Exception as e:
logmsg('Traceback:\n' + traceback.format_exc())
#logmsg('Traceback:\n' + traceback.format_exc())
logmsg('fitsfn: %s' % fitsfn)
logmsg(e)
logmsg('file is not a text list')
Expand Down Expand Up @@ -741,7 +723,9 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
all_user_images = UserImage.objects.annotate(job_count=Count('jobs'))
newuis = all_user_images.filter(job_count=0)
if newuis.count():
print('Found', len(newuis), 'UserImages without Jobs:', [u.id for u in newuis])
#print('Found', len(newuis), 'UserImages without Jobs:', [u.id for u in newuis])
#print('Found', len(newuis), 'UserImages without Jobs.')
print('Jobs queued:', len(newuis))

runsubs = me.subs.filter(finished=False)
if subresults != lastsubs:
Expand Down Expand Up @@ -777,6 +761,18 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
qj.success = res.successful()
qj.save()

try:
job = Job.objects.get(id=jid)
print('Job:', job)
print(' status:', job.status)
print(' error message:', job.error_message)
#logfn = job.get_log_file()
print(' log file tail:')
print(job.get_log_tail(nlines=10))

except:
print('exception getting job')

if res.successful():
print('result:', res.get(),)
print()
Expand All @@ -789,7 +785,6 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,

# FIXME -- order by user, etc


## HACK -- order 'newuis' to do the newest ones first... helpful when there
# is a big backlog.
newuis = newuis.order_by('-submission__submitted_on')
Expand All @@ -813,6 +808,41 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
try_dosub(sub, max_sub_retries)


if dojob_pool:
n_add = dojob_nthreads - len(jobresults)
if n_add <= 0:
# Already full!
continue
# Queue some new ones -- randomly select from waiting users
newuis = list(newuis)
start_newuis = []
import numpy as np
from collections import Counter
print('Need to start', n_add, 'jobs;', len(newuis), 'eligible uis')
while n_add > 0:
cusers = Counter([u.user for u in newuis])
print('Jobs queued:', len(newuis), 'by', len(cusers), 'users; top:')
for k,user in cusers.most_common(5):
try:
print(' ', k, user, user.get_profile().display_name)
except:
print(' ', k, user)
users = list(cusers.keys())
print(len(users), 'eligible users')
if len(users) == 0:
break
iu = np.random.randint(len(users))
user = users[iu]
print('Selected user', user)
for ui in newuis:
if ui.user == user:
print('Selected ui', ui)
newuis.remove(ui)
start_newuis.append(ui)
n_add -= 1
break
newuis = start_newuis

for userimage in newuis:
job = Job(user_image=userimage)
job.set_queued_time()
Expand Down
3 changes: 2 additions & 1 deletion net/views/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,8 @@ def search(req):

from django.test import Client
c = Client()
r = c.get('/user_images/2676353')
#r = c.get('/user_images/2676353')
r = c.get('/extraction_image_full/4005556')
#print(r)
with open('out.html', 'wb') as f:
for x in r:
Expand Down

0 comments on commit 744be0e

Please sign in to comment.