replace tokio channels with loole (#256)

* rewrite admin handler to use loole channels

* apply correct formatting

* move all other services to loole channels

* fix ci
This commit is contained in:
raizo 2024-04-07 18:33:07 +02:00 committed by June
parent c82c548cbf
commit 579d3ce865
5 changed files with 110 additions and 84 deletions

7
Cargo.lock generated
View file

@ -484,6 +484,7 @@ dependencies = [
"itertools 0.12.1", "itertools 0.12.1",
"jsonwebtoken", "jsonwebtoken",
"log", "log",
"loole",
"lru-cache", "lru-cache",
"nix", "nix",
"num_cpus", "num_cpus",
@ -1479,6 +1480,12 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]]
name = "loole"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6725f0feab07fcf90f6de5417c06d7fef976fa6e5912fa9e21cb5e4dc6ae5da"
[[package]] [[package]]
name = "lru-cache" name = "lru-cache"
version = "0.1.2" version = "0.1.2"

View file

@ -70,6 +70,9 @@ cyborgtime = "2.1.1"
bytes = "1.6.0" bytes = "1.6.0"
http = "0.2.12" http = "0.2.12"
# used to replace the channels of the tokio runtime
loole = "0.3.0"
# standard date and time tools # standard date and time tools
[dependencies.chrono] [dependencies.chrono]
version = "0.4.37" version = "0.4.37"

View file

@ -23,7 +23,7 @@ use ruma::{
EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
}; };
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use tokio::sync::{mpsc, Mutex}; use tokio::sync::Mutex;
use tracing::{error, warn}; use tracing::{error, warn};
use super::pdu::PduBuilder; use super::pdu::PduBuilder;
@ -91,13 +91,13 @@ pub enum AdminRoomEvent {
} }
pub struct Service { pub struct Service {
pub sender: mpsc::UnboundedSender<AdminRoomEvent>, pub sender: loole::Sender<AdminRoomEvent>,
receiver: Mutex<mpsc::UnboundedReceiver<AdminRoomEvent>>, receiver: Mutex<loole::Receiver<AdminRoomEvent>>,
} }
impl Service { impl Service {
pub fn build() -> Arc<Self> { pub fn build() -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
sender, sender,
receiver: Mutex::new(receiver), receiver: Mutex::new(receiver),
@ -115,7 +115,7 @@ impl Service {
} }
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let receiver = self.receiver.lock().await;
// TODO: Use futures when we have long admin commands // TODO: Use futures when we have long admin commands
//let mut futures = FuturesUnordered::new(); //let mut futures = FuturesUnordered::new();
@ -125,7 +125,9 @@ impl Service {
if let Ok(Some(conduit_room)) = Self::get_admin_room() { if let Ok(Some(conduit_room)) = Self::get_admin_room() {
loop { loop {
tokio::select! { tokio::select! {
Some(event) = receiver.recv() => { event = receiver.recv_async() => {
match event {
Ok(event) => {
let (mut message_content, reply) = match event { let (mut message_content, reply) = match event {
AdminRoomEvent::SendMessage(content) => (content, None), AdminRoomEvent::SendMessage(content) => (content, None),
AdminRoomEvent::ProcessMessage(room_message, reply_id) => { AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
@ -179,10 +181,17 @@ impl Service {
&state_lock) &state_lock)
.await?; .await?;
} }
drop(state_lock); drop(state_lock);
} }
Err(_) => {
// TODO: Handle error, Im too unfamiliar with the codebase to know what to do here
// recv_async returns an error if all senders have been dropped. If the channel is empty, the returned future will yield to the async runtime.
}
}
}
} }
} }
} }

View file

@ -10,10 +10,7 @@ use ruma::{
OwnedUserId, UInt, UserId, OwnedUserId, UInt, UserId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{ use tokio::{sync::Mutex, time::sleep};
sync::{mpsc, Mutex},
time::sleep,
};
use tracing::{debug, error}; use tracing::{debug, error};
use crate::{services, utils, Config, Error, Result}; use crate::{services, utils, Config, Error, Result};
@ -71,14 +68,14 @@ impl Presence {
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>, pub timer_sender: loole::Sender<(OwnedUserId, Duration)>,
timer_receiver: Mutex<mpsc::UnboundedReceiver<(OwnedUserId, Duration)>>, timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
timeout_remote_users: bool, timeout_remote_users: bool,
} }
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (timer_sender, timer_receiver) = mpsc::unbounded_channel(); let (timer_sender, timer_receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
@ -173,13 +170,22 @@ impl Service {
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut presence_timers = FuturesUnordered::new(); let mut presence_timers = FuturesUnordered::new();
let mut receiver = self.timer_receiver.lock().await; let receiver = self.timer_receiver.lock().await;
loop { loop {
tokio::select! { tokio::select! {
Some((user_id, timeout)) = receiver.recv() => { event = receiver.recv_async() => {
match event {
Ok((user_id, timeout)) => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout)); presence_timers.push(presence_timer(user_id, timeout));
} }
Err(e) => {
// TODO: Handle error better? I have no idea what to do here.
error!("Failed to receive presence timer: {}", e);
}
}
}
Some(user_id) = presence_timers.next() => { Some(user_id) = presence_timers.next() => {
process_presence_timer(&user_id)?; process_presence_timer(&user_id)?;

View file

@ -25,10 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
}; };
use tokio::{ use tokio::sync::{Mutex, Semaphore};
select,
sync::{mpsc, Mutex, Semaphore},
};
use tracing::{error, warn}; use tracing::{error, warn};
use crate::{services, utils::calculate_hash, Config, Error, PduEvent, Result}; use crate::{services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -43,8 +40,8 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<loole::Receiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
startup_netburst: bool, startup_netburst: bool,
startup_netburst_keep: i64, startup_netburst_keep: i64,
timeout: u64, timeout: u64,
@ -73,7 +70,7 @@ enum TransactionStatus {
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
sender, sender,
@ -275,7 +272,7 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let receiver = self.receiver.lock().await;
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
@ -306,7 +303,7 @@ impl Service {
} }
loop { loop {
select! { tokio::select! {
Some(response) = futures.next() => { Some(response) = futures.next() => {
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
@ -343,7 +340,10 @@ impl Service {
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
event = receiver.recv_async() => {
// TODO: Error handling for this
if let Ok((outgoing_kind, event, key)) = event {
if let Ok(Some(events)) = self.select_events( if let Ok(Some(events)) = self.select_events(
&outgoing_kind, &outgoing_kind,
vec![(event, key)], vec![(event, key)],
@ -355,6 +355,7 @@ impl Service {
} }
} }
} }
}
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events( fn select_events(