From 03f2ac9cafd9b070c36ca6fdf0fccf3d5fb8e953 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 22 Dec 2024 22:58:37 +0000 Subject: [PATCH] simplify usage of mpmc channels which don't require receiver lock Signed-off-by: Jason Volk --- src/service/admin/mod.rs | 19 +++++++++---------- src/service/presence/mod.rs | 21 +++++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 59639e58..399055aa 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -20,14 +20,13 @@ use ruma::{ events::room::message::{Relation, RoomMessageEventContent}, OwnedEventId, OwnedRoomId, RoomId, UserId, }; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use crate::{account_data, globals, rooms, rooms::state::RoomMutexGuard, Dep}; pub struct Service { services: Services, - sender: Sender, - receiver: Mutex>, + channel: (Sender, Receiver), pub handle: RwLock>, pub complete: StdRwLock>, #[cfg(feature = "console")] @@ -78,7 +77,6 @@ const COMMAND_QUEUE_LIMIT: usize = 512; #[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { - let (sender, receiver) = loole::bounded(COMMAND_QUEUE_LIMIT); Ok(Arc::new(Self { services: Services { server: args.server.clone(), @@ -90,8 +88,7 @@ impl crate::Service for Service { account_data: args.depend::("account_data"), services: None.into(), }, - sender, - receiver: Mutex::new(receiver), + channel: loole::bounded(COMMAND_QUEUE_LIMIT), handle: RwLock::new(None), complete: StdRwLock::new(None), #[cfg(feature = "console")] @@ -100,8 +97,8 @@ impl crate::Service for Service { } async fn worker(self: Arc) -> Result<()> { - let receiver = self.receiver.lock().await; let mut signals = self.services.server.signal.subscribe(); + let receiver = self.channel.1.clone(); self.startup_execute().await?; self.console_auto_start().await; @@ -128,8 +125,9 @@ impl crate::Service for Service { #[cfg(feature = "console")] self.console.interrupt(); - if !self.sender.is_closed() { - self.sender.close(); + let (sender, _) = &self.channel; + if !sender.is_closed() { + sender.close(); } } @@ -159,7 +157,8 @@ impl Service { /// will take place on the service worker's task asynchronously. Errors if /// the queue is full. pub fn command(&self, command: String, reply_id: Option) -> Result<()> { - self.sender + self.channel + .0 .send(CommandInput { command, reply_id }) .map_err(|e| err!("Failed to enqueue admin command: {e:?}")) } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 1f9f63d9..bf5258e1 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -6,15 +6,15 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use conduwuit::{checked, debug, error, result::LogErr, Error, Result, Server}; use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; +use loole::{Receiver, Sender}; use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; -use tokio::{sync::Mutex, time::sleep}; +use tokio::time::sleep; use self::{data::Data, presence::Presence}; use crate::{globals, users, Dep}; pub struct Service { - timer_sender: loole::Sender, - timer_receiver: Mutex>, + timer_channel: (Sender, Receiver), timeout_remote_users: bool, idle_timeout: u64, offline_timeout: u64, @@ -36,10 +36,8 @@ impl crate::Service for Service { let config = &args.server.config; let idle_timeout_s = config.presence_idle_timeout_s; let offline_timeout_s = config.presence_offline_timeout_s; - let (timer_sender, timer_receiver) = loole::unbounded(); Ok(Arc::new(Self { - timer_sender, - timer_receiver: Mutex::new(timer_receiver), + timer_channel: loole::unbounded(), timeout_remote_users: config.presence_timeout_remote_users, idle_timeout: checked!(idle_timeout_s * 1_000)?, offline_timeout: checked!(offline_timeout_s * 1_000)?, @@ -53,8 +51,9 @@ impl crate::Service for Service { } async fn worker(self: Arc) -> Result<()> { + let receiver = self.timer_channel.1.clone(); + let mut presence_timers = FuturesUnordered::new(); - let receiver = self.timer_receiver.lock().await; while !receiver.is_closed() { tokio::select! { Some(user_id) = presence_timers.next() => { @@ -74,8 +73,9 @@ impl crate::Service for Service { } fn interrupt(&self) { - if !self.timer_sender.is_closed() { - self.timer_sender.close(); + let (timer_sender, _) = &self.timer_channel; + if !timer_sender.is_closed() { + timer_sender.close(); } } @@ -150,7 +150,8 @@ impl Service { | _ => self.services.server.config.presence_offline_timeout_s, }; - self.timer_sender + self.timer_channel + .0 .send((user_id.to_owned(), Duration::from_secs(timeout))) .map_err(|e| { error!("Failed to add presence timer: {}", e);