This repository has been archived by the owner on Jun 1, 2022. It is now read-only.

Optimize /api/importSourceLocations #583

Open
simonw opened this issue May 17, 2021 · 14 comments
Labels
big-impact, importers (Tools that import data into VIAL), performance (Site and API performance optimizations)

Comments

@simonw
Collaborator

simonw commented May 17, 2021

The /api/importSourceLocations endpoint gets hit with 160,000 writes (batched 500 at a time) every six hours, and it hasn't had any optimization work done on it at all.

Discord: https://discord.com/channels/799147121357881364/824443781990187008/843889279763218452 and https://discord.com/channels/799147121357881364/813861006718926848/843934864397828156

simonw added the performance (Site and API performance optimizations) and importers (Tools that import data into VIAL) labels May 17, 2021
@simonw
Collaborator Author

simonw commented May 17, 2021

One idea is to have our clients calculate hashes for every imported item, store those in VIAL, and provide a supported mechanism for those clients to maintain their own cache of those hashes - so they can avoid sending us data that we already have.
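For illustration, a client-side cache along those lines might look something like this - the cache file, the record shape and the content_hash() helper are all assumptions here, not an existing API:

import json
from pathlib import Path

# Hypothetical cache location and record shape, for illustration only.
CACHE_PATH = Path("source_location_hashes.json")

def filter_unchanged(records, content_hash):
    """Drop records whose content hash matches what was sent last time."""
    cache = json.loads(CACHE_PATH.read_text()) if CACHE_PATH.exists() else {}
    changed = []
    for record in records:
        digest = content_hash(record)
        if cache.get(record["source_uid"]) != digest:
            changed.append(record)
            cache[record["source_uid"]] = digest
    CACHE_PATH.write_text(json.dumps(cache))
    return changed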

@simonw
Collaborator Author

simonw commented May 17, 2021

There is definitely low-hanging fruit within the endpoint itself though:

vial/vaccinate/api/views.py

Lines 237 to 324 in c33b0d0

@csrf_exempt
@log_api_requests
@require_api_key
@beeline.traced(name="import_source_locations")
def import_source_locations(request, on_request_logged):
    if request.method != "POST":
        return JsonResponse({"error": "POST required"}, status=400)
    try:
        import_run = ImportRun.objects.get(id=request.GET.get("import_run_id", "0"))
    except ImportRun.DoesNotExist:
        return JsonResponse({"error": "?import_run_id=X is required"}, status=400)
    try:
        post_data = request.body.decode("utf-8")
        lines = post_data.split("\n")
        json_records = [orjson.loads(line) for line in lines if line.strip()]
    except ValueError as e:
        return JsonResponse({"error": "JSON error: " + str(e)}, status=400)
    # Validate those JSON records
    errors = []
    records = []
    for json_record in json_records:
        try:
            record = ImportSourceLocation(**json_record)
            if (
                record.import_json.address is not None
                and record.import_json.address.state is None
            ):
                errors.append((json_record, "no state specified on address"))
            records.append(record)
        except ValidationError as e:
            errors.append((json_record, e.errors()))
    if errors:
        return JsonResponse({"errors": errors}, status=400)
    # All are valid, record them
    created = []
    updated = []
    for json_record, record in zip(json_records, records):
        matched_location = None
        if record.match is not None and record.match.action == "existing":
            matched_location = Location.objects.get(public_id=record.match.id)
        defaults = {
            "source_name": record.source_name,
            "name": record.name,
            "latitude": record.latitude,
            "longitude": record.longitude,
            "import_json": json_record["import_json"],
            "import_run": import_run,
            "last_imported_at": timezone.now(),
        }
        source_location, was_created = SourceLocation.objects.update_or_create(
            source_uid=record.source_uid, defaults=defaults
        )
        safe_to_match = was_created or source_location.matched_location is None
        if safe_to_match and matched_location is not None:
            source_location.matched_location = matched_location
            source_location.save()
        import_json = record.import_json
        links = list(import_json.links) if import_json.links is not None else []
        # Always use the (source, id) as a concordance
        links.append(
            Link(authority=import_json.source.source, id=import_json.source.id)
        )
        for link in links:
            identifier, _ = ConcordanceIdentifier.objects.get_or_create(
                authority=link.authority, identifier=link.id
            )
            identifier.source_locations.add(source_location)
        if safe_to_match:
            if record.match is not None and record.match.action == "new":
                matched_location = build_location_from_source_location(source_location)
            if matched_location is not None:
                new_concordances = source_location.concordances.difference(
                    matched_location.concordances.all()
                )
                matched_location.concordances.add(*new_concordances)
        if was_created:
            created.append(source_location.pk)
        else:
            updated.append(source_location.pk)
    return JsonResponse({"created": created, "updated": updated})

@simonw
Collaborator Author

simonw commented May 17, 2021

@bryanculbertson

I am going to test out doing content hashes within ingestion by downloading all loaded source locations, calculating a content hash (e.g. sorted keys with no timestamp fields), and then skipping incoming locations with the same hash value.

If this plan works then I would like a place to store these content hashes on source_location so I don't need to compute them again. But I think I can get the majority of the speed-up without VIAL needing to store the content hash.
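As a rough sketch, that kind of content hash could be computed like this - the excluded field names are illustrative, not necessarily what the ingestion code actually uses:

import hashlib
import orjson

# Illustrative only: the real ingestion code may exclude different fields.
TIMESTAMP_FIELDS = {"fetched_at", "published_at"}

def content_hash(normalized_location: dict) -> str:
    """Stable hash of a normalized location dict, ignoring timestamp fields."""
    stripped = {
        key: value
        for key, value in normalized_location.items()
        if key not in TIMESTAMP_FIELDS
    }
    # OPT_SORT_KEYS gives a canonical byte representation regardless of key order.
    return hashlib.sha256(
        orjson.dumps(stripped, option=orjson.OPT_SORT_KEYS)
    ).hexdigest()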

@shashank025
Contributor

Suggestion: instead of creating content hashes in the application layer, add server logic to leverage Postgres' ETL capability, which basically amounts to:

-- Step 1: temp table with all uploaded data
-- Modify /api/importSourceLocations to insert
-- to this temp table in a streaming fashion
-- (very low memory overhead)
create temp table tmp_new_data as
    ...;

begin;  -- start transactions as late as possible

-- Step 2: update rows that have changed
update destination_table d
   set
    col_a = t.col_a,
    col_b = t.col_b,
    ...
  from tmp_new_data t
 where
    t.pkey_col_1 = d.pkey_col_1
    and ...
    and (t.col_a, t.col_b, ...) IS DISTINCT FROM (d.col_a, d.col_b, ...);

-- Step 3: insert new rows
insert into destination_table
select t.*
  from tmp_new_data t
 where not exists (
    select 1 from destination_table d
     where d.pkey_col_1 = t.pkey_col_1 ...
);

commit;

-- Step 4: clean up
drop table tmp_new_data;

Sorry for "drive-by" design, but I'm happy to collaborate more on this. I've written a bunch of ETL in past lives, and this seems like basically that.

@shashank025
Contributor

shashank025 commented May 18, 2021

The idea is that Postgres (and RDBMSes in general) does really well when operated in a bulk fashion (as opposed to inserting/updating one row at a time), and the above suggestion does exactly that.

@bryanculbertson

bryanculbertson commented May 18, 2021

@shashank025 The idea of bulk loading to a temp table and then selectively copying to the main table is a great idea. This would speed up bulk loading in general. We would probably still want content hashes though to avoid having to do multiple comparisons for each field, especially since we would need to do those comparisons to fields inside of a JSON field.

If the VIAL authors have time to look into bypassing the ORM to try this temp table load technique, that would be great. When I have done this in the past I used psycopg2's copy_expert to stream data directly from a fileobj to a temp table, which had a low memory overhead, but that may be overkill for the <10MB files we have.
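As a rough sketch of that copy_expert approach, assuming a simplified three-column temp table rather than the real source_location schema:

import io

# Sketch only: column list is simplified and escaping is not handled.
def load_to_temp_table(conn, rows):
    buf = io.StringIO()
    for source_uid, content_hash, import_json in rows:
        # COPY text format is tab-separated; real code needs proper escaping.
        buf.write(f"{source_uid}\t{content_hash}\t{import_json}\n")
    buf.seek(0)
    with conn.cursor() as cur:
        cur.execute(
            "create temp table tmp_new_data "
            "(source_uid text, content_hash text, import_json jsonb)"
        )
        cur.copy_expert(
            "copy tmp_new_data (source_uid, content_hash, import_json) from stdin",
            buf,
        )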

In the meantime, here is how it looks when we calculate content hashes in ingestion, I think this solves the broken load problem well enough for now: CAVaccineInventory/vaccine-feed-ingest#661

@simonw
Collaborator Author

simonw commented May 18, 2021

The problem with dropping down to raw SQL for this is that we then need to keep that SQL in sync with further changes we make to the database models - operating via the ORM protects us from having to think about that.

Keeping in sync isn't impossible though: with comprehensive automated tests around this I'm confident we could at least cause easily fixed test failures should the raw SQL optimization be broken by any future changes.

It's a complex enough solution though that I'd prefer to avoid it unless we urgently need it.

@bryanculbertson

bryanculbertson commented May 19, 2021

@simonw Ingest implemented content hashes, and that allowed the load to finish successfully in 4.5 hours. That is crucially under the 5 hour deadline!

However, this is still too close to the deadline for my comfort, so I think we need further optimization to importSourceLocations. Right now it takes about 2 minutes to load 500 source locations, which is 240ms per row. Perhaps using the ORM bulk_create would help speed this up while still keeping everything in ORM land? I know this works well in SQLAlchemy, but I don't know the Django ORM.

Another option to increase performance is you could skip Pydantic validation of NormalizedLocation and trust that I am handing you a valid location 😬
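A rough sketch of what that might look like in the Django ORM, assuming source_uid is unique and ignoring the matching/concordance logic the current view also performs:

from django.utils import timezone

UPDATE_FIELDS = [
    "source_name", "name", "latitude", "longitude",
    "import_json", "import_run", "last_imported_at",
]

def bulk_import(import_run, records):
    # One query to find which source_uids already exist.
    existing = SourceLocation.objects.in_bulk(
        [r.source_uid for r in records], field_name="source_uid"
    )
    now = timezone.now()
    to_create, to_update = [], []
    for record in records:
        values = {
            "source_name": record.source_name,
            "name": record.name,
            "latitude": record.latitude,
            "longitude": record.longitude,
            "import_json": record.import_json,
            "import_run": import_run,
            "last_imported_at": now,
        }
        obj = existing.get(record.source_uid)
        if obj is None:
            to_create.append(SourceLocation(source_uid=record.source_uid, **values))
        else:
            for field, value in values.items():
                setattr(obj, field, value)
            to_update.append(obj)
    # Two queries per batch instead of one update_or_create per record.
    SourceLocation.objects.bulk_create(to_create, batch_size=500)
    SourceLocation.objects.bulk_update(to_update, UPDATE_FIELDS, batch_size=500)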

@simonw
Collaborator Author

simonw commented May 19, 2021

OK, sounds like we do need to make some improvements here. I'll look for some low hanging fruit.

@simonw
Collaborator Author

simonw commented May 19, 2021

Recent /api/importSourceLocations trace: https://ui.honeycomb.io/vaccinateca/datasets/vial-production/result/nLWWNjPCtFR/trace/eQxie317P1N

(Screenshot: Honeycomb trace for vial-production)

Looks to me like the main problem here is we're executing hundreds (if not thousands) of tiny 3ms queries.
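For example, the per-link ConcordanceIdentifier.objects.get_or_create() calls could plausibly be collapsed into one lookup plus one bulk_create per batch - a sketch only, not tested against the real models:

def ensure_concordance_identifiers(links):
    wanted = {(link.authority, link.id) for link in links}
    # One query to find which identifiers already exist.
    existing = set(
        ConcordanceIdentifier.objects.filter(
            authority__in={authority for authority, _ in wanted}
        ).values_list("authority", "identifier")
    )
    # One insert for everything that is missing.
    ConcordanceIdentifier.objects.bulk_create(
        [
            ConcordanceIdentifier(authority=authority, identifier=identifier)
            for authority, identifier in wanted
            if (authority, identifier) not in existing
        ],
        ignore_conflicts=True,
    )
    # The per-row source_locations.add() m2m calls would still need similar batching.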

@bryanculbertson

The "2 minutes for 500 locations" includes encoding and transfer time on ingestion side. I can look into optimizing that like using orjson

@bryanculbertson

Ah, I also misinterpreted the logs! This means we are loading 2,500 records (5*500) in about 90 secs, so that is only 35 ms per record. That is much more reasonable.

2021-05-19T17:52:54+0000 INFO:vial.py:Submitted 5 batches of up to 500 records to VIAL.
2021-05-19T17:54:22+0000 INFO:vial.py:Submitted 10 batches of up to 500 records to VIAL.
2021-05-19T17:55:50+0000 INFO:vial.py:Submitted 15 batches of up to 500 records to VIAL.
2021-05-19T17:57:18+0000 INFO:vial.py:Submitted 20 batches of up to 500 records to VIAL.
2021-05-19T17:58:42+0000 INFO:vial.py:Submitted 25 batches of up to 500 records to VIAL.
2021-05-19T18:00:07+0000 INFO:vial.py:Submitted 30 batches of up to 500 records to VIAL.

@rhkeeler

Is this duplicative of #646? Do we need them both open? @simonw
