diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 65725618..9cb1c267 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -14,7 +14,7 @@ pub struct Data { servercurrentevent_data: Arc, servernameevent_data: Arc, servername_educount: Arc, - _db: Arc, + pub(super) db: Arc, } impl Data { @@ -23,7 +23,7 @@ impl Data { servercurrentevent_data: db["servercurrentevent_data"].clone(), servernameevent_data: db["servernameevent_data"].clone(), servername_educount: db["servername_educount"].clone(), - _db: db, + db, } } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 1eacca77..a6c3411f 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -3,9 +3,9 @@ mod data; mod send; mod sender; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; -use conduit::{err, Result}; +use conduit::{err, Result, Server}; use ruma::{ api::{appservice::Registration, OutgoingRequest}, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, @@ -18,12 +18,11 @@ use crate::{server_is_ours, services}; pub struct Service { pub db: data::Data, + server: Arc, /// The state for a given state hash. sender: loole::Sender, receiver: Mutex>, - startup_netburst: bool, - startup_netburst_keep: i64, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -53,7 +52,7 @@ impl Service { pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { let dest = Destination::Push(user.to_owned(), pushkey); let event = SendingEvent::Pdu(pdu_id.to_owned()); - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.dispatch(Msg { dest, @@ -66,7 +65,7 @@ impl Service { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { let dest = Destination::Appservice(appservice_id); let event = SendingEvent::Pdu(pdu_id); - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.dispatch(Msg { dest, @@ -93,7 +92,7 @@ impl Service { .into_iter() .map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned()))) .collect::>(); - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); let keys = self.db.queue_requests( &requests .iter() @@ -115,7 +114,7 @@ impl Service { pub fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { let dest = Destination::Normal(server.to_owned()); let event = SendingEvent::Edu(serialized); - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.dispatch(Msg { dest, @@ -142,7 +141,7 @@ impl Service { .into_iter() .map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone()))) .collect::>(); - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); let keys = self.db.queue_requests( &requests .iter() diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index e6b68e9e..a924ce55 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -49,14 +49,12 @@ const CLEANUP_TIMEOUT_MS: u64 = 3500; #[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { - let config = &args.server.config; let (sender, receiver) = loole::unbounded(); Ok(Arc::new(Self { db: Data::new(args.db.clone()), + server: args.server.clone(), sender, receiver: Mutex::new(receiver), - startup_netburst: config.startup_netburst, - startup_netburst_keep: config.startup_netburst_keep, })) } @@ -119,7 +117,7 @@ impl Service { fn handle_response_ok( &self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, ) { - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); self.db .delete_all_active_requests_for(dest) .expect("all active requests deleted"); @@ -174,11 +172,11 @@ impl Service { } fn initial_requests(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) { - let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX); + let keep = usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX); let mut txns = HashMap::>::new(); for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) { let entry = txns.entry(dest.clone()).or_default(); - if self.startup_netburst_keep >= 0 && entry.len() >= keep { + if self.server.config.startup_netburst_keep >= 0 && entry.len() >= keep { warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key)); self.db .delete_active_request(&key) @@ -189,7 +187,7 @@ impl Service { } for (dest, events) in txns { - if self.startup_netburst && !events.is_empty() { + if self.server.config.startup_netburst && !events.is_empty() { statuses.insert(dest.clone(), TransactionStatus::Running); futures.push(Box::pin(send_events(dest.clone(), events))); } @@ -210,7 +208,7 @@ impl Service { return Ok(None); } - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); let mut events = Vec::new(); // Must retry any previous transaction for this remote. @@ -224,7 +222,7 @@ impl Service { } // Compose the next transaction - let _cork = services().db.cork(); + let _cork = self.db.db.cork(); if !new_events.is_empty() { self.db.mark_as_active(&new_events)?; for (e, _) in new_events { @@ -251,8 +249,8 @@ impl Service { .and_modify(|e| match e { TransactionStatus::Failed(tries, time) => { // Fail if a request has failed recently (exponential backoff) - let min = services().globals.config.sender_timeout; - let max = services().globals.config.sender_retry_backoff_limit; + let min = self.server.config.sender_timeout; + let max = self.server.config.sender_retry_backoff_limit; if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) { allow = false; } else { @@ -288,7 +286,7 @@ impl Service { .filter(|user_id| user_is_local(user_id)), ); - if services().globals.allow_outgoing_read_receipts() + if self.server.config.allow_outgoing_read_receipts && !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)? { break; @@ -311,7 +309,7 @@ impl Service { events.push(serde_json::to_vec(&edu).expect("json can be serialized")); } - if services().globals.allow_outgoing_presence() { + if self.server.config.allow_outgoing_presence { select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?; } @@ -617,7 +615,7 @@ async fn send_events_dest_normal( &services().client.sender, server, send_transaction_message::v1::Request { - origin: services().globals.server_name().to_owned(), + origin: services().server.config.server_name.clone(), pdus: pdu_jsons, edus: edu_jsons, origin_server_ts: MilliSecondsSinceUnixEpoch::now(), diff --git a/src/service/services.rs b/src/service/services.rs index 03f3a9ba..d0f74e13 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -52,9 +52,9 @@ impl Services { } Ok(Self { + globals: build!(globals::Service), resolver: build!(resolver::Service), client: build!(client::Service), - globals: build!(globals::Service), rooms: rooms::Service { alias: build!(rooms::alias::Service), auth_chain: build!(rooms::auth_chain::Service),