db-solr-sync-next
FuhuXia committed Aug 22, 2024
1 parent ea5ba64 commit 752bf88
Showing 1 changed file with 116 additions and 0 deletions: ckanext/geodatagov/cli.py
@@ -306,6 +306,41 @@ def index_for(_type):
        return NoopSearchIndex()


def get_all_entity_ids_date():
    """
    Return a list of the IDs and metadata_modified of all indexed packages.
    Solr results are fetched in pages of 10,000 to avoid running out of memory.
    """
    query = "*:*"
    fq = '+site_id:"%s" ' % config.get("ckan.site_id")
    fq += "+state:active "
    fq += "+type:dataset "

    ret_all = []

    start = 0
    page_size = 10000
    conn = make_connection()

    log.info(f"Now loading SOLR packages using page size {page_size}...")

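    # Offset pagination: 'start' and 'rows' walk the index in page_size slices,
    # and 'fl' limits each returned document to the id and metadata_modified fields.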
    while True:
        log.info(f"loading packages starting from {start}")
        data = conn.search(query, fq=fq, start=start, rows=page_size, fl="id, metadata_modified")

        if not data:
            break

        for r in data.docs:
            ret_all.append((r.get("id"), r.get("metadata_modified")))

        start += page_size

    return ret_all


def get_all_entity_ids_date_hoid():
    """
    Return a list of the IDs and metadata_modified of all indexed packages.
@@ -484,6 +519,87 @@ def db_solr_sync(dryrun, cleanup_solr, update_solr):
        print(*active_package_id_wo_ho, sep='\n')


@geodatagov.command()
@click.option("--dryrun", is_flag=True, help="inspect what will be updated")
@click.option(
    "--cleanup_solr", is_flag=True, help="Only remove orphaned entries in Solr"
)
@click.option(
    "--update_solr",
    is_flag=True,
    help=(
        "(Update solr entries with new data from DB) OR (Add DB data to Solr that is missing)"
    ),
)
def db_solr_sync_next(dryrun, cleanup_solr, update_solr):
    """Sync the Solr index with the DB by comparing package ids and metadata_modified timestamps."""
    if dryrun:
        log.info("Starting dryrun to update index.")

    package_index = index_for(model.Package)

    # get active packages from DB
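    # Microseconds are stripped so the DB timestamps line up with the values
    # returned from Solr, which presumably carry at most second precision.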
    active_package = {
        (r[0], r[1].replace(microsecond=0))
        for r in model.Session.query(
            model.Package.id,
            model.Package.metadata_modified
        )
        .filter(
            model.Package.type == "dataset",
            model.Package.state == "active"
        )
        .all()
    }

    log.info(f"total {len(active_package)} DB active_package")

    # get indexed packages from solr
    indexed_package = set(get_all_entity_ids_date())
    log.info(f"total {len(indexed_package)} solr indexed_package")

    solr_package = indexed_package - active_package
    db_package = active_package - indexed_package

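    # Classify each out-of-sync package id:
    #   "solr"    - in Solr but no longer an active DB dataset (orphan to remove)
    #   "db"      - active in the DB but missing from Solr (needs to be added)
    #   "solr-db" - in both, but with a different metadata_modified (needs reindexing)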
    work_list = {}
    for id, *_ in solr_package:
        work_list[id] = "solr"
    for id, *_ in db_package:
        if id in work_list:
            work_list[id] = "solr-db"
        else:
            work_list[id] = "db"

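    # Passing neither flag (or both flags) enables both the cleanup and the update pass.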
    both = cleanup_solr == update_solr
    set_cleanup = {i for i in work_list if work_list[i] == "solr"}
    set_update = work_list.keys() - set_cleanup
    log.info(f"{len(set_cleanup)} packages need to be removed from Solr")
    log.info(f"{len(set_update)} packages need to be updated/added to Solr")

    if not dryrun and set_cleanup and (cleanup_solr or both):
        log.info("Deleting indexes")
        delete_packages(set_cleanup)
        package_index.commit()
        log.info("Finished cleaning solr entries.")

    if not dryrun and set_update and (update_solr or both):
        log.info("Rebuilding indexes")
        try:
            rebuild(package_ids=set_update, defer_commit=True)
        except Exception as e:
            log.error("Error while rebuilding index: %s" % repr(e))
        package_index.commit()
        log.info("Finished updating solr entries.")
        log.info("Here are the first few dataset ids that were rebuilt:")
        count = 0
        max_count = 10
        for id in set_update:
            count = count + 1
            if count > max_count:
                break
            log.info(f"{count}: {id}")


@geodatagov.command()
def check_stuck_jobs():
    """check stuck harvest jobs"""

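A minimal usage sketch for the new command, assuming the extension is installed and Click derives the command name db-solr-sync-next from the function name; the config path below is illustrative:

    ckan -c /etc/ckan/production.ini geodatagov db-solr-sync-next --dryrun
    ckan -c /etc/ckan/production.ini geodatagov db-solr-sync-next --cleanup_solr
    ckan -c /etc/ckan/production.ini geodatagov db-solr-sync-next --update_solr

Running it with no flag, or with both flags, performs the Solr cleanup and the update/add pass in the same run.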