simplify usage of mpmc channels which don't require receiver lock

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-22 22:58:37 +00:00 committed by strawberry
parent 5fdb8895b1
commit 03f2ac9caf
2 changed files with 20 additions and 20 deletions

View file

@ -20,14 +20,13 @@ use ruma::{
events::room::message::{Relation, RoomMessageEventContent}, events::room::message::{Relation, RoomMessageEventContent},
OwnedEventId, OwnedRoomId, RoomId, UserId, OwnedEventId, OwnedRoomId, RoomId, UserId,
}; };
use tokio::sync::{Mutex, RwLock}; use tokio::sync::RwLock;
use crate::{account_data, globals, rooms, rooms::state::RoomMutexGuard, Dep}; use crate::{account_data, globals, rooms, rooms::state::RoomMutexGuard, Dep};
pub struct Service { pub struct Service {
services: Services, services: Services,
sender: Sender<CommandInput>, channel: (Sender<CommandInput>, Receiver<CommandInput>),
receiver: Mutex<Receiver<CommandInput>>,
pub handle: RwLock<Option<Processor>>, pub handle: RwLock<Option<Processor>>,
pub complete: StdRwLock<Option<Completer>>, pub complete: StdRwLock<Option<Completer>>,
#[cfg(feature = "console")] #[cfg(feature = "console")]
@ -78,7 +77,6 @@ const COMMAND_QUEUE_LIMIT: usize = 512;
#[async_trait] #[async_trait]
impl crate::Service for Service { impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> { fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let (sender, receiver) = loole::bounded(COMMAND_QUEUE_LIMIT);
Ok(Arc::new(Self { Ok(Arc::new(Self {
services: Services { services: Services {
server: args.server.clone(), server: args.server.clone(),
@ -90,8 +88,7 @@ impl crate::Service for Service {
account_data: args.depend::<account_data::Service>("account_data"), account_data: args.depend::<account_data::Service>("account_data"),
services: None.into(), services: None.into(),
}, },
sender, channel: loole::bounded(COMMAND_QUEUE_LIMIT),
receiver: Mutex::new(receiver),
handle: RwLock::new(None), handle: RwLock::new(None),
complete: StdRwLock::new(None), complete: StdRwLock::new(None),
#[cfg(feature = "console")] #[cfg(feature = "console")]
@ -100,8 +97,8 @@ impl crate::Service for Service {
} }
async fn worker(self: Arc<Self>) -> Result<()> { async fn worker(self: Arc<Self>) -> Result<()> {
let receiver = self.receiver.lock().await;
let mut signals = self.services.server.signal.subscribe(); let mut signals = self.services.server.signal.subscribe();
let receiver = self.channel.1.clone();
self.startup_execute().await?; self.startup_execute().await?;
self.console_auto_start().await; self.console_auto_start().await;
@ -128,8 +125,9 @@ impl crate::Service for Service {
#[cfg(feature = "console")] #[cfg(feature = "console")]
self.console.interrupt(); self.console.interrupt();
if !self.sender.is_closed() { let (sender, _) = &self.channel;
self.sender.close(); if !sender.is_closed() {
sender.close();
} }
} }
@ -159,7 +157,8 @@ impl Service {
/// will take place on the service worker's task asynchronously. Errors if /// will take place on the service worker's task asynchronously. Errors if
/// the queue is full. /// the queue is full.
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> { pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
self.sender self.channel
.0
.send(CommandInput { command, reply_id }) .send(CommandInput { command, reply_id })
.map_err(|e| err!("Failed to enqueue admin command: {e:?}")) .map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
} }

View file

@ -6,15 +6,15 @@ use std::{sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use conduwuit::{checked, debug, error, result::LogErr, Error, Result, Server}; use conduwuit::{checked, debug, error, result::LogErr, Error, Result, Server};
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use loole::{Receiver, Sender};
use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
use tokio::{sync::Mutex, time::sleep}; use tokio::time::sleep;
use self::{data::Data, presence::Presence}; use self::{data::Data, presence::Presence};
use crate::{globals, users, Dep}; use crate::{globals, users, Dep};
pub struct Service { pub struct Service {
timer_sender: loole::Sender<TimerType>, timer_channel: (Sender<TimerType>, Receiver<TimerType>),
timer_receiver: Mutex<loole::Receiver<TimerType>>,
timeout_remote_users: bool, timeout_remote_users: bool,
idle_timeout: u64, idle_timeout: u64,
offline_timeout: u64, offline_timeout: u64,
@ -36,10 +36,8 @@ impl crate::Service for Service {
let config = &args.server.config; let config = &args.server.config;
let idle_timeout_s = config.presence_idle_timeout_s; let idle_timeout_s = config.presence_idle_timeout_s;
let offline_timeout_s = config.presence_offline_timeout_s; let offline_timeout_s = config.presence_offline_timeout_s;
let (timer_sender, timer_receiver) = loole::unbounded();
Ok(Arc::new(Self { Ok(Arc::new(Self {
timer_sender, timer_channel: loole::unbounded(),
timer_receiver: Mutex::new(timer_receiver),
timeout_remote_users: config.presence_timeout_remote_users, timeout_remote_users: config.presence_timeout_remote_users,
idle_timeout: checked!(idle_timeout_s * 1_000)?, idle_timeout: checked!(idle_timeout_s * 1_000)?,
offline_timeout: checked!(offline_timeout_s * 1_000)?, offline_timeout: checked!(offline_timeout_s * 1_000)?,
@ -53,8 +51,9 @@ impl crate::Service for Service {
} }
async fn worker(self: Arc<Self>) -> Result<()> { async fn worker(self: Arc<Self>) -> Result<()> {
let receiver = self.timer_channel.1.clone();
let mut presence_timers = FuturesUnordered::new(); let mut presence_timers = FuturesUnordered::new();
let receiver = self.timer_receiver.lock().await;
while !receiver.is_closed() { while !receiver.is_closed() {
tokio::select! { tokio::select! {
Some(user_id) = presence_timers.next() => { Some(user_id) = presence_timers.next() => {
@ -74,8 +73,9 @@ impl crate::Service for Service {
} }
fn interrupt(&self) { fn interrupt(&self) {
if !self.timer_sender.is_closed() { let (timer_sender, _) = &self.timer_channel;
self.timer_sender.close(); if !timer_sender.is_closed() {
timer_sender.close();
} }
} }
@ -150,7 +150,8 @@ impl Service {
| _ => self.services.server.config.presence_offline_timeout_s, | _ => self.services.server.config.presence_offline_timeout_s,
}; };
self.timer_sender self.timer_channel
.0
.send((user_id.to_owned(), Duration::from_secs(timeout))) .send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|e| { .map_err(|e| {
error!("Failed to add presence timer: {}", e); error!("Failed to add presence timer: {}", e);