Revert "dont use loole for sending channel code"

This reverts commit d0a9666a29.
This commit is contained in:
strawberry 2024-04-17 15:16:01 -04:00
parent 002799177d
commit 7ecc570bb8

View file

@ -25,7 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
}; };
use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio::sync::{Mutex, Semaphore};
use tracing::{error, warn}; use tracing::{error, warn};
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -42,8 +42,8 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<loole::Receiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
startup_netburst: bool, startup_netburst: bool,
startup_netburst_keep: i64, startup_netburst_keep: i64,
timeout: u64, timeout: u64,
@ -72,7 +72,7 @@ enum TransactionStatus {
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
sender, sender,
@ -274,7 +274,7 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let receiver = self.receiver.lock().await;
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
@ -342,7 +342,9 @@ impl Service {
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
event = receiver.recv_async() => {
if let Ok((outgoing_kind, event, key)) = event {
if let Ok(Some(events)) = self.select_events( if let Ok(Some(events)) = self.select_events(
&outgoing_kind, &outgoing_kind,
vec![(event, key)], vec![(event, key)],
@ -354,6 +356,7 @@ impl Service {
} }
} }
} }
}
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events( fn select_events(