diff --git a/main.py b/main.py index 535dc8f..5533f52 100644 --- a/main.py +++ b/main.py @@ -12,7 +12,7 @@ import food import util -from util import UserSelector +from selection import UserSelector bot = BotApp() userSelector = UserSelector() @@ -37,7 +37,7 @@ async def capture(): asyncio.create_task(capture()) # noinspection PyTypeChecker -@bot.command(name="add", desc="إضافة طلبك (فقط إذا لم يتم إضافته بشكل تلقائي)") +@bot.command(name="add", desc="إضافة طلبك") async def add_command(update: Update, context: ContextTypes.DEFAULT_TYPE): order = " ".join(context.args) if not order: @@ -52,11 +52,14 @@ async def add_command(update: Update, context: ContextTypes.DEFAULT_TYPE): @bot.command(name="delete", desc="مسح طلبك") async def delete_command(update: Update, context: ContextTypes.DEFAULT_TYPE): user = util.current_user(update) - for (msg_id, u), _ in list(orders.items()): + not_found = True + for (msg_id, u), o in list(orders.items()): if user == u: + not_found = False del orders[(msg_id, u)] + await update.message.reply_text(f'تم مسح طلبك "{o}" بنجاح') - await update.message.reply_text('تمت مسح طلبك بنجاح') + if not_found: await update.message.reply_text('لا توجد طلبات') @bot.command(name="list", desc="عرض الطلبات") @@ -83,9 +86,11 @@ async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE): global orders orders = {} - if context.args == "+selection_history": + msg = "تم مسح جميع الطلبات بنجاح" + if "+selection_history" in context.args: + msg += " و تم مسح جميع اختيارات المستخدمين أيضا" userSelector.clear_history() - await update.message.reply_text("تم مسح جميع الطلبات بنجاح") + await update.message.reply_text(msg) @bot.command(name="ping", desc="اختبار البوت") diff --git a/selection.py b/selection.py new file mode 100644 index 0000000..94c7d5c --- /dev/null +++ b/selection.py @@ -0,0 +1,74 @@ +import json +import logging +import os +import random +from abc import abstractmethod, ABC + + +class HistoryManager(ABC): + def __init__(self): + self.history = [] + + @abstractmethod + def load_history(self): + pass + + def save_history(self, h): + self.history = h + + +class InMemoryHistoryManager(HistoryManager): + def load_history(self): + pass + + +class FileSystemHistoryManager(HistoryManager): + def __init__(self, file_path='~/.lunchy/history.json'): + super().__init__() + self.file_path = os.path.expanduser(file_path) + self.load_history() + + def load_history(self): + directory = os.path.dirname(self.file_path) + if directory and not os.path.exists(directory): + os.makedirs(directory) + + if os.path.exists(self.file_path): + with open(self.file_path, 'r') as file: + self.history = json.load(file) + else: + self.history = [] + self.save_history(self.history) + + def save_history(self, h): + super().save_history(h) + with open(self.file_path, 'w') as file: + json.dump(self.history, file) + + +class UserSelector: + def __init__(self, exclude_gap=0, history_manager=FileSystemHistoryManager()): + self.exclude_gap = exclude_gap + self.history_manager = history_manager + + def select(self, users): + if not users: + raise ValueError('users list should not be empty') + + uniq_len = len(list(set(users))) + excluded_users = self.history_manager.history[ + len(self.history_manager.history) - + (uniq_len - 1 if uniq_len <= self.exclude_gap else self.exclude_gap): + ] + selected = random.choice(users) + + if selected in excluded_users: + logging.warning(f'{selected} was in excluded history: {excluded_users}, re-selecting...') + return self.select(users) + else: + logging.info(f'users: {users}, selected user is: {selected}') + self.history_manager.save_history((self.history_manager.history + [selected])[-10:]) + return selected + + def clear_history(self): + self.history_manager.save_history([]) diff --git a/test_util.py b/test_selection.py similarity index 72% rename from test_util.py rename to test_selection.py index c0833ec..fc4c9b8 100644 --- a/test_util.py +++ b/test_selection.py @@ -1,7 +1,7 @@ import logging import unittest -from util import UserSelector +from selection import UserSelector class TestUserSelector(unittest.TestCase): @@ -32,7 +32,7 @@ def test_select_single_user(self): self.assertEqual(selected, "Alice") def test_select_non_excluded_from_2_users_with_2_selection_gap(self): - selector = UserSelector(selection_gap=2) + selector = UserSelector(exclude_gap=2) users = ["Alice", "Bob", "Bob", "Bob", "Bob"] selector.history_manager.history = ["Alice", "Bob", "Alice"] @@ -42,7 +42,7 @@ def test_select_non_excluded_from_2_users_with_2_selection_gap(self): self.assertEqual(selected, expected_selection) def test_select_non_excluded_3_users_with_2_selection_gap(self): - selector = UserSelector(selection_gap=2) + selector = UserSelector(exclude_gap=2) users = ["Alice", "Alice", "Bob", "Bob", "Charlie", "Charlie", "Charlie"] selector.history_manager.history = ["Alice", "Bob", "Charlie"] @@ -52,17 +52,17 @@ def test_select_non_excluded_3_users_with_2_selection_gap(self): self.assertEqual(selected, expected_selection) def test_select_non_excluded_from_2_users_with_0_selection_gap(self): - selector = UserSelector(selection_gap=0) + selector = UserSelector(exclude_gap=0) users = ["Alice", "Bob", "Bob", "Bob", "Bob"] selector.history_manager.history = ["Alice", "Bob", "Alice"] selected = selector.select(users) self.assertIn(selected, ["Alice", "Bob"]) - def test_select_non_excluded_3_users_with_0_selection_gap(self): + def test_select_non_excluded_3_users_with_0_selection_gap__(self): count = 0 for _ in range(0, 100): - selector = UserSelector(selection_gap=0) + selector = UserSelector(exclude_gap=0) users = ["Alice", "Alice", "Bob", "Bob", "Charlie", "Charlie", "Charlie"] selector.history_manager.history = ["Alice", "Bob", "Charlie"] selected = selector.select(users) @@ -71,10 +71,22 @@ def test_select_non_excluded_3_users_with_0_selection_gap(self): self.assertGreater(count, 0) - def test_select_non_excluded_3_users_with_0_selection_gap_2(self): + def test_select_non_excluded_3_users_with_1_selection_gap__(self): count = 0 for _ in range(0, 100): - selector = UserSelector(selection_gap=2) + selector = UserSelector(exclude_gap=2) + users = ["Alice", "Alice", "Bob", "Bob", "Charlie", "Charlie", "Charlie"] + selector.history_manager.history = ["Alice", "Bob", "Charlie"] + selected = selector.select(users) + if selected in ["Charlie"]: + count += 1 + + self.assertEqual(count, 0) + + def test_select_non_excluded_3_users_with_2_selection_gap__(self): + count = 0 + for _ in range(0, 100): + selector = UserSelector(exclude_gap=2) users = ["Alice", "Alice", "Bob", "Bob", "Charlie", "Charlie", "Charlie"] selector.history_manager.history = ["Alice", "Bob", "Charlie"] selected = selector.select(users) @@ -83,8 +95,20 @@ def test_select_non_excluded_3_users_with_0_selection_gap_2(self): self.assertEqual(count, 0) + def test_select_non_excluded_3_users_with_3_selection_gap__(self): + count = 0 + for _ in range(0, 100): + selector = UserSelector(exclude_gap=3) + users = ["Alice", "Alice", "Bob", "Bob", "Charlie", "Charlie", "Charlie", "Dave"] + selector.history_manager.history = ["Alice", "Bob", "Charlie"] + selected = selector.select(users) + if selected in ["Alice", "Bob", "Charlie"]: + count += 1 + + self.assertEqual(count, 0) + def test_select_non_excluded_3_users_with_ids(self): - selector = UserSelector(selection_gap=2) + selector = UserSelector(exclude_gap=2) users = [(1, "Alice"), (1, "Alice"), (2, "Bob"), (2, "Bob"), (3, "Charlie"), (3, "Charlie"), (3, "Charlie")] selector.history_manager.history = [(1, "Alice"), (2, "Bob"), (3, "Charlie")] @@ -95,7 +119,7 @@ def test_select_non_excluded_3_users_with_ids(self): def test_select_never_picks_last_n_from_history(self): for _ in range(100): - selector = UserSelector(selection_gap=2) + selector = UserSelector(exclude_gap=2) users = \ ["Alice", "Alice", "Bob", "Bob", "Charlie", "Charlie", "Charlie", "Diana", "Diana", "David", "David"] selector.history_manager.history = ["Alice", "Bob", "Charlie", "Diana"] @@ -103,13 +127,13 @@ def test_select_never_picks_last_n_from_history(self): self.assertIn(selected, ["Alice", "Bob", "David"]) def test_history_maintenance(self): - selector = UserSelector(selection_gap=2) + selector = UserSelector(exclude_gap=2) users = ["Alice", "Bob", "Charlie", "Diana"] selections = [] for _ in range(100): selected = selector.select(users) selections.append(selected) - self.assertEqual(selector.history_manager.history, selections[-selector.selection_gap:]) + self.assertEqual(selector.history_manager.history, selections[-10:]) if __name__ == "__main__": diff --git a/util.py b/util.py index 582649f..d8ff664 100644 --- a/util.py +++ b/util.py @@ -1,9 +1,5 @@ -import json -import logging -import os import random import time -from abc import abstractmethod, ABC from typing import Any from typing import Callable @@ -38,76 +34,6 @@ async def is_admin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: return False -class HistoryManager(ABC): - def __init__(self): - self.history = [] - - @abstractmethod - def load_history(self): - pass - - def save_history(self, h): - self.history = h - - -class InMemoryHistoryManager(HistoryManager): - def load_history(self): - pass - - -class FileSystemHistoryManager(HistoryManager): - def __init__(self, file_path=None): - super().__init__() - if file_path is None: - file_path = os.path.expanduser('~/.lunchy/history.json') - self.file_path = file_path - self.load_history() - - def load_history(self): - directory = os.path.dirname(self.file_path) - if directory and not os.path.exists(directory): - os.makedirs(directory) - - if os.path.exists(self.file_path): - with open(self.file_path, 'r') as file: - self.history = json.load(file) - else: - self.history = [] - self.save_history(self.history) - - def save_history(self, h): - with open(self.file_path, 'w') as file: - json.dump(self.history, file) - - -class UserSelector: - def __init__(self, selection_gap=0, history_manager=InMemoryHistoryManager()): - self.selection_gap = selection_gap - self.history_manager = history_manager - - def select(self, users): - if not users: - raise ValueError('users list should not be empty') - - uniq_len = len(list(set(users))) - excluded_users = self.history_manager.history[ - len(self.history_manager.history) - - (uniq_len - 1 if uniq_len <= self.selection_gap else self.selection_gap): - ] - selected = random.choice(users) - - if selected in excluded_users: - logging.warning(f'{selected} was in excluded history: {excluded_users}, re-selecting...') - return self.select(users) - else: - logging.info(f'users: {users}, selected user is: {selected}') - self.history_manager.save_history((self.history_manager.history + [selected])[-2:]) - return selected - - def clear_history(self): - self.history_manager.save_history([]) - - def get_congrats_msg(): return random.choice([ "صاحب الحظ السعيد اليوم هو",