Can now read from manifest
robsv committed Jan 11, 2024
1 parent ec9e93c commit 363394c
Showing 1 changed file with 161 additions and 42 deletions.
203 changes: 161 additions & 42 deletions bin/backcheck_aws.py
@@ -7,6 +7,7 @@
import sys
from types import SimpleNamespace
import boto3
import MySQLdb
from tqdm import tqdm
import jrc_common.jrc_common as JRC
import neuronbridge_lib as NB
@@ -18,6 +19,10 @@
S3_SECONDS = 60 * 60 * 12
# Database
DB = {}
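# SQL: one row per publishing name/slide code pair, with a comma-separated list of its ALPS releases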
READ = {"RELEASES": "SELECT publishing_name,slide_code,GROUP_CONCAT(DISTINCT alps_release) "
+ "AS rels FROM image_data_mv WHERE alps_release IS NOT NULL GROUP BY 1,2",
}
RELEASE = {}
# Configuration
MANIFOLDS = ['dev', 'prod', 'devpre', 'prodpre']

@@ -75,18 +80,51 @@ def initialize_program():
        dbconfig = JRC.get_config("databases")
    except Exception as err: # pylint: disable=broad-exception-caught
        terminate_program(err)
dbo = attrgetter(f"neuronbridge.{ARG.MONGO}.read")(dbconfig)
LOGGER.info("Connecting to %s %s on %s as %s", dbo.name, 'prod', dbo.host, dbo.user)
DB['NB'] = JRC.connect_database(dbo)
    for dbname in ('sage', 'neuronbridge'):
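        # SAGE lives only on the prod manifold; the NeuronBridge manifold comes from --mongo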
        mfd = 'prod' if dbname == 'sage' else ARG.MONGO
        dbo = attrgetter(f"{dbname}.{mfd}.read")(dbconfig)
        LOGGER.info("Connecting to %s %s on %s as %s", dbo.name, mfd, dbo.host, dbo.user)
        if dbname == 'sage':
            DB[dbname] = JRC.connect_database(dbo)
        else:
            DB['NB'] = JRC.connect_database(dbo)
    initialize_s3()

def populate_releases():
    """ Populate the RELEASE dict with releases for publishing names and slide codes
        Keyword arguments:
          None
        Returns:
          None
    """
    LOGGER.info("Fetching releases for publishing names and slide codes")
    try:
        DB['sage']['cursor'].execute(READ['RELEASES'])
        rows = DB['sage']['cursor'].fetchall()
    except MySQLdb.Error as err:
        terminate_program(JRC.sql_error(err))
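    # Index releases under both the publishing name and the slide code for direct lookup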
    for row in rows:
        RELEASE[row['publishing_name']] = row['rels']
        RELEASE[row['slide_code']] = row['rels']


def get_releases(key):
    """ Get ALPS releases for a given publishing name/slide code
        Keyword arguments:
          key: publishing name/slide code
        Returns:
          String containing releases
    """
    return f" ({RELEASE[key]})" if key in RELEASE else ''


def get_mongo_data():
    """ Get published names and slide codes from neuronbridge (neuronMetadata or publishedURL)
        Keyword arguments:
          None
        Returns:
          pname: dict of publishing names
          scode: dict of slide codes
    """
    try:
        libraries = simplenamespace_to_dict(JRC.get_config("cdm_library"))
@@ -100,20 +138,25 @@ def get_published_names():
            break
    if not libname:
        terminate_program(f"Could not find library for {ARG.LIBRARY}")
    coll = DB['NB'][ARG.SOURCE]
    payload = {"alignmentSpace": ARG.TEMPLATE,
               "libraryName": libname}
    pname = {}
LOGGER.info(f"Searching neuronMetadata for {ARG.TEMPLATE}/{libname}")
    scode = {}
    LOGGER.info(f"Searching {ARG.SOURCE} for {ARG.TEMPLATE}/{libname}")
    try:
        results = coll.find(payload)
    except Exception as err:
        terminate_program(err)
    for row in results:
        if row['publishedName']:
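            # publishedURL prefixes the name (colon-delimited); keep only the trailing name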
            if ARG.SOURCE == 'publishedURL':
                row['publishedName'] = row['publishedName'].split(':')[-1]
            pname[row['publishedName']] = True
print(f"Found {len(pname):,} publishing names in neuronMetadata")
return pname
            if 'slideCode' in row:
                scode[row['slideCode']] = row['publishedName']
    print(f"Found {len(pname):,} publishing names and {len(scode):,} slide codes in {ARG.SOURCE}")
    return pname, scode


def humansize(num, suffix='B'):
@@ -147,55 +190,131 @@ def simplenamespace_to_dict(nspace):
    return result


def get_aws_data():
    """ Get published names and slide codes from AWS
        Keyword arguments:
          None
        Returns:
          pname: dict of publishing names
          scode: dict of slide codes
    """
    total_objs = total_size = 0
    files = []
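    # Reading a pre-generated bucket manifest (one key per line) avoids listing every S3 object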
    if ARG.MANIFEST:
        LOGGER.info(f"Searching manifest for {ARG.TEMPLATE}/{ARG.LIBRARY}")
        prefix = '/'.join([ARG.TEMPLATE, ARG.LIBRARY]) + '/searchable_neurons/'
        with open(ARG.MANIFEST, 'r', encoding='ascii') as instream:
            rows = instream.read().splitlines()
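        # Manifest rows are bare keys with no object sizes, so total_size stays 0 in this branch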
        for row in rows:
            total_objs += 1
            if not row.startswith(prefix) or not row.endswith('.tif'):
                continue
            files.append(row.split('/')[-1])
    else:
        LOGGER.info(f"Searching AWS for {ARG.TEMPLATE}/{ARG.LIBRARY}")
        prefix = '/'.join([ARG.TEMPLATE, ARG.LIBRARY]) + '/'
        objs = NB.get_all_s3_objects(S3['client'], Bucket=ARG.BUCKET, Prefix=prefix)
        for obj in tqdm(objs, desc='Finding files on S3'):
            total_objs += 1
            if '/searchable_neurons/' not in obj['Key'] or not obj['Key'].endswith('.tif'):
                continue
            total_size += obj['Size']
            files.append(obj['Key'])
    LOGGER.info(f"Checked {total_objs:,} objects")
    print(f"Found {len(files):,} objects ({humansize(total_size)})")
    pname = {}
    scode = {}
    for file in files:
        fname = file.split('/')[-1]
        pname[fname.split('-')[0]] = True
        if library_type() != 'flyem':
            scode[fname.split('-')[1]] = fname.split('-')[0]
    print(f"Found {len(pname):,} publishing names and {len(scode):,} slide codes in S3")
    return pname, scode


def library_type():
    """ Get the library type from the library name
        Keyword arguments:
          None
        Returns:
          "flyem" or "flylight"
    """
    return 'flyem' if 'flyem' in ARG.LIBRARY.lower() else 'flylight'


def report_errors(mpname, mscode, apname, ascode):
    """ Report on publishing name/slide code errors
        Keyword arguments:
          mpname: dict of publishing names from Mongo
          mscode: dict of slide codes from Mongo
          apname: dict of publishing names from AWS
          ascode: dict of slide codes from AWS
        Returns:
          errors: list of errors
    """
    errors = []
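    # Compare in both directions so S3-only and Mongo-only entries are both reported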
    # Slide codes
    for key in tqdm(ascode, desc='AWS S3 slide codes'):
        if key not in mscode:
            rel = '' if library_type() == 'flyem' else get_releases(key)
            errors.append(f"{key}{rel} is in S3 but not in {ARG.SOURCE}")
    for key in tqdm(mscode, desc=f"{ARG.SOURCE} slide codes"):
        if key not in ascode:
            rel = '' if library_type() == 'flyem' else get_releases(key)
            errors.append(f"{key}{rel} is in {ARG.SOURCE} but not in S3")
    # Publishing names
    for key in tqdm(apname, desc='AWS S3 publishing names'):
        if key not in mpname:
            rel = '' if library_type() == 'flyem' else get_releases(key)
            errors.append(f"{key}{rel} is in S3 but not in {ARG.SOURCE}")
    for key in tqdm(mpname, desc=f"{ARG.SOURCE} publishing names"):
        if key not in apname:
            rel = '' if library_type() == 'flyem' else get_releases(key)
            errors.append(f"{key}{rel} is in {ARG.SOURCE} but not in S3")
    return errors


def run_backcheck():
    """ Check publishing names and slide codes in S3 vs. publishedURL or neuronMetadata
        Keyword arguments:
          None
        Returns:
          None
    """
    if not ARG.TEMPLATE:
        ARG.TEMPLATE = NB.get_template(S3['client'], ARG.BUCKET)
    if not ARG.LIBRARY:
        ARG.LIBRARY = NB.get_library_from_aws(S3['client'], ARG.BUCKET, ARG.TEMPLATE)
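    # FlyEM libraries have no slide codes or ALPS releases, so the SAGE release lookup is skipped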
    if library_type() != 'flyem':
        populate_releases()
    mpname, mscode = get_mongo_data()
    apname, ascode = get_aws_data()
    errors = report_errors(mpname, mscode, apname, ascode)
    if errors:
        LOGGER.error("There are discrepancies in publishing names/slide codes")
        with open('backcheck_mismatches.txt', 'w', encoding='ascii') as outstream:
            outstream.write(f"{ARG.TEMPLATE}/{ARG.LIBRARY}\n")
            for err in errors:
                outstream.write(f"{err}\n")
    else:
        print("All publishing names/slide codes matched")


if __name__ == '__main__':
    PARSER = argparse.ArgumentParser(
        description="Backcheck AWS S3 to neuronMetadata or publishedURL")
    PARSER.add_argument('--bucket', dest='BUCKET', action='store',
                        default='janelia-flylight-color-depth', help='AWS S3 bucket')
    PARSER.add_argument('--template', dest='TEMPLATE', action='store',
                        help='Alignment template')
    PARSER.add_argument('--library', dest='LIBRARY', action='store',
                        default='', help='Color depth library')
    PARSER.add_argument('--source', dest='SOURCE', action='store',
                        default='publishedURL', choices=['neuronMetadata', 'publishedURL'],
                        help='Source connection (neuronMetadata, publishedURL)')
    PARSER.add_argument('--manifest', dest='MANIFEST', action='store',
                        help='AWS S3 bucket manifest')
    PARSER.add_argument('--manifold', dest='MANIFOLD', action='store',
                        default='prod', choices=MANIFOLDS, help='S3 manifold')
    PARSER.add_argument('--mongo', dest='MONGO', action='store',
