Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 187 additions & 14 deletions cogs/events/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,195 @@ async def on_member_remove(self, member: discord.Member) -> None:

@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
if message.author.bot:
"""
Thin wrapper listener that delegates real work to helper methods to keep complexity low.
"""
# If it's an anon bot message from any bot that's not us, only process if it's an anon message.
if message.author.bot and message.author.id != self.client.user.id:
if not (message.embeds and len(message.embeds) > 0 and message.embeds[0].title == "Anon Message"):
return
# if it is an anon message (bot message with embed), fall through β€” other handlers may act on it
# (original logic only continued when it was an anon message)
# return

# Try to handle reply-to-anon flows (separate helper to reduce complexity)
if message.reference and message.reference.message_id:
try:
await self._process_reply_to_anon(message)
except (discord.NotFound, discord.Forbidden, discord.HTTPException):
# Could not fetch the replied message or other discord errors β€” ignore as before
pass

# Handle EC Campus keyword check separately
try:
await self._maybe_handle_ec_campus_keyword(message)
except Exception:
# Don't let a non-critical error here bubble up and break other things
pass

# End of on_message
return

async def _process_reply_to_anon(self, message: discord.Message) -> None:
"""
Handle replies to anon messages and DM the original anon sender if appropriate.
This is extracted from on_message to reduce the complexity of the listener.
"""
replied_message = await message.channel.fetch_message(message.reference.message_id)
if not self._is_anon_message(replied_message):
return

anon_cog = self.client.get_cog("SlashAnon")
if not anon_cog or not hasattr(anon_cog, "anon_cache"):
return

original_sender_id = self._find_sender_id(anon_cog, replied_message.id)
current_sender_id, is_current_anon = await self._identify_current_sender(anon_cog, message)

if not (original_sender_id and current_sender_id and original_sender_id != current_sender_id):
return

await self._notify_original_sender(original_sender_id, current_sender_id, message, is_current_anon)

# ---------- helpers for _process_reply_to_anon ----------

@staticmethod
def _is_anon_message(msg: discord.Message) -> bool:
return msg.author.bot and msg.embeds and len(msg.embeds) > 0 and msg.embeds[0].title == "Anon Message"

@staticmethod
# int and str are both accepted for target_message_id for flexibility
def _find_sender_id(anon_cog: commands.Cog, target_message_id: int | str) -> str | None:
for user_id, messages in anon_cog.anon_cache.items():
if any(str(target_message_id) == msg["message_id"] for msg in messages):
return user_id
return None

async def _identify_current_sender(
self, anon_cog: commands.Cog, message: discord.Message
) -> tuple[str | None, bool]:
"""Return (sender_id, is_current_anon)."""
is_current_anon = self._is_anon_message(message) and message.author == self.client.user
if is_current_anon:
for user_id, messages in anon_cog.anon_cache.items():
if any(str(message.id) == msg["message_id"] for msg in messages):
return user_id, True
return None, True
return str(message.author.id), False

async def _get_subscription_status(self, user_id: int) -> bool:
"""
Return whether a user is currently subscribed to anon notifications.
Defaults to True if no record exists.
"""
record = await self.client.link_collection.find_one({"userId": str(user_id)})
return record.get("anon_notifications", True) if record else True

async def _notify_original_sender(
self,
original_sender_id: str,
current_sender_id: str,
message: discord.Message,
is_current_anon: bool,
) -> None:
"""Build embed, button, and DM the original anon sender."""
try:
original_sender = await self.client.fetch_user(int(original_sender_id))
except (discord.NotFound, discord.Forbidden, discord.HTTPException):
return
if not original_sender:
return

link_record = await self.client.link_collection.find_one({"userId": str(original_sender.id)})
if link_record and link_record.get("anon_notifications", True) is False:
return

reply_type = "anon user" if is_current_anon else message.author.display_name

embed = discord.Embed(
title="Reply to Your Anon Message",
description=f"An {reply_type} replied to your anon message"
if is_current_anon
else f"{reply_type} replied to your anon message",
color=discord.Color.blue(),
)
embed.add_field(
name="Jump to Reply",
value=f"[Click here to view the reply]({message.jump_url})",
inline=False,
)
embed.set_footer(text="PESU Bot")
embed.timestamp = discord.utils.utcnow()

is_subscribed = await self._get_subscription_status(original_sender.id)

view = self._make_toggle_view(original_sender.id, is_subscribed)

try:
await original_sender.send(embed=embed, view=view)
except (discord.Forbidden, discord.HTTPException, discord.NotFound):
pass

def _make_toggle_view(self, user_id: int, is_subscribed: bool) -> discord.ui.View:
"""Create the subscribe/unsubscribe button view."""
view = discord.ui.View()
toggle_button = discord.ui.Button(
label="Unsubscribe from notifications" if is_subscribed else "Subscribe to notifications",
style=discord.ButtonStyle.secondary if is_subscribed else discord.ButtonStyle.primary,
custom_id=f"toggle_anon_notifications_{user_id}",
)

async def toggle_callback(interaction: discord.Interaction) -> None:
if interaction.user.id != user_id:
await interaction.response.send_message("You can't toggle someone else's subscription.", ephemeral=True)
return

# current_record = await self.client.link_collection.find_one({"userId": str(user_id)})
currently_subscribed = await self._get_subscription_status(user_id)

new_status = not currently_subscribed
await self.client.link_collection.update_one(
{"userId": str(user_id)},
{"$set": {"anon_notifications": new_status}},
upsert=True,
)

if new_status:
await interaction.response.send_message(
"βœ… You have been subscribed to anon reply notifications.", ephemeral=True
)
else:
await interaction.response.send_message(
"❌ You have been unsubscribed from anon reply notifications.", ephemeral=True
)

toggle_button.callback = toggle_callback
view.add_item(toggle_button)
return view

async def _maybe_handle_ec_campus_keyword(self, message: discord.Message) -> None:
"""
Handle the EC Campus keyword check (20% random chance in prod) extracted out to reduce complexity.
"""
if os.getenv("APP_ENV") != "prod":
return

if random.random() > 0.2:
return

# Only check text content
content = (message.content or "").lower()
if not content:
return

if os.getenv("APP_ENV") == "prod" and random.random() <= 0.2: # 20% chance and prod deployment
# Special EC Campus keyword patterns. Only check for words, not internal matches
patterns = [r"\becc\b", r"\bec campus\b", r"\bec\b"]
# Normalize message content to handle case insensitive matches
content = message.content.lower()
# Check for matches
if any(re.search(pattern, content) for pattern in patterns):
gif_url = "https://tenor.com/view/pes-pes-college-pesu-pes-univercity-pes-rr-gif-26661455"
reply_text = "Did someone mention EC Campus? πŸ‘€"
async with message.channel.typing():
await asyncio.sleep(1)
await message.reply(reply_text)
await message.channel.send(gif_url)
patterns = [r"\becc\b", r"\bec campus\b", r"\bec\b"]
if any(re.search(pattern, content) for pattern in patterns):
gif_url = "https://tenor.com/view/pes-pes-college-pesu-pes-univercity-pes-rr-gif-26661455"
reply_text = "Did someone mention EC Campus? πŸ‘€"
async with message.channel.typing():
await asyncio.sleep(1)
await message.reply(reply_text)
await message.channel.send(gif_url)

@commands.Cog.listener()
async def on_message_delete(self, message: discord.Message) -> None:
Expand Down