diff --git a/ckanext/geodatagov/cli.py b/ckanext/geodatagov/cli.py index 58de08de..fabc0e16 100644 --- a/ckanext/geodatagov/cli.py +++ b/ckanext/geodatagov/cli.py @@ -306,6 +306,41 @@ def index_for(_type): return NoopSearchIndex() +def get_all_entity_ids_date(): + """ + Return a list of the IDs and metadata_modified of all indexed packages. + + Solr result is to be processed in batches at 10000 + pagination to avoid out-of-memory. + """ + query = "*:*" + fq = '+site_id:"%s" ' % config.get("ckan.site_id") + fq += "+state:active " + fq += "+type:dataset " + + ret_all = [] + + start = 0 + page_size = 10000 + conn = make_connection() + + log.info(f"Now loading SOLR packages using page size {page_size}...") + + while True: + log.info(f"loading packages starting from {start}") + data = conn.search(query, fq=fq, start=start, rows=page_size, fl="id, metadata_modified") + + if not data: + break + + for r in data.docs: + ret_all.append((r.get("id"), r.get("metadata_modified"))) + + start += page_size + + return ret_all + + def get_all_entity_ids_date_hoid(): """ Return a list of the IDs and metadata_modified of all indexed packages. @@ -484,6 +519,87 @@ def db_solr_sync(dryrun, cleanup_solr, update_solr): print(*active_package_id_wo_ho, sep='\n') +@geodatagov.command() +@click.option("--dryrun", is_flag=True, help="inspect what will be updated") +@click.option( + "--cleanup_solr", is_flag=True, help="Only remove orphaned entries in Solr" +) +@click.option( + "--update_solr", + is_flag=True, + help=( + "(Update solr entries with new data from DB) OR (Add DB data to Solr that is missing)" + ), +) +def db_solr_sync_next(dryrun, cleanup_solr, update_solr): + """db solr sync next""" + if dryrun: + log.info("Starting dryrun to update index.") + + package_index = index_for(model.Package) + + # get active packages from DB + active_package = { + (r[0], r[1].replace(microsecond=0)) + for r in model.Session.query( + model.Package.id, + model.Package.metadata_modified + ) + .filter( + model.Package.type == "dataset", + model.Package.state == "active" + ) + .all() + } + + log.info(f"total {len(active_package)} DB active_package") + + # get indexed packages from solr + indexed_package = set(get_all_entity_ids_date()) + log.info(f"total {len(indexed_package)} solr indexed_package") + + solr_package = indexed_package - active_package + db_package = active_package - indexed_package + + work_list = {} + for id, *_ in solr_package: + work_list[id] = "solr" + for id, *_ in db_package: + if id in work_list: + work_list[id] = "solr-db" + else: + work_list[id] = "db" + + both = cleanup_solr == update_solr + set_cleanup = {i if work_list[i] == "solr" else None for i in work_list} - {None} + set_update = work_list.keys() - set_cleanup + log.info(f"{len(set_cleanup)} packages need to be removed from Solr") + log.info(f"{len(set_update)} packages need to be updated/added to Solr") + + if not dryrun and set_cleanup and (cleanup_solr or both): + log.info("Deleting indexes") + delete_packages(set_cleanup) + package_index.commit() + log.info("Finished cleaning solr entries.") + + if not dryrun and set_update and (update_solr or both): + log.info("Rebuilding indexes") + try: + rebuild(package_ids=set_update, defer_commit=True) + except Exception as e: + log.error("Error while rebuild index %s: %s" % (id, repr(e))) + package_index.commit() + log.info("Finished updating solr entries.") + log.info("Here is the first a few dataset ids that are rebuilt:") + count = 0 + max = 10 + for id in set_update: + count = count + 1 + if count > max: + break + log.info(f"{count}: {id}") + + @geodatagov.command() def check_stuck_jobs(): """check stuck harvest jobs"""