diff --git a/Cargo.lock b/Cargo.lock index 8b41bf35..0404f778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,8 +95,7 @@ checksum = "5f093eed78becd229346bf859eec0aa4dd7ddde0757287b2b4107a1f09c80002" [[package]] name = "async-channel" version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +source = "git+https://github.com/jevolk/async-channel?rev=fefa543ca5eddf21237d75776fce98b7e09e924a#fefa543ca5eddf21237d75776fce98b7e09e924a" dependencies = [ "concurrent-queue", "event-listener-strategy", @@ -1259,8 +1258,7 @@ dependencies = [ [[package]] name = "event-listener" version = "5.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +source = "git+https://github.com/jevolk/event-listener?rev=96d7e0fc026d8f708b19bc9267a382676a50354c#96d7e0fc026d8f708b19bc9267a382676a50354c" dependencies = [ "concurrent-queue", "parking", diff --git a/Cargo.toml b/Cargo.toml index ea153fda..36f6c1ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -529,6 +529,14 @@ rev = "ccc4fbd8238c2d5ba354e61ec17ac610af11401d" git = "https://github.com/girlbossceo/rustyline-async" rev = "deaeb0694e2083f53d363b648da06e10fc13900c" +# adds LIFO queue scheduling; this should be updated with PR progress. +[patch.crates-io.event-listener] +git = "https://github.com/jevolk/event-listener" +rev = "96d7e0fc026d8f708b19bc9267a382676a50354c" +[patch.crates-io.async-channel] +git = "https://github.com/jevolk/async-channel" +rev = "fefa543ca5eddf21237d75776fce98b7e09e924a" + # # Our crates # diff --git a/src/database/pool.rs b/src/database/pool.rs index 51e705ce..1c55c456 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -8,7 +8,7 @@ use std::{ }, }; -use async_channel::{Receiver, RecvError, Sender}; +use async_channel::{Receiver, RecvError, Sched, Sender}; use conduwuit::{ debug, debug_warn, defer, err, implement, result::DebugInspect, @@ -65,9 +65,14 @@ const QUEUE_LIMIT: (usize, usize) = (1, 2048); #[implement(Pool)] pub(crate) async fn new(server: &Arc) -> Result> { + const CHAN_SCHED: (Sched, Sched) = (Sched::Fifo, Sched::Lifo); + let (total_workers, queue_sizes, topology) = configure(server); - let (senders, receivers) = queue_sizes.into_iter().map(async_channel::bounded).unzip(); + let (senders, receivers) = queue_sizes + .into_iter() + .map(|cap| async_channel::bounded_with_sched(cap, CHAN_SCHED)) + .unzip(); let pool = Arc::new(Self { server: server.clone(),