Add loguru for logging (#282)
* Add loguru for logging

* Add logger

* Replace exception with error

---------

Co-authored-by: Raghu Ganapathi <[email protected]>
raghu017 and Raghu Ganapathi authored May 31, 2023
1 parent 66d91c9 commit 36354a2
Showing 20 changed files with 141 additions and 132 deletions.
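The diffs below all follow the same pattern: ad-hoc print(...) calls are replaced with loguru's ready-made logger, which needs no handler setup and writes to stderr by default. A minimal before/after sketch of that pattern; the fetch_rows helper is illustrative only, not code from this repository:

from loguru import logger  # loguru's logger works with no configuration


def fetch_rows(cursor, sql):
    # Hypothetical helper used only to illustrate the print -> logger swap.
    try:
        cursor.execute(sql)
        return cursor.fetchall()
    except Exception as e:
        # before this commit: print("error:", e)
        logger.error(e)  # after: emitted through loguru at ERROR level
        return []
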
5 changes: 3 additions & 2 deletions datastore/providers/analyticdb_datastore.py
@@ -2,6 +2,7 @@
import asyncio
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
from loguru import logger

from psycopg2cffi import compat

@@ -252,7 +253,7 @@ def create_results(data):
QueryResult(query=query.query, results=results)
)
except Exception as e:
print("error:", e)
logger.error(e)
query_results.append(QueryResult(query=query.query, results=[]))
return query_results
finally:
@@ -275,7 +276,7 @@ async def execute_delete(query: str, params: Optional[List] = None) -> bool:
self.conn.commit()
return True
except Exception as e:
print(f"Error: {e}")
logger.error(e)
return False
finally:
self.connection_pool.putconn(conn)
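The commit message notes replacing exception with error, so these handlers record only the exception message. loguru also provides logger.exception, which logs at the same ERROR level but appends the full traceback; a small sketch of the difference (the division-by-zero block is illustrative only):

from loguru import logger

try:
    1 / 0
except Exception as e:
    logger.error(e)        # message only: "division by zero"
    # logger.exception(e)  # same ERROR severity, plus the full traceback
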
59 changes: 26 additions & 33 deletions datastore/providers/milvus_datastore.py
@@ -2,6 +2,7 @@
import os
import asyncio

from loguru import logger
from typing import Dict, List, Optional
from pymilvus import (
Collection,
@@ -124,14 +125,6 @@ def __init__(
self._create_collection(MILVUS_COLLECTION, create_new) # type: ignore
self._create_index()

def _print_info(self, msg):
# TODO: logger
print(msg)

def _print_err(self, msg):
# TODO: logger
print(msg)

def _get_schema(self):
return SCHEMA_V1 if self._schema_ver == "V1" else SCHEMA_V2

@@ -143,7 +136,7 @@ def _create_connection(self):
addr = connections.get_connection_addr(x[0])
if x[1] and ('address' in addr) and (addr['address'] == "{}:{}".format(MILVUS_HOST, MILVUS_PORT)):
self.alias = x[0]
self._print_info("Reuse connection to Milvus server '{}:{}' with alias '{:s}'"
logger.info("Reuse connection to Milvus server '{}:{}' with alias '{:s}'"
.format(MILVUS_HOST, MILVUS_PORT, self.alias))
break

@@ -158,10 +151,10 @@ def _create_connection(self):
password=MILVUS_PASSWORD, # type: ignore
secure=MILVUS_USE_SECURITY,
)
self._print_info("Create connection to Milvus server '{}:{}' with alias '{:s}'"
logger.info("Create connection to Milvus server '{}:{}' with alias '{:s}'"
.format(MILVUS_HOST, MILVUS_PORT, self.alias))
except Exception as e:
self._print_err("Failed to create connection to Milvus server '{}:{}', error: {}"
logger.error("Failed to create connection to Milvus server '{}:{}', error: {}"
.format(MILVUS_HOST, MILVUS_PORT, e))

def _create_collection(self, collection_name, create_new: bool) -> None:
@@ -189,7 +182,7 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
consistency_level=self._consistency_level,
)
self._schema_ver = "V2"
self._print_info("Create Milvus collection '{}' with schema {} and consistency level {}"
logger.info("Create Milvus collection '{}' with schema {} and consistency level {}"
.format(collection_name, self._schema_ver, self._consistency_level))
else:
# If the collection exists, point to it
@@ -201,10 +194,10 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
if field.name == "id" and field.is_primary:
self._schema_ver = "V2"
break
self._print_info("Milvus collection '{}' already exists with schema {}"
logger.info("Milvus collection '{}' already exists with schema {}"
.format(collection_name, self._schema_ver))
except Exception as e:
self._print_err("Failed to create collection '{}', error: {}".format(collection_name, e))
logger.error("Failed to create collection '{}', error: {}".format(collection_name, e))

def _create_index(self):
# TODO: verify index/search params passed by os.environ
@@ -216,7 +209,7 @@ def _create_index(self):
if self.index_params is not None:
# Convert the string format to JSON format parameters passed by MILVUS_INDEX_PARAMS
self.index_params = json.loads(self.index_params)
self._print_info("Create Milvus index: {}".format(self.index_params))
logger.info("Create Milvus index: {}".format(self.index_params))
# Create an index on the 'embedding' field with the index params found in init
self.col.create_index(EMBEDDING_FIELD, index_params=self.index_params)
else:
@@ -227,24 +220,24 @@ def _create_index(self):
"index_type": "HNSW",
"params": {"M": 8, "efConstruction": 64},
}
self._print_info("Attempting creation of Milvus '{}' index".format(i_p["index_type"]))
logger.info("Attempting creation of Milvus '{}' index".format(i_p["index_type"]))
self.col.create_index(EMBEDDING_FIELD, index_params=i_p)
self.index_params = i_p
self._print_info("Creation of Milvus '{}' index successful".format(i_p["index_type"]))
logger.info("Creation of Milvus '{}' index successful".format(i_p["index_type"]))
# If create fails, most likely due to being Zilliz Cloud instance, try to create an AutoIndex
except MilvusException:
self._print_info("Attempting creation of Milvus default index")
logger.info("Attempting creation of Milvus default index")
i_p = {"metric_type": "IP", "index_type": "AUTOINDEX", "params": {}}
self.col.create_index(EMBEDDING_FIELD, index_params=i_p)
self.index_params = i_p
self._print_info("Creation of Milvus default index successful")
logger.info("Creation of Milvus default index successful")
# If an index already exists, grab its params
else:
# How about if the first index is not vector index?
for index in self.col.indexes:
idx = index.to_dict()
if idx["field"] == EMBEDDING_FIELD:
self._print_info("Index already exists: {}".format(idx))
logger.info("Index already exists: {}".format(idx))
self.index_params = idx['index_param']
break

@@ -272,9 +265,9 @@ def _create_index(self):
}
# Set the search params
self.search_params = default_search_params[self.index_params["index_type"]]
self._print_info("Milvus search parameters: {}".format(self.search_params))
logger.info("Milvus search parameters: {}".format(self.search_params))
except Exception as e:
self._print_err("Failed to create index, error: {}".format(e))
logger.error("Failed to create index, error: {}".format(e))

async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
"""Upsert chunks into the datastore.
@@ -319,18 +312,18 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
for batch in batches:
if len(batch[0]) != 0:
try:
self._print_info(f"Upserting batch of size {len(batch[0])}")
logger.info(f"Upserting batch of size {len(batch[0])}")
self.col.insert(batch)
self._print_info(f"Upserted batch successfully")
logger.info(f"Upserted batch successfully")
except Exception as e:
self._print_err(f"Failed to insert batch records, error: {e}")
logger.error(f"Failed to insert batch records, error: {e}")
raise e

# This setting perfoms flushes after insert. Small insert == bad to use
# self.col.flush()
return doc_ids
except Exception as e:
self._print_err("Failed to insert records, error: {}".format(e))
logger.error("Failed to insert records, error: {}".format(e))
return []


@@ -365,7 +358,7 @@ def _get_values(self, chunk: DocumentChunk) -> List[any] | None: # type: ignore
x = values.get(key) or default
# If one of our required fields is missing, ignore the entire entry
if x is Required:
self._print_info("Chunk " + values["id"] + " missing " + key + " skipping")
logger.info("Chunk " + values["id"] + " missing " + key + " skipping")
return None
# Add the corresponding value if it passes the tests
ret.append(x)
@@ -436,7 +429,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult:

return QueryResult(query=query.query, results=results)
except Exception as e:
self._print_err("Failed to query, error: {}".format(e))
logger.error("Failed to query, error: {}".format(e))
return QueryResult(query=query.query, results=[])

results: List[QueryResult] = await asyncio.gather(
@@ -460,7 +453,7 @@ async def delete(
# If deleting all, drop and create the new collection
if delete_all:
coll_name = self.col.name
self._print_info("Delete the entire collection {} and create new one".format(coll_name))
logger.info("Delete the entire collection {} and create new one".format(coll_name))
# Release the collection from memory
self.col.release()
# Drop the collection
@@ -490,7 +483,7 @@ async def delete(
pks = ['"' + pk + '"' for pk in pks]

# Delete by ids batch by batch(avoid too long expression)
self._print_info("Apply {:d} deletions to schema {:s}".format(len(pks), self._schema_ver))
logger.info("Apply {:d} deletions to schema {:s}".format(len(pks), self._schema_ver))
while len(pks) > 0:
batch_pks = pks[:batch_size]
pks = pks[batch_size:]
@@ -499,7 +492,7 @@
# Increment our deleted count
delete_count += int(res.delete_count) # type: ignore
except Exception as e:
self._print_err("Failed to delete by ids, error: {}".format(e))
logger.error("Failed to delete by ids, error: {}".format(e))

try:
# Check if empty filter
@@ -524,9 +517,9 @@ async def delete(
# Increment our delete count
delete_count += int(res.delete_count) # type: ignore
except Exception as e:
self._print_err("Failed to delete by filter, error: {}".format(e))
logger.error("Failed to delete by filter, error: {}".format(e))

self._print_info("{:d} records deleted".format(delete_count))
logger.info("{:d} records deleted".format(delete_count))

# This setting performs flushes after delete. Small delete == bad to use
# self.col.flush()
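The migrated Milvus calls keep their existing str.format(...) calls, which build the message eagerly before handing it to loguru. loguru's logging methods also accept positional arguments and interpolate them with the same brace syntax, a possible (but not required) follow-up; a sketch with illustrative values:

from loguru import logger

host, port, alias = "localhost", 19530, "default"  # illustrative values

# As in the diff: the string is formatted eagerly, then logged.
logger.info("Create connection to Milvus server '{}:{}' with alias '{}'"
            .format(host, port, alias))

# Equivalent loguru form: interpolation is done by the logger itself.
logger.info("Create connection to Milvus server '{}:{}' with alias '{}'", host, port, alias)
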
3 changes: 2 additions & 1 deletion datastore/providers/pgvector_datastore.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from datetime import datetime
from loguru import logger

from services.date import to_unix_timestamp
from datastore.datastore import DataStore
@@ -147,7 +148,7 @@ async def _query(self, queries: List[QueryWithEmbedding]) -> List[QueryResult]:
results.append(document_chunk)
query_results.append(QueryResult(query=query.query, results=results))
except Exception as e:
print("error:", e)
logger.error(e)
query_results.append(QueryResult(query=query.query, results=[]))
return query_results

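None of these providers configure loguru explicitly, so output goes to loguru's default stderr sink at DEBUG level and above. If a deployment needs a different level or a file target, logger.add and logger.remove are the standard loguru mechanism; a sketch that is not part of this commit (the file name and rotation size are made up):

import sys

from loguru import logger

logger.remove()  # drop loguru's default stderr handler
logger.add(sys.stderr, level="INFO")  # re-add stderr, INFO and above only
logger.add("retrieval_plugin.log", rotation="10 MB", level="DEBUG")  # hypothetical file sink
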
43 changes: 22 additions & 21 deletions datastore/providers/pinecone_datastore.py
@@ -3,6 +3,7 @@
import pinecone
from tenacity import retry, wait_random_exponential, stop_after_attempt
import asyncio
from loguru import logger

from datastore.datastore import DataStore
from models.models import (
@@ -41,7 +42,7 @@ def __init__(self):

# Create a new index with the specified name, dimension, and metadata configuration
try:
print(
logger.info(
f"Creating index {PINECONE_INDEX} with metadata config {fields_to_index}"
)
pinecone.create_index(
@@ -50,18 +51,18 @@
metadata_config={"indexed": fields_to_index},
)
self.index = pinecone.Index(PINECONE_INDEX)
print(f"Index {PINECONE_INDEX} created successfully")
logger.info(f"Index {PINECONE_INDEX} created successfully")
except Exception as e:
print(f"Error creating index {PINECONE_INDEX}: {e}")
logger.error(f"Error creating index {PINECONE_INDEX}: {e}")
raise e
elif PINECONE_INDEX and PINECONE_INDEX in pinecone.list_indexes():
# Connect to an existing index with the specified name
try:
print(f"Connecting to existing index {PINECONE_INDEX}")
logger.info(f"Connecting to existing index {PINECONE_INDEX}")
self.index = pinecone.Index(PINECONE_INDEX)
print(f"Connected to index {PINECONE_INDEX} successfully")
logger.info(f"Connected to index {PINECONE_INDEX} successfully")
except Exception as e:
print(f"Error connecting to index {PINECONE_INDEX}: {e}")
logger.error(f"Error connecting to index {PINECONE_INDEX}: {e}")
raise e

@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
@@ -78,7 +79,7 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
for doc_id, chunk_list in chunks.items():
# Append the id to the ids list
doc_ids.append(doc_id)
print(f"Upserting document_id: {doc_id}")
logger.info(f"Upserting document_id: {doc_id}")
for chunk in chunk_list:
# Create a vector tuple of (id, embedding, metadata)
# Convert the metadata object to a dict with unix timestamps for dates
@@ -97,11 +98,11 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
# Upsert each batch to Pinecone
for batch in batches:
try:
print(f"Upserting batch of size {len(batch)}")
logger.info(f"Upserting batch of size {len(batch)}")
self.index.upsert(vectors=batch)
print(f"Upserted batch successfully")
logger.info(f"Upserted batch successfully")
except Exception as e:
print(f"Error upserting batch: {e}")
logger.error(f"Error upserting batch: {e}")
raise e

return doc_ids
@@ -117,7 +118,7 @@ async def _query(

# Define a helper coroutine that performs a single query and returns a QueryResult
async def _single_query(query: QueryWithEmbedding) -> QueryResult:
print(f"Query: {query.query}")
logger.debug(f"Query: {query.query}")

# Convert the metadata filter object to a dict with pinecone filter expressions
pinecone_filter = self._get_pinecone_filter(query.filter)
@@ -132,7 +133,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult:
include_metadata=True,
)
except Exception as e:
print(f"Error querying index: {e}")
logger.error(f"Error querying index: {e}")
raise e

query_results: List[DocumentChunkWithScore] = []
@@ -184,35 +185,35 @@ async def delete(
# Delete all vectors from the index if delete_all is True
if delete_all:
try:
print(f"Deleting all vectors from index")
logger.info(f"Deleting all vectors from index")
self.index.delete(delete_all=True)
print(f"Deleted all vectors successfully")
logger.info(f"Deleted all vectors successfully")
return True
except Exception as e:
print(f"Error deleting all vectors: {e}")
logger.error(f"Error deleting all vectors: {e}")
raise e

# Convert the metadata filter object to a dict with pinecone filter expressions
pinecone_filter = self._get_pinecone_filter(filter)
# Delete vectors that match the filter from the index if the filter is not empty
if pinecone_filter != {}:
try:
print(f"Deleting vectors with filter {pinecone_filter}")
logger.info(f"Deleting vectors with filter {pinecone_filter}")
self.index.delete(filter=pinecone_filter)
print(f"Deleted vectors with filter successfully")
logger.info(f"Deleted vectors with filter successfully")
except Exception as e:
print(f"Error deleting vectors with filter: {e}")
logger.error(f"Error deleting vectors with filter: {e}")
raise e

# Delete vectors that match the document ids from the index if the ids list is not empty
if ids is not None and len(ids) > 0:
try:
print(f"Deleting vectors with ids {ids}")
logger.info(f"Deleting vectors with ids {ids}")
pinecone_filter = {"document_id": {"$in": ids}}
self.index.delete(filter=pinecone_filter) # type: ignore
print(f"Deleted vectors with ids successfully")
logger.info(f"Deleted vectors with ids successfully")
except Exception as e:
print(f"Error deleting vectors with ids: {e}")
logger.error(f"Error deleting vectors with ids: {e}")
raise e

return True
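In the Pinecone provider the per-query print becomes logger.debug, while lifecycle and error messages use logger.info and logger.error, so query text can be silenced separately from operational logs. A sketch of using that split, assuming a hypothetical LOG_LEVEL environment variable that is not part of this change:

import os
import sys

from loguru import logger

logger.remove()
logger.add(sys.stderr, level=os.environ.get("LOG_LEVEL", "DEBUG"))  # LOG_LEVEL is hypothetical

logger.info("Connecting to existing index my-index")  # kept at INFO and above
logger.debug("Query: what is loguru?")                # dropped when LOG_LEVEL=INFO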