Skip to content

Commit

Permalink
change indexing strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
capjamesg committed Aug 19, 2024
1 parent eb525c6 commit 7bd7b9f
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 59 deletions.
21 changes: 12 additions & 9 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
from nosql import NoSQL
from nosql.index import GSI_INDEX_STRATEGIES
import json

with open("tests/fixtures/documents.json") as f:
documents = json.load(f)

index = NoSQL(
index_by=["title", "lyric"],
)
index = NoSQL()

for document in documents: # * 200000:
for document in documents * 200000:
index.add(document)

query = {
"query": {},
"limit": 2
"query": {
"and": [
{"title": {"starts_with": "tolerate"}},
{"lyric": {"contains": "my mural"}},
]
},
"limit": 2,
"sort_by": "title",
}

index.create_gsi("title")
index.create_gsi("lyric")
index.create_gsi("title", strategy=GSI_INDEX_STRATEGIES.PREFIX)

result = index.search(query)

# print(result)

print("Showing search results for query: ", query)
Expand Down
120 changes: 71 additions & 49 deletions nosql/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,42 @@
import uuid
from collections import defaultdict, OrderedDict
from typing import Dict, List
from tqdm import tqdm
from enum import Enum

import pybmoore

import pygtrie

KEYW0RDS = ["and", "or"]

METHODS = {"and": set.intersection, "or": set.union}

class GSI_INDEX_STRATEGIES(Enum):
PREFIX = "prefix"
CONTAINS = "contains"
FLAT = "flat"

class NoSQL:
def __init__(self, index_by: List[List[str]] | List[str]) -> None:
self.global_index = OrderedDict()
def __init__(self) -> None:
self.global_index = {}
self.uuids_to_position_in_global_index = {}
self.gsis = {}
self.index_by = index_by

def _create_reverse_index(self, document: str) -> Dict[str, List[int]]:
"""
Accepts a document and returns a reverse index of the document in the form:
{word: word_count}
Where `word` is every word in the document and `word_count` is the number of times it appears.
"""
index = defaultdict(int)

for word in document.split():
index[word] += 1

return index

def add(self, document: list, partition_key=None) -> Dict[str, dict]:
"""
Expand All @@ -42,7 +63,7 @@ def add(self, document: list, partition_key=None) -> Dict[str, dict]:

def update(self, uuid: str, document: dict) -> Dict[str, dict]:
"""
Accepts a UUID and a document and updates the document associated with that key.
Accepts a UUID and a Tdocument and updates the document associated with that key.
"""
if uuid not in self.uuids_to_position_in_global_index:
return {"error": "Document not found"}
Expand All @@ -53,25 +74,14 @@ def update(self, uuid: str, document: dict) -> Dict[str, dict]:

return document

def remove(self, uuid: str) -> Dict[str, dict]:
def remove(self, uuid: str) -> None:
"""
Accepts a UUID and removes the document associated with that key.
"""

if uuid not in self.uuids_to_position_in_global_index:
return {"error": "Document not found"}

position_in_global_index = self.uuids_to_position_in_global_index[uuid]

print("Position in global index: ", position_in_global_index)

document = self.global_index.popitem(position_in_global_index)
del self.global_index[uuid]

print("Document removed: ", document)

return document

def create_gsi(self, index_by: List[str] | str) -> Dict[str, str]:
def create_gsi(self, index_by: str | List[str], strategy: GSI_INDEX_STRATEGIES = "flat", prefix_limit = 20) -> Dict[str, dict]:
"""
The raw index returned by create_index is not optimized for querying. Instead, it is designed as
a single source of truth for all data.
Expand All @@ -93,22 +103,28 @@ def create_gsi(self, index_by: List[str] | str) -> Dict[str, str]:
the GSI needs to be updated (keys that no longer satisfy the GSI criteria will need to be
removed, new keys that satisfy the GSI criteria will need to be added, and deleted keys will need to
be removed).
A GSI has three types:
- Prefix
- Contains (reverse index)
- Flat
"""
gsi = {}

for _, document in self.global_index.items():
if isinstance(index_by, list):
join_key = "".join([document[key] for key in index_by])
else:
join_key = document.get(index_by)
if strategy == GSI_INDEX_STRATEGIES.PREFIX:
gsi = pygtrie.CharTrie()

if join_key:
gsi[join_key] = document["uuid"]
for item in self.global_index.values():
gsi[item.get(index_by)[:prefix_limit]] = item.get("uuid")
elif strategy == GSI_INDEX_STRATEGIES.CONTAINS:
gsi = self._create_reverse_index(index_by)
elif strategy == GSI_INDEX_STRATEGIES.FLAT:
gsi = defaultdict(list)

if isinstance(index_by, list):
self.gsis["".join(index_by)] = gsi
else:
self.gsis[index_by] = gsi
for item in self.global_index.values():
gsi[item.get(index_by)].append(item.get("uuid"))

self.gsis[index_by] = {"gsi": gsi, "strategy": strategy}

return gsi

Expand Down Expand Up @@ -217,31 +233,37 @@ def _run(self, query: dict, query_field: str) -> List[str]:

if not self.gsis.get(query_field):
# print("GSI does not exist on query field ", query_field, ". Creating GSI now.")
self.create_gsi(query_field)

for value in self.gsis[query_field].values():
doc_entry = self.global_index.get(value)
self.create_gsi(query_field, GSI_INDEX_STRATEGIES.FLAT)

if doc_entry is None or doc_entry.get(query_field) is None:
continue
gsi_type = self.gsis[query_field]["strategy"]
gsi = self.gsis[query_field]["gsi"]

if query_type == "equals" and query_term == doc_entry[query_field]:
matching_documents.append(value)
if gsi_type != GSI_INDEX_STRATEGIES.FLAT:
if query_type == "starts_with" and gsi_type == GSI_INDEX_STRATEGIES.PREFIX:
matches = gsi.keys(prefix=query_term)
matching_documents.extend([gsi[match] for match in matches])

matches = pybmoore.search(query_term, doc_entry[query_field])
if query_type == "contains" and gsi_type == GSI_INDEX_STRATEGIES.CONTAINS:
for word in query_term.split():
if value.get(word) is not None:
matching_documents.extend(value[word])
break
else:
for sort_key, value in self.gsis[query_field]["gsi"].items():
matches = pybmoore.search(query_term, sort_key)

if len(matches) == 0:
continue
if len(matches) == 0:
continue

if query_type == "contains":
matching_documents.append(value)
if query_type == "contains":
matching_documents.extend(value)

if query_type == "starts_with":
for match in matches:
# this indicates that the query term is a prefix of the sort key
if match[0] == 0:
matching_documents.append(value)
break
if query_type == "starts_with":
for match in matches:
# this indicates that the query term is a prefix of the sort key
if match[0] == 0:
matching_documents.extend(value)
break

end_time = time.time()

Expand Down
2 changes: 1 addition & 1 deletion tests/stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

times = []

for i in tqdm(range(100000)):
for i in tqdm(range(100)):
result = index.search(query)
times.append(float(result["query_time"]))

Expand Down

0 comments on commit 7bd7b9f

Please sign in to comment.