From 271f720286280876eeb2be1ba8082dbc1d93274e Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 9 Jul 2024 21:10:14 +0000 Subject: [PATCH] move mutex maps out of globals into respective service Signed-off-by: Jason Volk --- src/admin/debug/commands.rs | 2 +- src/admin/federation/commands.rs | 5 +-- src/api/client/membership.rs | 26 ++++----------- src/api/client/message.rs | 6 +--- src/api/client/profile.rs | 2 +- src/api/client/redact.rs | 6 +--- src/api/client/room.rs | 14 ++------ src/api/client/state.rs | 2 +- src/api/client/sync.rs | 6 ++-- src/api/server/make_join.rs | 6 +--- src/api/server/make_leave.rs | 6 +--- src/api/server/send.rs | 5 +-- src/api/server/send_join.rs | 5 +-- src/api/server/send_leave.rs | 5 +-- src/service/admin/create.rs | 2 +- src/service/admin/grant.rs | 2 +- src/service/admin/mod.rs | 4 +-- src/service/globals/mod.rs | 20 ++---------- src/service/rooms/event_handler/mod.rs | 43 ++++++++++++++----------- src/service/rooms/state/data.rs | 2 +- src/service/rooms/state/mod.rs | 14 ++++++-- src/service/rooms/state_accessor/mod.rs | 2 +- src/service/rooms/timeline/mod.rs | 29 ++++++++++------- 23 files changed, 93 insertions(+), 121 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 53009566..62741f38 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -570,7 +570,7 @@ pub(super) async fn force_set_room_state_from_server( .state_compressor .save_state(room_id.clone().as_ref(), new_room_state)?; - let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; + let state_lock = services().rooms.state.mutex.lock(&room_id).await; services() .rooms .state diff --git a/src/admin/federation/commands.rs b/src/admin/federation/commands.rs index 24f4bc23..a97e7582 100644 --- a/src/admin/federation/commands.rs +++ b/src/admin/federation/commands.rs @@ -16,8 +16,9 @@ pub(super) async fn enable_room(_body: Vec<&str>, room_id: Box) -> Resul pub(super) async fn incoming_federation(_body: Vec<&str>) -> Result { let map = services() - .globals - .roomid_federationhandletime + .rooms + .event_handler + .federation_handletime .read() .expect("locked"); let mut msg = format!("Handling {} incoming pdus:\n", map.len()); diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 2ac4c723..1b70fdec 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -40,8 +40,8 @@ use tokio::sync::RwLock; use crate::{ client::{update_avatar_url, update_displayname}, service::{ - globals::RoomMutexGuard, pdu::{gen_event_id_canonical_json, PduBuilder}, + rooms::state::RoomMutexGuard, sending::convert_to_outgoing_federation_event, server_is_ours, user_is_local, }, @@ -366,11 +366,7 @@ pub(crate) async fn invite_user_route( pub(crate) async fn kick_user_route(body: Ruma) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; let mut event: RoomMemberEventContent = serde_json::from_str( services() @@ -417,11 +413,7 @@ pub(crate) async fn kick_user_route(body: Ruma) -> Resul pub(crate) async fn ban_user_route(body: Ruma) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; let event = services() .rooms @@ -481,11 +473,7 @@ pub(crate) async fn ban_user_route(body: Ruma) -> Result< pub(crate) async fn unban_user_route(body: Ruma) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; let mut event: RoomMemberEventContent = serde_json::from_str( services() @@ -1399,7 +1387,7 @@ pub(crate) async fn invite_helper( if !user_is_local(user_id) { let (pdu, pdu_json, invite_room_state) = { - let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; + let state_lock = services().rooms.state.mutex.lock(room_id).await; let content = to_raw_value(&RoomMemberEventContent { avatar_url: services().users.avatar_url(user_id)?, displayname: None, @@ -1511,7 +1499,7 @@ pub(crate) async fn invite_helper( )); } - let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; + let state_lock = services().rooms.state.mutex.lock(room_id).await; services() .rooms @@ -1605,7 +1593,7 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option, user_id: OwnedUserId) { for (pdu_builder, room_id) in all_joined_rooms { - let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; + let state_lock = services().rooms.state.mutex.lock(room_id).await; if let Err(e) = services() .rooms .timeline diff --git a/src/api/client/redact.rs b/src/api/client/redact.rs index 4cb24c33..308d12e5 100644 --- a/src/api/client/redact.rs +++ b/src/api/client/redact.rs @@ -15,11 +15,7 @@ pub(crate) async fn redact_event_route(body: Ruma) -> let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let body = body.body; - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; let event_id = services() .rooms diff --git a/src/api/client/room.rs b/src/api/client/room.rs index 7090fdc8..ccdf4dc0 100644 --- a/src/api/client/room.rs +++ b/src/api/client/room.rs @@ -90,7 +90,7 @@ pub(crate) async fn create_room_route(body: Ruma) -> R } let _short_id = services().rooms.short.get_or_create_shortroomid(&room_id)?; - let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; + let state_lock = services().rooms.state.mutex.lock(&room_id).await; let alias: Option = if let Some(alias) = &body.room_alias_name { Some(room_alias_check(alias, &body.appservice_info).await?) @@ -573,11 +573,7 @@ pub(crate) async fn upgrade_room_route(body: Ruma) -> .short .get_or_create_shortroomid(&replacement_room)?; - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; // Send a m.room.tombstone event to the old room to indicate that it is not // intended to be used any further Fail if the sender does not have the required @@ -605,11 +601,7 @@ pub(crate) async fn upgrade_room_route(body: Ruma) -> // Change lock to replacement room drop(state_lock); - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&replacement_room) - .await; + let state_lock = services().rooms.state.mutex.lock(&replacement_room).await; // Get the old room creation event let mut create_event_content = serde_json::from_str::( diff --git a/src/api/client/state.rs b/src/api/client/state.rs index abff9218..25b77fe3 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -170,7 +170,7 @@ async fn send_state_event_for_key_helper( sender: &UserId, room_id: &RoomId, event_type: &StateEventType, json: &Raw, state_key: String, ) -> Result> { allowed_to_send_state_event(room_id, event_type, json).await?; - let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; + let state_lock = services().rooms.state.mutex.lock(room_id).await; let event_id = services() .rooms .timeline diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index 45a1c75b..dd1f3401 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -199,7 +199,7 @@ pub(crate) async fn sync_events_route( let (room_id, invite_state_events) = result?; // Get and drop the lock to wait for remaining operations to finish - let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await; + let insert_lock = services().rooms.timeline.mutex_insert.lock(&room_id).await; drop(insert_lock); let invite_count = services() @@ -317,7 +317,7 @@ async fn handle_left_room( next_batch_string: &str, full_state: bool, lazy_load_enabled: bool, ) -> Result<()> { // Get and drop the lock to wait for remaining operations to finish - let insert_lock = services().globals.roomid_mutex_insert.lock(room_id).await; + let insert_lock = services().rooms.timeline.mutex_insert.lock(room_id).await; drop(insert_lock); let left_count = services() @@ -519,7 +519,7 @@ async fn load_joined_room( ) -> Result { // Get and drop the lock to wait for remaining operations to finish // This will make sure the we have all events until next_batch - let insert_lock = services().globals.roomid_mutex_insert.lock(room_id).await; + let insert_lock = services().rooms.timeline.mutex_insert.lock(room_id).await; drop(insert_lock); let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?; diff --git a/src/api/server/make_join.rs b/src/api/server/make_join.rs index ca50dcbe..c909ea33 100644 --- a/src/api/server/make_join.rs +++ b/src/api/server/make_join.rs @@ -71,11 +71,7 @@ pub(crate) async fn create_join_event_template_route( let room_version_id = services().rooms.state.get_room_version(&body.room_id)?; - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; let join_authorized_via_users_server = if (services() .rooms diff --git a/src/api/server/make_leave.rs b/src/api/server/make_leave.rs index 62c09717..eea3e4f8 100644 --- a/src/api/server/make_leave.rs +++ b/src/api/server/make_leave.rs @@ -35,11 +35,7 @@ pub(crate) async fn create_leave_event_template_route( .acl_check(origin, &body.room_id)?; let room_version_id = services().rooms.state.get_room_version(&body.room_id)?; - let state_lock = services() - .globals - .roomid_mutex_state - .lock(&body.room_id) - .await; + let state_lock = services().rooms.state.mutex.lock(&body.room_id).await; let content = to_raw_value(&RoomMemberEventContent { avatar_url: None, blurhash: None, diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 90225a14..08caf1b4 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -127,8 +127,9 @@ async fn handle_pdus( for (event_id, value, room_id) in parsed_pdus { let pdu_start_time = Instant::now(); let mutex_lock = services() - .globals - .roomid_mutex_federation + .rooms + .event_handler + .mutex_federation .lock(&room_id) .await; resolved_map.insert( diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index ff362f64..577833d5 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -156,8 +156,9 @@ async fn create_join_event( .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "origin is not a server name."))?; let mutex_lock = services() - .globals - .roomid_mutex_federation + .rooms + .event_handler + .mutex_federation .lock(room_id) .await; let pdu_id: Vec = services() diff --git a/src/api/server/send_leave.rs b/src/api/server/send_leave.rs index 4fdde515..c4e17bbc 100644 --- a/src/api/server/send_leave.rs +++ b/src/api/server/send_leave.rs @@ -152,8 +152,9 @@ async fn create_leave_event(origin: &ServerName, room_id: &RoomId, pdu: &RawJson .await?; let mutex_lock = services() - .globals - .roomid_mutex_federation + .rooms + .event_handler + .mutex_federation .lock(room_id) .await; let pdu_id: Vec = services() diff --git a/src/service/admin/create.rs b/src/service/admin/create.rs index ad70fe0c..57df344a 100644 --- a/src/service/admin/create.rs +++ b/src/service/admin/create.rs @@ -34,7 +34,7 @@ pub async fn create_admin_room() -> Result<()> { let _short_id = services().rooms.short.get_or_create_shortroomid(&room_id)?; - let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; + let state_lock = services().rooms.state.mutex.lock(&room_id).await; // Create a user for the server let server_user = &services().globals.server_user; diff --git a/src/service/admin/grant.rs b/src/service/admin/grant.rs index ca48ce0d..9a4ef242 100644 --- a/src/service/admin/grant.rs +++ b/src/service/admin/grant.rs @@ -22,7 +22,7 @@ use crate::{pdu::PduBuilder, services}; /// In conduit, this is equivalent to granting admin privileges. pub async fn make_user_admin(user_id: &UserId, displayname: String) -> Result<()> { if let Some(room_id) = Service::get_admin_room()? { - let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; + let state_lock = services().rooms.state.mutex.lock(&room_id).await; // Use the server user to grant the new admin's power level let server_user = &services().globals.server_user; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index e0dd1760..5c8a98bb 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -26,7 +26,7 @@ use tokio::{ task::JoinHandle, }; -use crate::{globals::RoomMutexGuard, pdu::PduBuilder, services, user_is_local, PduEvent}; +use crate::{pdu::PduBuilder, rooms::state::RoomMutexGuard, services, user_is_local, PduEvent}; const COMMAND_QUEUE_LIMIT: usize = 512; @@ -248,7 +248,7 @@ async fn respond_to_room(content: RoomMessageEventContent, room_id: &RoomId, use "sender is not admin" ); - let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; + let state_lock = services().rooms.state.mutex.lock(room_id).await; let response_pdu = PduBuilder { event_type: TimelineEventType::RoomMessage, content: to_raw_value(&content).expect("event is valid, we just created it"), diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index a5b70835..16830d87 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -12,19 +12,15 @@ use std::{ time::Instant, }; -use conduit::{ - error, trace, - utils::{MutexMap, MutexMapGuard}, - Config, Result, -}; +use conduit::{error, trace, Config, Result}; use data::Data; use ipaddress::IPAddress; use regex::RegexSet; use ruma::{ api::{client::discovery::discover_support::ContactRole, federation::discovery::VerifyKey}, serde::Base64, - DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, - RoomAliasId, RoomVersionId, ServerName, UserId, + DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomAliasId, + RoomVersionId, ServerName, UserId, }; use tokio::{sync::Mutex, task::JoinHandle}; use url::Url; @@ -45,18 +41,12 @@ pub struct Service { pub bad_event_ratelimiter: Arc>>, pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub bad_query_ratelimiter: Arc>>, - pub roomid_mutex_insert: RoomMutexMap, - pub roomid_mutex_state: RoomMutexMap, - pub roomid_mutex_federation: RoomMutexMap, - pub roomid_federationhandletime: RwLock>, pub updates_handle: Mutex>>, pub stateres_mutex: Arc>, pub server_user: OwnedUserId, pub admin_alias: OwnedRoomAliasId, } -pub type RoomMutexMap = MutexMap; -pub type RoomMutexGuard = MutexMapGuard; type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries impl crate::Service for Service { @@ -113,10 +103,6 @@ impl crate::Service for Service { bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())), - roomid_mutex_state: MutexMap::::new(), - roomid_mutex_insert: MutexMap::::new(), - roomid_mutex_federation: MutexMap::::new(), - roomid_federationhandletime: RwLock::new(HashMap::new()), updates_handle: Mutex::new(None), stateres_mutex: Arc::new(Mutex::new(())), admin_alias: RoomAliasId::parse(format!("#admins:{}", &config.server_name)) diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 53ce6f8d..bf553d5c 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -4,13 +4,14 @@ mod signing_keys; use std::{ collections::{hash_map, BTreeMap, HashMap, HashSet}, pin::Pin, - sync::Arc, + sync::{Arc, RwLock as StdRwLock}, time::Instant, }; use conduit::{ - debug, debug_error, debug_info, error, info, trace, utils::math::continue_exponential_backoff_secs, warn, Error, - Result, + debug, debug_error, debug_info, error, info, trace, + utils::{math::continue_exponential_backoff_secs, MutexMap}, + warn, Error, Result, }; use futures_util::Future; pub use parse_incoming_pdu::parse_incoming_pdu; @@ -28,14 +29,21 @@ use ruma::{ int, serde::Base64, state_res::{self, RoomVersion, StateMap}, - uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, RoomVersionId, ServerName, + uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, + RoomVersionId, ServerName, }; use tokio::sync::RwLock; use super::state_compressor::CompressedStateEvent; use crate::{pdu, services, PduEvent}; -pub struct Service; +pub struct Service { + pub federation_handletime: StdRwLock, + pub mutex_federation: RoomMutexMap, +} + +type RoomMutexMap = MutexMap; +type HandleTimeMap = HashMap; // We use some AsyncRecursiveType hacks here so we can call async funtion // recursively. @@ -46,7 +54,12 @@ type AsyncRecursiveCanonicalJsonResult<'a> = AsyncRecursiveType<'a, Result<(Arc, BTreeMap)>>; impl crate::Service for Service { - fn build(_args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self {})) } + fn build(_args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + federation_handletime: HandleTimeMap::new().into(), + mutex_federation: RoomMutexMap::new(), + })) + } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } @@ -200,9 +213,7 @@ impl Service { // Done with prev events, now handling the incoming event let start_time = Instant::now(); - services() - .globals - .roomid_federationhandletime + self.federation_handletime .write() .expect("locked") .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); @@ -211,9 +222,7 @@ impl Service { .upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id, pub_key_map) .await; - services() - .globals - .roomid_federationhandletime + self.federation_handletime .write() .expect("locked") .remove(&room_id.to_owned()); @@ -272,9 +281,7 @@ impl Service { } let start_time = Instant::now(); - services() - .globals - .roomid_federationhandletime + self.federation_handletime .write() .expect("locked") .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); @@ -282,9 +289,7 @@ impl Service { self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id, pub_key_map) .await?; - services() - .globals - .roomid_federationhandletime + self.federation_handletime .write() .expect("locked") .remove(&room_id.to_owned()); @@ -579,7 +584,7 @@ impl Service { // We start looking at current room state now, so lets lock the room trace!("Locking the room"); - let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; + let state_lock = services().rooms.state.mutex.lock(room_id).await; // Now we calculate the set of extremities this room has after the incoming // event has been applied. We start with the previous extremities (aka leaves) diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index b62adf60..3c110afc 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -4,7 +4,7 @@ use conduit::{utils, Error, Result}; use database::{Database, Map}; use ruma::{EventId, OwnedEventId, RoomId}; -use crate::globals::RoomMutexGuard; +use super::RoomMutexGuard; pub(super) struct Data { shorteventid_shortstatehash: Arc, diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index b46a9d04..7d89ee33 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -5,7 +5,10 @@ use std::{ sync::Arc, }; -use conduit::{utils::calculate_hash, warn, Error, Result}; +use conduit::{ + utils::{calculate_hash, MutexMap, MutexMapGuard}, + warn, Error, Result, +}; use data::Data; use ruma::{ api::client::error::ErrorKind, @@ -15,20 +18,25 @@ use ruma::{ }, serde::Raw, state_res::{self, StateMap}, - EventId, OwnedEventId, RoomId, RoomVersionId, UserId, + EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId, }; use super::state_compressor::CompressedStateEvent; -use crate::{globals::RoomMutexGuard, services, PduEvent}; +use crate::{services, PduEvent}; pub struct Service { db: Data, + pub mutex: RoomMutexMap, } +type RoomMutexMap = MutexMap; +pub type RoomMutexGuard = MutexMapGuard; + impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { db: Data::new(args.db), + mutex: RoomMutexMap::new(), })) } diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 35719c15..389f12c3 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -33,7 +33,7 @@ use ruma::{ }; use serde_json::value::to_raw_value; -use crate::{globals::RoomMutexGuard, pdu::PduBuilder, services, PduEvent}; +use crate::{pdu::PduBuilder, rooms::state::RoomMutexGuard, services, PduEvent}; pub struct Service { db: Data, diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 9bfc2715..df2d46bd 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -6,7 +6,11 @@ use std::{ sync::Arc, }; -use conduit::{debug, error, info, utils, validated, warn, Error, Result}; +use conduit::{ + debug, error, info, utils, + utils::{MutexMap, MutexMapGuard}, + validated, warn, Error, Result, +}; use data::Data; use itertools::Itertools; use ruma::{ @@ -26,8 +30,8 @@ use ruma::{ push::{Action, Ruleset, Tweak}, serde::Base64, state_res::{self, Event, RoomVersion}, - uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedServerName, RoomId, - RoomVersionId, ServerName, UserId, + uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, + RoomId, RoomVersionId, ServerName, UserId, }; use serde::Deserialize; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; @@ -36,7 +40,6 @@ use tokio::sync::RwLock; use crate::{ admin, appservice::NamespaceRegex, - globals::RoomMutexGuard, pdu::{EventHash, PduBuilder}, rooms::{event_handler::parse_incoming_pdu, state_compressor::CompressedStateEvent}, server_is_ours, services, PduCount, PduEvent, @@ -66,12 +69,17 @@ struct ExtractBody { pub struct Service { db: Data, + pub mutex_insert: RoomMutexMap, } +type RoomMutexMap = MutexMap; +pub type RoomMutexGuard = MutexMapGuard; + impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { db: Data::new(args.db), + mutex_insert: RoomMutexMap::new(), })) } @@ -269,11 +277,7 @@ impl Service { .state .set_forward_extremities(&pdu.room_id, leaves, state_lock)?; - let insert_lock = services() - .globals - .roomid_mutex_insert - .lock(&pdu.room_id) - .await; + let insert_lock = self.mutex_insert.lock(&pdu.room_id).await; let count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if @@ -1154,8 +1158,9 @@ impl Service { // Lock so we cannot backfill the same pdu twice at the same time let mutex_lock = services() - .globals - .roomid_mutex_federation + .rooms + .event_handler + .mutex_federation .lock(&room_id) .await; @@ -1187,7 +1192,7 @@ impl Service { .get_shortroomid(&room_id)? .expect("room exists"); - let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await; + let insert_lock = self.mutex_insert.lock(&room_id).await; let max = u64::MAX; let count = services().globals.next_count()?;