forked from dhruvpanchal7/GeminiPdfChat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app2.py
320 lines (257 loc) · 13.2 KB
/
app2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import os
from PyPDF2 import PdfReader
from pdfminer.high_level import extract_text
from pdf2image import convert_from_path
import pytesseract
from PIL import Image
import io
import tempfile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import streamlit as st
import google.generativeai as genai
import backoff
from langchain_community.vectorstores import FAISS
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
import logging
import uuid # Importante para generar identificadores únicos
import time
import threading
# Configuración básica del logging
logging.basicConfig(
level=logging.DEBUG, # Captura todos los niveles de debug hacia arriba
format='%(asctime)s - %(levelname)s - %(message)s',
filename='app.log', # Guarda los logs en un archivo
filemode='w' # Sobrescribe el archivo de log cada vez que se inicia la app
)
# Para capturar también los logs en la consola, puedes agregar un StreamHandler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger('').addHandler(console_handler)
logging.getLogger().setLevel(logging.INFO)
load_dotenv()
os.getenv("xxxxxxxxxxxxxxxxxxxxxxxxxxxx")
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
class BlockedPromptException(Exception):
"""Excepción para indicar que se ha bloqueado un prompt por algún motivo."""
pass
class GoogleAPIError(Exception):
"""Excepción para errores al llamar a la API de Google Generative AI."""
pass
# Configuración de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_heavy_file(file_path):
print(f"Procesando archivo: {file_path}")
time.sleep(10) # Simula el tiempo de procesamiento
print(f"Archivo procesado: {file_path}")
def start_background_processing(file_path):
thread = threading.Thread(target=process_heavy_file, args=(file_path,))
thread.start()
return thread
# Definir una función para borrar los archivos despues de 1 dia
def clean_old_faiss_indices():
directory = '.' # Directorio donde se guardan los índices FAISS
now = time.time()
cutoff = 86400 # 24 horas * 60 minutos * 60 segundos
for filename in os.listdir(directory):
if filename.startswith('faiss_index_'): # Asegúrate de que este prefijo coincida con cómo nombras los archivos de índice
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
file_modified = os.path.getmtime(filepath)
if now - file_modified > cutoff:
os.remove(filepath)
print(f'Índice FAISS eliminado: {filename}')
# Definir una función que maneje los eventos de backoff, opcional pero útil para el logging
def backoff_hdlr(details):
logging.warning("Backing off {wait:0.1f} seconds afters {tries} tries calling function {target} with args {args} and kwargs {kwargs}".format(**details))
# Decorador para implementar reintentos con backoff exponencial en la función correcta
@backoff.on_exception(backoff.expo,
GoogleAPIError, # Asegúrate de que esta excepción se lanza/captura correctamente en tu código
max_tries=8,
on_backoff=backoff_hdlr)
def call_chain_with_backoff(docs, user_question):
chain = get_conversational_chain()
return chain({"input_documents": docs, "question": user_question}, return_only_outputs=True)
# read all pdf files and return text
def get_pdf_text(pdf_docs):
text = ""
for uploaded_file in pdf_docs:
try:
logging.info("Iniciando extracción de texto del PDF.")
# Lee el contenido del archivo cargado
bytes_data = uploaded_file.getvalue()
# Extraer texto usando PDFMiner directamente de bytes
extracted_text = extract_text(io.BytesIO(bytes_data))
if extracted_text.strip():
text += extracted_text + "\n"
else:
# Guarda temporalmente el archivo para usar con pdf2image
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
tmp_file.write(bytes_data)
tmp_file_path = tmp_file.name
# Convertir PDF a imágenes (una por página) y aplicar OCR
images = convert_from_path(tmp_file_path)
for image in images:
text += pytesseract.image_to_string(image) + "\n"
except Exception as e:
logging.error("Error procesando PDF: %s", e)
st.error(f"Error procesando PDF: {e}")
pass
return text
# split text into chunks
def get_text_chunks(text):
# Suponiendo que esta función divide el texto en chunks
# Asegúrate de que el texto no esté vacío antes de dividirlo
if not text.strip():
return [] # Devuelve una lista vacía si el texto está vacío
splitter = RecursiveCharacterTextSplitter(
chunk_size=10000, chunk_overlap=1000)
chunks = splitter.split_text(text)
return chunks # list of strings
# get embeddings for each chunk
def get_conversational_chain():
prompt_template = """
Contesta la pregunta con la información mas completa, analizando perfectamente bien el documento que el usuario te proporcionó\n\n
Context:\n {context}?\n
Question: \n{question}\n
Answer:
"""
model = ChatGoogleGenerativeAI(model="gemini-pro",
client=genai,
temperature=0.3,
)
prompt = PromptTemplate(template=prompt_template,
input_variables=["context", "question"])
chain = load_qa_chain(llm=model, chain_type="stuff", prompt=prompt)
return chain
def clear_chat_history():
st.session_state.messages = [
{"role": "assistant", "content": "upload some pdfs and ask me a question"}]
def get_vector_store(chunks):
try:
logging.info("Generando embeddings y guardando el índice FAISS específico del usuario.")
print("Generando embeddings...")
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
vector_store = FAISS.from_texts(chunks, embedding=embeddings)
faiss_index_path = get_user_specific_faiss_index_path()
print(f"Guardando el índice FAISS en {faiss_index_path}...")
vector_store.save_local(faiss_index_path)
print("Índice FAISS creado y guardado específicamente para el usuario.")
except Exception as e:
logging.error("Error generando embeddings o guardando el índice FAISS: %s", e)
st.error(f"Error durante la generación de embeddings o al guardar el índice FAISS: {e}")
st.write(f"Detalles del error: {e}")
# Función modificada para gestionar el índice FAISS por usuario
def get_user_specific_faiss_index_path():
if 'user_id' not in st.session_state:
st.session_state.user_id = str(uuid.uuid4())
return f"faiss_index_{st.session_state.user_id}"
def user_input(user_question):
try:
# Asegúrate de que embeddings sea una instancia adecuada de Embeddings.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
# faiss_index_path debe ser el directorio donde se encuentran el índice y los archivos relacionados.
faiss_index_path = get_user_specific_faiss_index_path()
# Carga el índice con la deserialización insegura permitida si confías en el origen de los archivos.
new_db = FAISS.load_local(folder_path=faiss_index_path,
embeddings=embeddings,
allow_dangerous_deserialization=True)
docs = new_db.similarity_search(user_question)
response = call_chain_with_backoff(docs, user_question)
print(response)
return response
except BlockedPromptException as e:
logging.error(f"Se bloqueó el procesamiento de un documento debido a contenido potencialmente dañino: {e}")
st.error("No se pudo procesar el documento debido a restricciones de seguridad.")
return {"output_text": "El contenido no se pudo procesar debido a restricciones de seguridad."}
except GoogleAPIError as e:
logging.error(f"Error al llamar a Google Generative AI API: {e}")
st.error("Hubo un problema al conectar con Google Generative AI API. Por favor, inténtalo de nuevo.")
return {"output_text": "Error al conectar con el servicio de Google Generative AI."}
def main():
clean_old_faiss_indices() # Limpieza de índices antiguos al inicio
st.set_page_config(page_title="Tu PDF.AI", page_icon="🤖")
# CSS para ocultar el menú hamburguesa y el pie de página "Made with Streamlit"
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)
if 'pdf_preview' not in st.session_state:
st.session_state.pdf_preview = None
if 'processing_attempted' not in st.session_state:
st.session_state.processing_attempted = False
with st.sidebar:
st.title("Menú:")
pdf_docs = st.file_uploader("Sube tu archivo PDF y haz click en Subir y Procesar", type=["pdf"], accept_multiple_files=False)
if pdf_docs is not None:
# Verifica el tamaño del archivo aquí
file_size = pdf_docs.size # Tamaño del archivo en bytes
file_size_mb = file_size / (1024 * 1024) # Convertir a megabytes
if file_size_mb > 35:
st.error("El archivo supera el límite de 35MB. Por favor, carga un archivo más pequeño.")
else:
if st.button("Subir y Procesar"):
process_pdf(pdf_docs)
# Main content area for displaying chat messages and PDF previews
st.write("Conversa con tu PDF!")
if st.session_state.pdf_preview is not None:
st.image(st.session_state.pdf_preview, caption='Preview de la primera página del PDF', use_column_width=True)
# Resto de la lógica para el chat y el procesamiento de preguntas
if "messages" not in st.session_state:
st.session_state.messages = [{"role": "assistant", "content": "Sube tu PDF y hazme preguntas"}]
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
prompt = st.chat_input()
if prompt:
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.write(prompt)
if st.session_state.messages[-1]["role"] != "assistant":
with st.spinner("Thinking..."):
response = user_input(prompt)
placeholder = st.empty()
full_response = ''.join(response.get('output_text', ["Ocurrió un error al procesar tu pregunta. Por favor, inténtalo de nuevo."]))
placeholder.markdown(full_response)
message = {"role": "assistant", "content": full_response}
st.session_state.messages.append(message)
st.sidebar.button('Borrar Historial del Chat', on_click=clear_chat_history)
def process_pdf(pdf_docs):
try:
# Verificación del tamaño del archivo
file_size = pdf_docs.size # Tamaño del archivo en bytes
file_size_mb = file_size / (1024 * 1024) # Convertir a megabytes
# Si el archivo es mayor a 35 MB, muestra un mensaje de error y retorna
if file_size_mb > 35:
st.error("El archivo supera el límite de 35MB. Por favor, carga un archivo más pequeño.")
return
logging.info("Procesando archivo PDF.")
except Exception as e:
logging.error("Error al procesar el PDF: %s", e)
st.error(f"Error al procesar el PDF: {e}")
try:
bytes_data = pdf_docs.getvalue()
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
tmp_file.write(bytes_data)
tmp_file_path = tmp_file.name
images = convert_from_path(tmp_file_path)
st.session_state.pdf_preview = images[0]
st.success("PDF procesado con éxito.")
except Exception as e:
st.error(f"Error al procesar el PDF: {e}")
raw_text = get_pdf_text([pdf_docs])
text_chunks = get_text_chunks(raw_text)
if text_chunks:
get_vector_store(text_chunks)
else:
if st.session_state.processing_attempted:
st.error("No se encontró texto para procesar.")
if __name__ == "__main__":
main()