diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 31f8e73b..7c699e95 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -1,5 +1,6 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant}; +use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduit::{debug, debug_warn, err, trace, warn, Err}; use ruma::{ @@ -21,7 +22,7 @@ use tokio::sync::RwLock; use crate::{ service::rooms::event_handler::parse_incoming_pdu, - services, + services::Services, utils::{self}, Error, Result, Ruma, }; @@ -33,7 +34,8 @@ type ResolvedMap = BTreeMap>; /// Push EDUs and PDUs to this server. #[tracing::instrument(skip_all, fields(%client), name = "send")] pub(crate) async fn send_transaction_message_route( - InsecureClientIp(client): InsecureClientIp, body: Ruma, + State(services): State<&Services>, InsecureClientIp(client): InsecureClientIp, + body: Ruma, ) -> Result { let origin = body.origin.as_ref().expect("server is authenticated"); @@ -61,8 +63,8 @@ pub(crate) async fn send_transaction_message_route( "Starting txn", ); - let resolved_map = handle_pdus(&client, &body, origin, &txn_start_time).await?; - handle_edus(&client, &body, origin).await?; + let resolved_map = handle_pdus(services, &client, &body, origin, &txn_start_time).await?; + handle_edus(services, &client, &body, origin).await?; debug!( pdus = ?body.pdus.len(), @@ -82,7 +84,8 @@ pub(crate) async fn send_transaction_message_route( } async fn handle_pdus( - _client: &IpAddr, body: &Ruma, origin: &ServerName, txn_start_time: &Instant, + services: &Services, _client: &IpAddr, body: &Ruma, origin: &ServerName, + txn_start_time: &Instant, ) -> Result { let mut parsed_pdus = Vec::with_capacity(body.pdus.len()); for pdu in &body.pdus { @@ -102,7 +105,7 @@ async fn handle_pdus( // corresponding signing keys let pub_key_map = RwLock::new(BTreeMap::new()); if !parsed_pdus.is_empty() { - services() + services .rooms .event_handler .fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map) @@ -118,7 +121,7 @@ async fn handle_pdus( let mut resolved_map = BTreeMap::new(); for (event_id, value, room_id) in parsed_pdus { let pdu_start_time = Instant::now(); - let mutex_lock = services() + let mutex_lock = services .rooms .event_handler .mutex_federation @@ -126,7 +129,7 @@ async fn handle_pdus( .await; resolved_map.insert( event_id.clone(), - services() + services .rooms .event_handler .handle_incoming_pdu(origin, &room_id, &event_id, value, true, &pub_key_map) @@ -154,7 +157,7 @@ async fn handle_pdus( } async fn handle_edus( - client: &IpAddr, body: &Ruma, origin: &ServerName, + services: &Services, client: &IpAddr, body: &Ruma, origin: &ServerName, ) -> Result<()> { for edu in body .edus @@ -162,12 +165,12 @@ async fn handle_edus( .filter_map(|edu| serde_json::from_str::(edu.json().get()).ok()) { match edu { - Edu::Presence(presence) => handle_edu_presence(client, origin, presence).await?, - Edu::Receipt(receipt) => handle_edu_receipt(client, origin, receipt).await?, - Edu::Typing(typing) => handle_edu_typing(client, origin, typing).await?, - Edu::DeviceListUpdate(content) => handle_edu_device_list_update(client, origin, content).await?, - Edu::DirectToDevice(content) => handle_edu_direct_to_device(client, origin, content).await?, - Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(client, origin, content).await?, + Edu::Presence(presence) => handle_edu_presence(services, client, origin, presence).await?, + Edu::Receipt(receipt) => handle_edu_receipt(services, client, origin, receipt).await?, + Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await?, + Edu::DeviceListUpdate(content) => handle_edu_device_list_update(services, client, origin, content).await?, + Edu::DirectToDevice(content) => handle_edu_direct_to_device(services, client, origin, content).await?, + Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(services, client, origin, content).await?, Edu::_Custom(ref _custom) => { debug_warn!(?body.edus, "received custom/unknown EDU"); }, @@ -177,8 +180,10 @@ async fn handle_edus( Ok(()) } -async fn handle_edu_presence(_client: &IpAddr, origin: &ServerName, presence: PresenceContent) -> Result<()> { - if !services().globals.allow_incoming_presence() { +async fn handle_edu_presence( + services: &Services, _client: &IpAddr, origin: &ServerName, presence: PresenceContent, +) -> Result<()> { + if !services.globals.allow_incoming_presence() { return Ok(()); } @@ -191,7 +196,7 @@ async fn handle_edu_presence(_client: &IpAddr, origin: &ServerName, presence: Pr continue; } - services().presence.set_presence( + services.presence.set_presence( &update.user_id, &update.presence, Some(update.currently_active), @@ -203,13 +208,15 @@ async fn handle_edu_presence(_client: &IpAddr, origin: &ServerName, presence: Pr Ok(()) } -async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: ReceiptContent) -> Result<()> { - if !services().globals.allow_incoming_read_receipts() { +async fn handle_edu_receipt( + services: &Services, _client: &IpAddr, origin: &ServerName, receipt: ReceiptContent, +) -> Result<()> { + if !services.globals.allow_incoming_read_receipts() { return Ok(()); } for (room_id, room_updates) in receipt.receipts { - if services() + if services .rooms .event_handler .acl_check(origin, &room_id) @@ -231,7 +238,7 @@ async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: Rece continue; } - if services() + if services .rooms .state_cache .room_members(&room_id) @@ -247,7 +254,7 @@ async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: Rece room_id: room_id.clone(), }; - services() + services .rooms .read_receipt .readreceipt_update(&user_id, &room_id, &event)?; @@ -265,8 +272,10 @@ async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: Rece Ok(()) } -async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: TypingContent) -> Result<()> { - if !services().globals.config.allow_incoming_typing { +async fn handle_edu_typing( + services: &Services, _client: &IpAddr, origin: &ServerName, typing: TypingContent, +) -> Result<()> { + if !services.globals.config.allow_incoming_typing { return Ok(()); } @@ -278,7 +287,7 @@ async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: Typing return Ok(()); } - if services() + if services .rooms .event_handler .acl_check(typing.user_id.server_name(), &typing.room_id) @@ -291,26 +300,26 @@ async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: Typing return Ok(()); } - if services() + if services .rooms .state_cache .is_joined(&typing.user_id, &typing.room_id)? { if typing.typing { let timeout = utils::millis_since_unix_epoch().saturating_add( - services() + services .globals .config .typing_federation_timeout_s .saturating_mul(1000), ); - services() + services .rooms .typing .typing_add(&typing.user_id, &typing.room_id, timeout) .await?; } else { - services() + services .rooms .typing .typing_remove(&typing.user_id, &typing.room_id) @@ -328,7 +337,7 @@ async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: Typing } async fn handle_edu_device_list_update( - _client: &IpAddr, origin: &ServerName, content: DeviceListUpdateContent, + services: &Services, _client: &IpAddr, origin: &ServerName, content: DeviceListUpdateContent, ) -> Result<()> { let DeviceListUpdateContent { user_id, @@ -343,13 +352,13 @@ async fn handle_edu_device_list_update( return Ok(()); } - services().users.mark_device_key_update(&user_id)?; + services.users.mark_device_key_update(&user_id)?; Ok(()) } async fn handle_edu_direct_to_device( - _client: &IpAddr, origin: &ServerName, content: DirectDeviceContent, + services: &Services, _client: &IpAddr, origin: &ServerName, content: DirectDeviceContent, ) -> Result<()> { let DirectDeviceContent { sender, @@ -367,7 +376,7 @@ async fn handle_edu_direct_to_device( } // Check if this is a new transaction id - if services() + if services .transaction_ids .existing_txnid(&sender, None, &message_id)? .is_some() @@ -379,7 +388,7 @@ async fn handle_edu_direct_to_device( for (target_device_id_maybe, event) in map { match target_device_id_maybe { DeviceIdOrAllDevices::DeviceId(target_device_id) => { - services().users.add_to_device_event( + services.users.add_to_device_event( &sender, target_user_id, target_device_id, @@ -391,8 +400,8 @@ async fn handle_edu_direct_to_device( }, DeviceIdOrAllDevices::AllDevices => { - for target_device_id in services().users.all_device_ids(target_user_id) { - services().users.add_to_device_event( + for target_device_id in services.users.all_device_ids(target_user_id) { + services.users.add_to_device_event( &sender, target_user_id, &target_device_id?, @@ -408,7 +417,7 @@ async fn handle_edu_direct_to_device( } // Save transaction id with empty data - services() + services .transaction_ids .add_txnid(&sender, None, &message_id, &[])?; @@ -416,7 +425,7 @@ async fn handle_edu_direct_to_device( } async fn handle_edu_signing_key_update( - _client: &IpAddr, origin: &ServerName, content: SigningKeyUpdateContent, + services: &Services, _client: &IpAddr, origin: &ServerName, content: SigningKeyUpdateContent, ) -> Result<()> { let SigningKeyUpdateContent { user_id, @@ -433,7 +442,7 @@ async fn handle_edu_signing_key_update( } if let Some(master_key) = master_key { - services() + services .users .add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)?; }