This repository has been archived by the owner on Jun 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Optimize away N+1 queries in v0preview, refs #558
- Loading branch information
Showing
1 changed file
with
73 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import itertools | ||
import json | ||
from collections import namedtuple | ||
from typing import Dict | ||
|
@@ -9,11 +10,23 @@ | |
|
||
# NOTE: transform runs per-record; transform_batch then runs once over a
# batch of already-transformed records; serialize turns each record into
# the bytes/str actually yielded to the client.
OutputFormat = namedtuple(
    "OutputFormat",
    [
        "prepare_queryset",
        "start",
        "transform",
        "transform_batch",
        "serialize",
        "separator",
        "end",
        "content_type",
    ],
)
|
||
|
||
def build_stream(qs, stream_qs, formatter, beeline_trace_name): | ||
def build_stream( | ||
qs, stream_qs, formatter, beeline_trace_name, transform_batch_size=100 | ||
): | ||
trace_id = None | ||
parent_id = None | ||
bl = beeline.get_beeline() | ||
|
@@ -28,11 +41,15 @@ def stream(): | |
else: | ||
yield formatter.start | ||
started = False | ||
for location in stream_qs: | ||
if started and formatter.separator: | ||
yield formatter.separator | ||
started = True | ||
yield formatter.transform(location) | ||
for record_batch in chunks(stream_qs, transform_batch_size): | ||
records = formatter.transform_batch( | ||
[formatter.transform(record) for record in record_batch] | ||
) | ||
for record in records: | ||
if started and formatter.separator: | ||
yield formatter.separator | ||
started = True | ||
yield formatter.serialize(record) | ||
yield formatter.end(qs) | ||
|
||
return stream | ||
|
@@ -118,41 +135,42 @@ def location_geojson(location: Location) -> Dict[str, object]: | |
} | ||
|
||
|
||
def make_location_v0_json(expansion): | ||
def location_v0_json(location: Location) -> Dict[str, object]: | ||
record = { | ||
"id": location.public_id, | ||
"name": location.name, | ||
"state": location.state.abbreviation, | ||
"latitude": float(location.latitude), | ||
"longitude": float(location.longitude), | ||
"location_type": location.location_type.name, | ||
"phone_number": location.phone_number, | ||
"vaccines_offered": None, | ||
"full_address": location.full_address, | ||
"city": location.city, | ||
"county": location.county.name if location.county else None, | ||
"zip_code": location.zip_code, | ||
"hours": {"unstructured": location.hours}, | ||
"website": location.website, | ||
"concordances": [str(c) for c in location.concordances.all()], | ||
"last_verified_by_vts": location.dn_latest_non_skip_report.created_at.isoformat() | ||
if location.dn_latest_non_skip_report | ||
else None, | ||
"vts_url": "https://www.vaccinatethestates.com/?lng={}&lat={}#{}".format( | ||
location.longitude, location.latitude, location.public_id | ||
), | ||
} | ||
record["vaccines_offered"] = expansion.expand([record])[record["id"]] or [] | ||
return record | ||
|
||
return location_v0_json | ||
def location_v0_json(location: Location) -> Dict[str, object]:
    """Build the v0 API JSON representation of a Location.

    ``vaccines_offered`` starts empty here; it is filled in later by the
    batch expansion step.  ``last_verified_by_vts`` is the timestamp of the
    latest non-skip report, when one exists.
    """
    latest_report = location.dn_latest_non_skip_report
    last_verified = latest_report.created_at.isoformat() if latest_report else None
    return {
        "id": location.public_id,
        "name": location.name,
        "state": location.state.abbreviation,
        "latitude": float(location.latitude),
        "longitude": float(location.longitude),
        "location_type": location.location_type.name,
        "phone_number": location.phone_number,
        "full_address": location.full_address,
        "city": location.city,
        "county": location.county.name if location.county else None,
        "zip_code": location.zip_code,
        "hours": {"unstructured": location.hours},
        "website": location.website,
        "vaccines_offered": [],
        "concordances": [str(c) for c in location.concordances.all()],
        "last_verified_by_vts": last_verified,
        "vts_url": "https://www.vaccinatethestates.com/?lng={}&lat={}#{}".format(
            location.longitude, location.latitude, location.public_id
        ),
    }
|
||
|
||
def location_formats(preload_vaccinefinder=False): | ||
formats = make_formats(location_json, location_geojson) | ||
expansion = VaccineFinderInventoryExpansion(preload_vaccinefinder) | ||
location_v0_json = make_location_v0_json(expansion) | ||
|
||
def transform_batch(batch): | ||
lookups = expansion.expand(batch) | ||
for record in batch: | ||
record["vaccines_offered"] = lookups.get(record["id"]) or [] | ||
return batch | ||
|
||
formats["v0preview"] = OutputFormat( | ||
prepare_queryset=lambda qs: qs.select_related("dn_latest_non_skip_report"), | ||
start=( | ||
|
@@ -164,7 +182,9 @@ def location_formats(preload_vaccinefinder=False): | |
'"contact": {"partnersEmail": "[email protected]"}},' | ||
'"content": [' | ||
), | ||
transform=lambda l: json.dumps(location_v0_json(l)), | ||
transform=lambda l: location_v0_json(l), | ||
transform_batch=transform_batch, | ||
serialize=json.dumps, | ||
separator=",", | ||
end=lambda qs: "]}", | ||
content_type="application/json", | ||
|
@@ -177,25 +197,37 @@ def make_formats(json_convert, geojson_convert): | |
"json": OutputFormat( | ||
prepare_queryset=lambda qs: qs, | ||
start='{"results": [', | ||
transform=lambda l: json.dumps(json_convert(l)), | ||
transform=lambda l: json_convert(l), | ||
transform_batch=lambda batch: batch, | ||
serialize=json.dumps, | ||
separator=",", | ||
end=lambda qs: '], "total": TOTAL}'.replace("TOTAL", str(qs.count())), | ||
content_type="application/json", | ||
), | ||
"geojson": OutputFormat( | ||
prepare_queryset=lambda qs: qs, | ||
start='{"type": "FeatureCollection", "features": [', | ||
transform=lambda l: json.dumps(geojson_convert(l)), | ||
transform=lambda l: geojson_convert(l), | ||
transform_batch=lambda batch: batch, | ||
serialize=json.dumps, | ||
separator=",", | ||
end=lambda qs: "]}", | ||
content_type="application/json", | ||
), | ||
"nlgeojson": OutputFormat( | ||
prepare_queryset=lambda qs: qs, | ||
start="", | ||
transform=lambda l: json.dumps(geojson_convert(l)), | ||
transform=lambda l: geojson_convert(l), | ||
transform_batch=lambda batch: batch, | ||
serialize=json.dumps, | ||
separator="\n", | ||
end=lambda qs: "", | ||
content_type="text/plain", | ||
), | ||
} | ||
|
||
|
||
def chunks(sequence, size):
    """Yield successive batches of at most ``size`` items from ``sequence``.

    Each batch is materialized as a list.  The previous lazy
    ``itertools.chain`` implementation handed out views over one shared
    iterator, so a caller that advanced to the next batch before fully
    consuming the current one would silently skip or mix items across
    batches (e.g. ``list(chunks(...))`` produced corrupt batches).
    Materializing eagerly makes iteration order-independent while keeping
    the same batch boundaries for callers that consumed batches in order.
    """
    iterator = iter(sequence)
    for first in iterator:
        # `first` anchors the batch; islice eagerly pulls up to size-1 more.
        yield list(itertools.chain([first], itertools.islice(iterator, size - 1)))