Skip to content

Commit

Permalink
Add Codex data to DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
robsv committed Jan 18, 2024
1 parent 363394c commit 8f72042
Showing 1 changed file with 164 additions and 10 deletions.
174 changes: 164 additions & 10 deletions bin/load_codex_to_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import socket
import sys
import time
import boto3
from boto3.dynamodb.conditions import Key
import inquirer
from tqdm import tqdm
import jrc_common.jrc_common as JRC

Expand All @@ -19,6 +22,12 @@
DB = {}
# UUIDs
KEYS = {}
# Codex IDs and types
CODEX_TYPE = {}
# Actions
ACTION = {}
# Counters
COUNT = {'found': 0, 'hinsertions': 0, 'iinsertions': 0, 'minsertions': 0}

def terminate_program(msg=None):
''' Terminate the program gracefully
Expand All @@ -32,6 +41,29 @@ def terminate_program(msg=None):
sys.exit(-1 if msg else 0)


def get_table():
""" Allow usser to select a table
Keyword arguments:
None
Returns:
None
"""
DB['dynamo'] = boto3.resource("dynamodb")
dtables = list(DB['dynamo'].tables.all())
tname = []
for tbl in dtables:
ddbt = DB['dynamo'].Table(tbl.name)
if ddbt.name.startswith('janelia-neuronbridge-published-v'):
tname.append(ddbt.name)
quest = [inquirer.List('table',
message='Select NeuronBridge version',
choices=tname)]
ans = inquirer.prompt(quest)
if not ans:
terminate_program()
ARG.TABLE = ans['table']


def initialize_program():
''' Intialize the program
Keyword arguments:
Expand All @@ -53,6 +85,18 @@ def initialize_program():
DB[source] = JRC.connect_database(dbo)
except Exception as err: # pylint: disable=broad-exception-caught
terminate_program(err)
if not ARG.TABLE:
get_table()
for dbn in ("MongoDB", "DynamoDB"):
ACTION[dbn] = False
quest = [inquirer.Checkbox("actions",
message="Which databases should be updated?",
choices=["MongoDB", "DynamoDB"],
default=["MongoDB", "DynamoDB"],)]
ans = inquirer.prompt(quest)
for dbn in ("MongoDB", "DynamoDB"):
if dbn in ans['actions']:
ACTION[dbn] = True


def generate_uid(deployment_context=2):
Expand Down Expand Up @@ -102,6 +146,8 @@ def insert_dataset(coll):
Returns:
UID
"""
if not ACTION['MongoDB']:
return None
LOGGER.warning("Dataset codex:%s will be created", ARG.VERSION)
dtm = datetime.now()
payload = {'class': 'org.janelia.model.domain.flyem.EMDataSet',
Expand All @@ -120,7 +166,7 @@ def insert_dataset(coll):
}
last_uid = generate_uid()
payload['_id'] = last_uid
if ARG.WRITE:
if ACTION['MongoDB'] and ARG.WRITE:
result = coll.insert(payload)
return result
return None
Expand Down Expand Up @@ -155,6 +201,104 @@ def create_body_payload(entry, dtm, dsid):
return payload



def read_neuron_type(ntype, tbl):
""" Retrieve a neuron record from DynamoDB table
Keyword arguments:
ntype: neuron type
tbl: DynamoDB table
Returns:
DynamoDB record
"""
response = tbl.query(
KeyConditionExpression=Key('itemType').eq('searchString') \
& Key('searchKey').eq(ntype.lower())
)
return response


def add_body_ids(htype, payload):
""" Retrieve a neuron record from DynamoDB table
Keyword arguments:
htype: hemibrain (neuron) type
payload: current payload for neuron type
Returns:
None
"""
tbl = DB['dynamo'].Table(ARG.TABLE)
rec = read_neuron_type(htype, tbl)
if rec and rec['Items']:
for itm in rec['Items']:
if 'bodyIDs' in itm:
COUNT['found'] += 1
payload['bodyIDs'] = itm['bodyIDs']


def write_dynamodb(codex_id):
""" Write Codex hemibrain types and IDs to DynamoDB
Keyword arguments:
codex_id: Codex ID
Returns:
None
"""
LOGGER.debug(codex_id)
hbatch = []
for htype in tqdm(CODEX_TYPE, desc='Processing Codex types'):
payload = {'itemType': 'searchString',
'searchKey': htype.lower(),
'filterKey': htype.lower(),
'keyType': 'neuronType',
'name': htype}
payload['codexIDs'] = []
for cid in CODEX_TYPE[htype]:
payload['codexIDs'].append({cid: True})
add_body_ids(htype, payload)
hbatch.append(payload)
if not ARG.WRITE:
return
tbl = DB['dynamo'].Table(ARG.TABLE)
# Hemibrain types
LOGGER.info(f"Batch writing {len(hbatch):,} hemibrain types to {ARG.TABLE}")
with tbl.batch_writer() as writer:
for item in tqdm(hbatch, desc="Writing Codex types"):
if ARG.THROTTLE and (not COUNT["hinsertions"] % ARG.THROTTLE):
time.sleep(2)
writer.put_item(Item=item)
COUNT["hinsertions"] += 1
# Codex IDs
LOGGER.info(f"Batch writing {len(codex_id):,} Codex IDs to {ARG.TABLE}")
with tbl.batch_writer() as writer:
for item in tqdm(codex_id, desc="Writing Codex IDs"):
if ARG.THROTTLE and (not COUNT["iinsertions"] % ARG.THROTTLE):
time.sleep(2)
writer.put_item(Item=item)
COUNT["iinsertions"] += 1


def create_dynamo_payload(entry):
""" Create the payload for a Codex ID insertion into DynamoDB
Keyword arguments:
entry: contains Codex root ID and neuron type
Returns:
payload
"""
# Codex ID
cid = f"c{entry[0]}"
payload = {'itemType': 'searchString',
'searchKey': cid,
'filterKey': cid,
'keyType': 'codexID',
'name': cid}
# Hemibrain type
if entry[1]:
for htype in entry[1].split(","):
if htype not in CODEX_TYPE:
CODEX_TYPE[htype] = {}
CODEX_TYPE[htype][cid] = True
# cdm? ppp?
return payload


def process_entries(entries, dsid):
""" Build an array of records to insert into jacs.emBody
Keyword arguments:
Expand All @@ -164,15 +308,20 @@ def process_entries(entries, dsid):
payload
"""
dtm = datetime.now()
LOGGER.info("Found %d entries", len(entries))
LOGGER.info(f"Found {len(entries):,} entries")
coll = DB['jacs'].emBody
docs = []
for entry in tqdm(entries, desc='Building insert array'):
codex_id = []
for entry in tqdm(entries, desc='Building insert list'):
docs.append(create_body_payload(entry, dtm, dsid))
time.sleep(.00001)
LOGGER.info("Writing %d records to emBody", len(docs))
if ARG.WRITE:
codex_id.append(create_dynamo_payload(entry))
if ACTION['MongoDB'] and ARG.WRITE:
print(f"Writing {len(docs):,} records to emBody")
coll.insert_many(docs)
print(f"Found {len(codex_id):,} Codex IDs")
print(f"Found {len(CODEX_TYPE):,} Codex hemibrain types")
write_dynamodb(codex_id)
print(COUNT)


def process_codex():
Expand All @@ -184,11 +333,9 @@ def process_codex():
"""
coll = DB['jacs'].emDataSet
result = coll.count_documents({'name': 'codex', 'version': ARG.VERSION})
if result:
if result and ACTION['MongoDB']:
LOGGER.warning('Dataset codex:%s already exists', ARG.VERSION)
terminate_program()
#result = coll.find({'name': 'codex', 'version': ARG.VERSION})
#dsid = result[0]['_id']
else:
dsid = insert_dataset(coll)
entries = []
Expand All @@ -198,20 +345,27 @@ def process_codex():
continue
entries.append([row[0], row[6]])
process_entries(entries, dsid)
print(f"MongoDB Codex ID updates: {COUNT['minsertions']}")
print(f"DynamoDB Codex ID updates: {COUNT['iinsertions']}")
print(f"DynamoDB Codex type updates: {COUNT['hinsertions']}")
print(f"Previously existing types: {COUNT['found']}")

# -----------------------------------------------------------------------------

if __name__ == '__main__':
PARSER = argparse.ArgumentParser(
description="Upload data from Codex")
PARSER.add_argument('--file', dest='FILE', action='store',
default='classification.csv', help='Codex file')
required=True, help='Codex file')
PARSER.add_argument('--version', dest='VERSION', action='store',
required=True, help='Codex version (snapshot)')
PARSER.add_argument('--manifold', dest='MANIFOLD', action='store',
default='dev', choices=['dev', 'prod'], help='MongoDB manifold')
PARSER.add_argument('--table', dest='TABLE', help='DynamoDB table')
PARSER.add_argument('--write', dest='WRITE', action='store_true',
default=False, help='Write to MongoDB')
PARSER.add_argument('--throttle', type=int, dest='THROTTLE',
default=0, help='DynamoDB batch write throttle (# items)')
PARSER.add_argument('--verbose', dest='VERBOSE', action='store_true',
default=False, help='Flag, Chatty')
PARSER.add_argument('--debug', dest='DEBUG', action='store_true',
Expand Down

0 comments on commit 8f72042

Please sign in to comment.