From 8f720427f8aea74642b5c82b732e0c6588a17751 Mon Sep 17 00:00:00 2001 From: Rob Svirskas Date: Wed, 17 Jan 2024 19:52:15 -0500 Subject: [PATCH] Add Codex data to DynamoDB --- bin/load_codex_to_mongo.py | 174 ++++++++++++++++++++++++++++++++++--- 1 file changed, 164 insertions(+), 10 deletions(-) diff --git a/bin/load_codex_to_mongo.py b/bin/load_codex_to_mongo.py index 8da27a8..b5ee01a 100644 --- a/bin/load_codex_to_mongo.py +++ b/bin/load_codex_to_mongo.py @@ -11,6 +11,9 @@ import socket import sys import time +import boto3 +from boto3.dynamodb.conditions import Key +import inquirer from tqdm import tqdm import jrc_common.jrc_common as JRC @@ -19,6 +22,12 @@ DB = {} # UUIDs KEYS = {} +# Codex IDs and types +CODEX_TYPE = {} +# Actions +ACTION = {} +# Counters +COUNT = {'found': 0, 'hinsertions': 0, 'iinsertions': 0, 'minsertions': 0} def terminate_program(msg=None): ''' Terminate the program gracefully @@ -32,6 +41,29 @@ def terminate_program(msg=None): sys.exit(-1 if msg else 0) +def get_table(): + """ Allow user to select a table + Keyword arguments: + None + Returns: + None + """ + DB['dynamo'] = boto3.resource("dynamodb") + dtables = list(DB['dynamo'].tables.all()) + tname = [] + for tbl in dtables: + ddbt = DB['dynamo'].Table(tbl.name) + if ddbt.name.startswith('janelia-neuronbridge-published-v'): + tname.append(ddbt.name) + quest = [inquirer.List('table', + message='Select NeuronBridge version', + choices=tname)] + ans = inquirer.prompt(quest) + if not ans: + terminate_program() + ARG.TABLE = ans['table'] + + def initialize_program(): ''' Intialize the program Keyword arguments: @@ -53,6 +85,18 @@ def initialize_program(): DB[source] = JRC.connect_database(dbo) except Exception as err: # pylint: disable=broad-exception-caught terminate_program(err) + if not ARG.TABLE: + get_table() + for dbn in ("MongoDB", "DynamoDB"): + ACTION[dbn] = False + quest = [inquirer.Checkbox("actions", + message="Which databases should be updated?", + choices=["MongoDB", 
"DynamoDB"], + default=["MongoDB", "DynamoDB"],)] + ans = inquirer.prompt(quest) + for dbn in ("MongoDB", "DynamoDB"): + if dbn in ans['actions']: + ACTION[dbn] = True def generate_uid(deployment_context=2): @@ -102,6 +146,8 @@ def insert_dataset(coll): Returns: UID """ + if not ACTION['MongoDB']: + return None LOGGER.warning("Dataset codex:%s will be created", ARG.VERSION) dtm = datetime.now() payload = {'class': 'org.janelia.model.domain.flyem.EMDataSet', @@ -120,7 +166,7 @@ def insert_dataset(coll): } last_uid = generate_uid() payload['_id'] = last_uid - if ARG.WRITE: + if ACTION['MongoDB'] and ARG.WRITE: result = coll.insert(payload) return result return None @@ -155,6 +201,104 @@ def create_body_payload(entry, dtm, dsid): return payload + +def read_neuron_type(ntype, tbl): + """ Retrieve a neuron record from DynamoDB table + Keyword arguments: + ntype: neuron type + tbl: DynamoDB table + Returns: + DynamoDB record + """ + response = tbl.query( + KeyConditionExpression=Key('itemType').eq('searchString') \ + & Key('searchKey').eq(ntype.lower()) + ) + return response + + +def add_body_ids(htype, payload): + """ Add body IDs from an existing DynamoDB neuron type record to the payload + Keyword arguments: + htype: hemibrain (neuron) type + payload: current payload for neuron type + Returns: + None + """ + tbl = DB['dynamo'].Table(ARG.TABLE) + rec = read_neuron_type(htype, tbl) + if rec and rec['Items']: + for itm in rec['Items']: + if 'bodyIDs' in itm: + COUNT['found'] += 1 + payload['bodyIDs'] = itm['bodyIDs'] + + +def write_dynamodb(codex_id): + """ Write Codex hemibrain types and IDs to DynamoDB + Keyword arguments: + codex_id: list of Codex ID payloads + Returns: + None + """ + LOGGER.debug(codex_id) + hbatch = [] + for htype in tqdm(CODEX_TYPE, desc='Processing Codex types'): + payload = {'itemType': 'searchString', + 'searchKey': htype.lower(), + 'filterKey': htype.lower(), + 'keyType': 'neuronType', + 'name': htype} + payload['codexIDs'] = [] + for cid in CODEX_TYPE[htype]: + 
payload['codexIDs'].append({cid: True}) + add_body_ids(htype, payload) + hbatch.append(payload) + if not ARG.WRITE: + return + tbl = DB['dynamo'].Table(ARG.TABLE) + # Hemibrain types + LOGGER.info(f"Batch writing {len(hbatch):,} hemibrain types to {ARG.TABLE}") + with tbl.batch_writer() as writer: + for item in tqdm(hbatch, desc="Writing Codex types"): + if ARG.THROTTLE and (not COUNT["hinsertions"] % ARG.THROTTLE): + time.sleep(2) + writer.put_item(Item=item) + COUNT["hinsertions"] += 1 + # Codex IDs + LOGGER.info(f"Batch writing {len(codex_id):,} Codex IDs to {ARG.TABLE}") + with tbl.batch_writer() as writer: + for item in tqdm(codex_id, desc="Writing Codex IDs"): + if ARG.THROTTLE and (not COUNT["iinsertions"] % ARG.THROTTLE): + time.sleep(2) + writer.put_item(Item=item) + COUNT["iinsertions"] += 1 + + +def create_dynamo_payload(entry): + """ Create the payload for a Codex ID insertion into DynamoDB + Keyword arguments: + entry: contains Codex root ID and neuron type + Returns: + payload + """ + # Codex ID + cid = f"c{entry[0]}" + payload = {'itemType': 'searchString', + 'searchKey': cid, + 'filterKey': cid, + 'keyType': 'codexID', + 'name': cid} + # Hemibrain type + if entry[1]: + for htype in entry[1].split(","): + if htype not in CODEX_TYPE: + CODEX_TYPE[htype] = {} + CODEX_TYPE[htype][cid] = True + # cdm? ppp? 
+ return payload + + def process_entries(entries, dsid): """ Build an array of records to insert into jacs.emBody Keyword arguments: @@ -164,15 +308,20 @@ def process_entries(entries, dsid): payload """ dtm = datetime.now() - LOGGER.info("Found %d entries", len(entries)) + LOGGER.info(f"Found {len(entries):,} entries") coll = DB['jacs'].emBody docs = [] - for entry in tqdm(entries, desc='Building insert array'): + codex_id = [] + for entry in tqdm(entries, desc='Building insert list'): docs.append(create_body_payload(entry, dtm, dsid)) - time.sleep(.00001) - LOGGER.info("Writing %d records to emBody", len(docs)) - if ARG.WRITE: + codex_id.append(create_dynamo_payload(entry)) + if ACTION['MongoDB'] and ARG.WRITE: + print(f"Writing {len(docs):,} records to emBody") coll.insert_many(docs) + print(f"Found {len(codex_id):,} Codex IDs") + print(f"Found {len(CODEX_TYPE):,} Codex hemibrain types") + write_dynamodb(codex_id) + print(COUNT) def process_codex(): @@ -184,11 +333,9 @@ def process_codex(): """ coll = DB['jacs'].emDataSet result = coll.count_documents({'name': 'codex', 'version': ARG.VERSION}) - if result: + if result and ACTION['MongoDB']: LOGGER.warning('Dataset codex:%s already exists', ARG.VERSION) terminate_program() - #result = coll.find({'name': 'codex', 'version': ARG.VERSION}) - #dsid = result[0]['_id'] else: dsid = insert_dataset(coll) entries = [] @@ -198,6 +345,10 @@ def process_codex(): continue entries.append([row[0], row[6]]) process_entries(entries, dsid) + print(f"MongoDB Codex ID updates: {COUNT['minsertions']}") + print(f"DynamoDB Codex ID updates: {COUNT['iinsertions']}") + print(f"DynamoDB Codex type updates: {COUNT['hinsertions']}") + print(f"Previously existing types: {COUNT['found']}") # ----------------------------------------------------------------------------- @@ -205,13 +356,16 @@ def process_codex(): PARSER = argparse.ArgumentParser( description="Upload data from Codex") PARSER.add_argument('--file', dest='FILE', action='store', - 
default='classification.csv', help='Codex file') + required=True, help='Codex file') PARSER.add_argument('--version', dest='VERSION', action='store', required=True, help='Codex version (snapshot)') PARSER.add_argument('--manifold', dest='MANIFOLD', action='store', default='dev', choices=['dev', 'prod'], help='MongoDB manifold') + PARSER.add_argument('--table', dest='TABLE', help='DynamoDB table') PARSER.add_argument('--write', dest='WRITE', action='store_true', default=False, help='Write to MongoDB') + PARSER.add_argument('--throttle', type=int, dest='THROTTLE', + default=0, help='DynamoDB batch write throttle (# items)') PARSER.add_argument('--verbose', dest='VERBOSE', action='store_true', default=False, help='Flag, Chatty') PARSER.add_argument('--debug', dest='DEBUG', action='store_true',