Skip to content

Commit

Permalink
💥 Replace load_all_speaker_phone
Browse files Browse the repository at this point in the history
  • Loading branch information
H1rono committed Feb 5, 2025
1 parent 62ec6db commit 968e934
Showing 1 changed file with 11 additions and 155 deletions.
166 changes: 11 additions & 155 deletions server/src/speaker_phone/impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use futures::{FutureExt, TryStreamExt};
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, MySqlPool};

Expand All @@ -13,9 +13,10 @@ where
Context: AsRef<MySqlPool>
+ AsRef<crate::task::TaskManager>
+ crate::event::ProvideEventService
+ crate::traq::channel::ProvideTraqChannelService
+ crate::traq::message::ProvideTraqMessageService
+ crate::traq::bot::ProvideTraqBotService
+ crate::traq::user::ProvideTraqUserService
+ crate::traq::message::ProvideTraqMessageService
+ crate::traq::channel::ProvideTraqChannelService
+ crate::world::ProvideWorldService,
{
type Error = super::Error;
Expand Down Expand Up @@ -208,168 +209,23 @@ async fn load_all_speaker_phones<Context>(
where
Context: AsRef<MySqlPool>
+ AsRef<crate::task::TaskManager>
+ crate::traq::channel::ProvideTraqChannelService
+ crate::traq::message::ProvideTraqMessageService
+ crate::traq::bot::ProvideTraqBotService
+ crate::traq::user::ProvideTraqUserService
+ crate::traq::message::ProvideTraqMessageService
+ crate::traq::channel::ProvideTraqChannelService
+ crate::event::ProvideEventService,
{
let pool: &MySqlPool = (*ctx).as_ref();

let speaker_phones: Vec<super::SpeakerPhone> =
let speaker_phones: Vec<_> =
sqlx::query_as::<_, SpeakerPhoneRow>(r#"SELECT * FROM `speaker_phones`"#)
.fetch_all(pool)
.await?
.into_iter()
.map(Into::into)
.collect::<Vec<super::SpeakerPhone>>();

let channels = ctx
.get_all_channels(crate::traq::channel::GetAllChannelsParams {})
.await
.map_err(IntoStatus::into_status)?;

let channel_map: std::collections::HashMap<
super::SpeakerPhoneId,
crate::traq::channel::TraqChannel,
> = speaker_phones
.iter()
.filter_map(|speaker_phone| {
channels
.iter()
.find(|channel| channel.path == speaker_phone.name.0)
.map(|channel| (speaker_phone.id, channel.clone()))
})
.collect();

let ctx_clone = ctx.clone();
let task_manager: &crate::task::TaskManager = (*ctx_clone).as_ref();
task_manager
.spawn(|_cancellation_token| async move {
let traq_user_service = &*ctx;
let traq_message_service = &*ctx;
let event_service = &*ctx;
run_subscription_loop(
traq_user_service,
traq_message_service,
event_service,
speaker_phones,
channel_map,
)
.await;
})
.await;

Ok(())
}

async fn run_subscription_loop(
traq_user_service: &impl crate::traq::user::ProvideTraqUserService,
traq_message_service: &impl crate::traq::message::ProvideTraqMessageService,
event_service: &impl crate::event::ProvideEventService,
mut speaker_phones: Vec<super::SpeakerPhone>,
channel_map: HashMap<super::SpeakerPhoneId, crate::traq::channel::TraqChannel>,
) {
let mut speaker_phone_rx = event_service
.subscribe_speaker_phones()
.map_err(|e| super::Error::from(e.into_status()));
let mut message_rx = event_service
.subscribe_messages()
.map_err(|e| super::Error::from(e.into_status()));
.collect();

loop {
let channel_map = channel_map.clone();
tokio::select! {
speaker_phone = speaker_phone_rx.try_next() => {
let speaker_phone = match speaker_phone {
Ok(Some(speaker_phone)) => speaker_phone,
Ok(None) => break,
Err(err) => {
tracing::error!(error = %err, "Failed to receive a speaker phone");
continue;
}
};

speaker_phones.push(speaker_phone);
}

message = message_rx.try_next() => {
let message = match message {
Ok(Some(msg)) => msg,
Ok(None) => break,
Err(err) => {
tracing::error!(error = %err, "Failed to receive a message");
continue;
}
};

for speaker_phone in &speaker_phones {
post_message_to_traq(
traq_user_service,
traq_message_service,
speaker_phone,
&channel_map,
&message,
).await;
}
}
}
}
}

async fn post_message_to_traq(
traq_user_service: &impl crate::traq::user::ProvideTraqUserService,
traq_message_service: &impl crate::traq::message::ProvideTraqMessageService,
speaker_phone: &super::SpeakerPhone,
channel_map: &HashMap<super::SpeakerPhoneId, crate::traq::channel::TraqChannel>,
message: &crate::message::Message,
) {
if message.content.starts_with("[]()") {
return;
}

if !message
.position
.is_inside_circle(speaker_phone.position, speaker_phone.receive_range)
{
return;
}

let channel = channel_map
.get(&speaker_phone.id)
.expect("SpeakerPhoneのチャンネルが存在する");

let traq_user = traq_user_service
.find_traq_user_by_app_user_id(crate::traq::user::FindTraqUserByAppUserIdParams {
id: message.user_id,
})
.await
.map_err(IntoStatus::into_status);
let traq_user = match traq_user {
Ok(Some(u)) => u,
Ok(None) => {
tracing::error!("User not found");
return;
}
Err(err) => {
tracing::error!(error = %err, "Failed to find a user");
return;
}
};

let mut modified_message = message.clone();
modified_message.content = format!("[](){}", message.content);
let res = traq_message_service
.send_message(crate::traq::message::SendMessageParams {
inner: modified_message,
channel_id: channel.id,
user_id: traq_user.id,
})
.await
.map_err(IntoStatus::into_status);
match res {
Ok(_) => tracing::info!("Sent a message"),
Err(err) => tracing::error!(error = %err, "Failed to send a message"),
}
super::load::init(Arc::clone(&ctx), speaker_phones).await
}

async fn get_available_channels(
Expand Down

0 comments on commit 968e934

Please sign in to comment.