1
1
# from search_redirect import SearchSettings
2
2
import logging
3
3
import uuid
4
- from functools import reduce
5
4
from memas .interface .corpus import Corpus , CorpusFactory
6
5
from memas .interface .corpus import Citation
7
- from memas .interface .storage_driver import DocumentEntity
6
+ from memas .interface .storage_driver import CorpusDocumentMetadataStore , CorpusDocumentStore , CorpusVectorStore , DocumentEntity
8
7
from memas .interface .exceptions import SentenceLengthOverflowException
9
- from memas .context_manager import ctx
10
8
from memas .text_parsing .text_parsers import segment_document
11
9
from memas .corpus .corpus_searching import normalize_and_combine
12
10
17
15
18
16
class BasicCorpus (Corpus ):
19
17
20
- def __init__ (self , corpus_id : uuid .UUID , corpus_name : str ):
18
+ def __init__ (self , corpus_id : uuid .UUID , corpus_name : str , metadata_store : CorpusDocumentMetadataStore , doc_store : CorpusDocumentStore , vec_store : CorpusVectorStore ):
21
19
super ().__init__ (corpus_id , corpus_name )
20
+ self .metadata_store : CorpusDocumentMetadataStore = metadata_store
21
+ self .doc_store : CorpusDocumentStore = doc_store
22
+ self .vec_store : CorpusVectorStore = vec_store
22
23
23
24
"""
24
25
The function stores a document in the elastic search DB, vecDB, and doc MetaData.
25
26
Returns True on Success, False on Failure
26
27
"""
27
28
28
- def store_and_index (self , document : str , document_name : str , citation : Citation ) -> bool :
29
+ def store_and_index (self , document : str , citation : Citation ) -> bool :
29
30
_log .debug (f"Corpus storing and indexing [corpus_id={ self .corpus_id } ]" )
30
31
31
32
doc_id = uuid .uuid4 ()
32
- doc_entity = DocumentEntity (self .corpus_id , doc_id , document_name , document )
33
+ doc_entity = DocumentEntity (self .corpus_id , doc_id , citation . document_name , document )
33
34
34
35
document_chunks = segment_document (document , MAX_SEGMENT_LENGTH )
35
36
36
37
# TODO : Need to investigate how to undo when failures on partial insert
37
- meta_save = ctx . corpus_metadata .insert_document_metadata (
38
- self .corpus_id , doc_id , len (document_chunks ), document_name , citation )
38
+ meta_save = self . metadata_store .insert_document_metadata (
39
+ self .corpus_id , doc_id , len (document_chunks ), citation )
39
40
40
- vec_save = ctx . corpus_vec .save_documents ([doc_entity ])
41
+ vec_save = self . vec_store .save_documents ([doc_entity ])
41
42
42
43
# Divide longer documents for document store
43
44
chunk_num = 0
@@ -46,11 +47,11 @@ def store_and_index(self, document: str, document_name: str, citation: Citation)
46
47
# Create the new IDs for the document chunk combo
47
48
chunk_id = doc_id .hex + '{:032b}' .format (chunk_num )
48
49
chunk_num = chunk_num + 1
49
- doc_chunk_entity = DocumentEntity (self .corpus_id , doc_id , document_name , chunk )
50
+ doc_chunk_entity = DocumentEntity (self .corpus_id , doc_id , citation . document_name , chunk )
50
51
chunk_id_entity_pairs .append ((chunk_id , doc_chunk_entity ))
51
52
52
53
# Insert all chunks of document at once
53
- doc_save = ctx . corpus_doc .save_documents (id_doc_pairs = chunk_id_entity_pairs )
54
+ doc_save = self . doc_store .save_documents (id_doc_pairs = chunk_id_entity_pairs )
54
55
55
56
return meta_save and vec_save and doc_save
56
57
@@ -67,24 +68,24 @@ def search(self, clue: str) -> list[tuple[float, str, Citation]]:
67
68
vector_search_count : int = 10
68
69
69
70
doc_store_results : list [tuple [float , str , Citation ]] = []
70
- temp_res = ctx . corpus_doc .search_corpora ([self .corpus_id ], clue )
71
+ temp_res = self . doc_store .search_corpora ([self .corpus_id ], clue )
71
72
# Search the document store
72
73
for score , doc_entity in temp_res :
73
74
document_text = doc_entity .document
74
- citation = ctx . corpus_metadata .get_document_citation (self .corpus_id , doc_entity .document_id )
75
+ citation = self . metadata_store .get_document_citation (self .corpus_id , doc_entity .document_id )
75
76
doc_store_results .append ([score , document_text , citation ])
76
77
77
78
# Search for the vectors
78
79
vec_store_results : list [tuple [float , str , Citation ]] = []
79
- temp_res2 = ctx . corpus_vec .search_corpora ([self .corpus_id ], clue )
80
+ temp_res2 = self . vec_store .search_corpora ([self .corpus_id ], clue )
80
81
for score , doc_entity , start_index , end_index in temp_res2 :
81
82
82
83
# Verify that the text recovered from the vectors fits the maximum sentence criteria
83
84
if end_index - start_index != len (doc_entity .document ):
84
85
_log .error ("Index not aligned with actual document" , exc_info = True )
85
86
raise SentenceLengthOverflowException (end_index - start_index )
86
87
87
- citation = ctx . corpus_metadata .get_document_citation (self .corpus_id , doc_entity .document_id )
88
+ citation = self . metadata_store .get_document_citation (self .corpus_id , doc_entity .document_id )
88
89
vec_store_results .append ([score , doc_entity .document , citation ])
89
90
90
91
# If any of the searches returned no results combine and return
@@ -100,11 +101,14 @@ def search(self, clue: str) -> list[tuple[float, str, Citation]]:
100
101
101
102
return results
102
103
103
- def generate_search_instructions (self , clue : str ) -> any :
104
- pass
105
-
106
104
107
105
class BasicCorpusFactory(CorpusFactory):
    """Factory producing BasicCorpus instances that share one set of backing stores."""

    def __init__(self, metadata_store: CorpusDocumentMetadataStore,
                 doc_store: CorpusDocumentStore,
                 vec_store: CorpusVectorStore) -> None:
        super().__init__()
        self.metadata_store: CorpusDocumentMetadataStore = metadata_store
        self.doc_store: CorpusDocumentStore = doc_store
        self.vec_store: CorpusVectorStore = vec_store

    def produce(self, corpus_id: uuid.UUID, corpus_name: str = "BasicCorpus"):
        """Create a BasicCorpus for ``corpus_id`` wired to this factory's stores.

        Args:
            corpus_id: id of the corpus to instantiate.
            corpus_name: optional corpus name; defaults to "BasicCorpus" so
                existing single-argument callers behave exactly as before.

        Returns:
            A BasicCorpus backed by this factory's metadata, document, and
            vector stores.
        """
        return BasicCorpus(corpus_id, corpus_name,
                           self.metadata_store, self.doc_store, self.vec_store)
0 commit comments