-
Notifications
You must be signed in to change notification settings - Fork 104
/
request_processor.py
119 lines (97 loc) · 4.57 KB
/
request_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
Copyright (C) 2023-2024 Fern Lane
This file is part of the GPT-Telegramus distribution
(see <https://github.com/F33RNI/GPT-Telegramus>)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
import multiprocessing
from typing import Dict
import logging_handler
import messages
import users_handler
import request_response_container
import module_wrapper_global
from async_helper import async_helper
from bot_sender import send_message_async
from queue_container_helpers import get_container_from_queue, put_container_to_queue
def request_processor(
config: Dict,
messages_: messages.Messages,
users_handler_: users_handler.UsersHandler,
logging_queue: multiprocessing.Queue,
request_response_queue: multiprocessing.Queue,
lock: multiprocessing.Lock,
request_id: int,
module: module_wrapper_global.ModuleWrapperGlobal,
) -> None:
"""Processes request to any module
This method should be called from multiprocessing as process
Args:
config (Dict): global config
messages_ (messages.Messages): initialized messages handler
users_handler_ (users_handler.UsersHandler): initialized users handler
logging_queue (multiprocessing.Queue): logging queue from logging handler
request_response_queue (multiprocessing.Queue): queue of request-response containers
lock (multiprocessing.Lock): lock from queue handler
request_id (int): ID of container
module (module_wrapper_global.ModuleWrapperGlobal): requested module
"""
# Setup logging for current process
logging_handler.worker_configurer(logging_queue)
logging.info("request_processor started")
# Get request
request_ = get_container_from_queue(request_response_queue, lock, request_id)
user_id = request_.user_id
# Check request
if request_ is None:
logging.error("Error retrieving container from the queue")
return
try:
# Send initial message
if config.get("telegram").get("response_initial_message"):
request_.response_text = config.get("telegram").get("response_initial_message")
async_helper(send_message_async(config.get("telegram"), messages_, request_, end=False))
request_.response_text = ""
# Set active state
request_.processing_state = request_response_container.PROCESSING_STATE_ACTIVE
user = users_handler_.get_user(user_id)
# Increment number of requests for statistics
users_handler_.set_key(
user_id, f"requests_{module.name}", users_handler_.get_key(0, f"requests_{module.name}", 0, user=user) + 1
)
users_handler_.set_key(
user_id, "requests_total", users_handler_.get_key(0, "requests_total", 0, user=user) + 1
)
# Save request data (for regenerate function)
users_handler_.set_key(user_id, "request_last", request_.request_text)
if request_.request_image:
users_handler_.save_request_image(user_id, request_.request_image)
else:
users_handler_.save_request_image(user_id, None)
users_handler_.set_key(user_id, "reply_message_id_last", request_.reply_message_id)
# Update container in the queue
put_container_to_queue(request_response_queue, lock, request_)
# Process request
module.process_request(request_)
# Error during processing request
except Exception as e:
request_.error = True
lang_id = users_handler_.get_key(user_id, "lang_id", "eng")
error_text = str(e)[:1000]
request_.response_text = messages_.get_message("response_error", lang_id=lang_id).format(error_text=error_text)
async_helper(send_message_async(config.get("telegram"), messages_, request_, end=True))
logging.error("Error processing request", exc_info=e)
# Set done state
request_.processing_state = request_response_container.PROCESSING_STATE_DONE
# Finally, update container in the queue
put_container_to_queue(request_response_queue, lock, request_)