move mutex maps out of globals into respective service

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-09 21:10:14 +00:00
parent 2d251eb19c
commit 271f720286
23 changed files with 93 additions and 121 deletions

View file

@ -34,7 +34,7 @@ pub async fn create_admin_room() -> Result<()> {
let _short_id = services().rooms.short.get_or_create_shortroomid(&room_id)?;
let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await;
let state_lock = services().rooms.state.mutex.lock(&room_id).await;
// Create a user for the server
let server_user = &services().globals.server_user;

View file

@ -22,7 +22,7 @@ use crate::{pdu::PduBuilder, services};
/// In conduit, this is equivalent to granting admin privileges.
pub async fn make_user_admin(user_id: &UserId, displayname: String) -> Result<()> {
if let Some(room_id) = Service::get_admin_room()? {
let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await;
let state_lock = services().rooms.state.mutex.lock(&room_id).await;
// Use the server user to grant the new admin's power level
let server_user = &services().globals.server_user;

View file

@ -26,7 +26,7 @@ use tokio::{
task::JoinHandle,
};
use crate::{globals::RoomMutexGuard, pdu::PduBuilder, services, user_is_local, PduEvent};
use crate::{pdu::PduBuilder, rooms::state::RoomMutexGuard, services, user_is_local, PduEvent};
const COMMAND_QUEUE_LIMIT: usize = 512;
@ -248,7 +248,7 @@ async fn respond_to_room(content: RoomMessageEventContent, room_id: &RoomId, use
"sender is not admin"
);
let state_lock = services().globals.roomid_mutex_state.lock(room_id).await;
let state_lock = services().rooms.state.mutex.lock(room_id).await;
let response_pdu = PduBuilder {
event_type: TimelineEventType::RoomMessage,
content: to_raw_value(&content).expect("event is valid, we just created it"),

View file

@ -12,19 +12,15 @@ use std::{
time::Instant,
};
use conduit::{
error, trace,
utils::{MutexMap, MutexMapGuard},
Config, Result,
};
use conduit::{error, trace, Config, Result};
use data::Data;
use ipaddress::IPAddress;
use regex::RegexSet;
use ruma::{
api::{client::discovery::discover_support::ContactRole, federation::discovery::VerifyKey},
serde::Base64,
DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId,
RoomAliasId, RoomVersionId, ServerName, UserId,
DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomAliasId,
RoomVersionId, ServerName, UserId,
};
use tokio::{sync::Mutex, task::JoinHandle};
use url::Url;
@ -45,18 +41,12 @@ pub struct Service {
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub roomid_mutex_insert: RoomMutexMap,
pub roomid_mutex_state: RoomMutexMap,
pub roomid_mutex_federation: RoomMutexMap,
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
pub updates_handle: Mutex<Option<JoinHandle<()>>>,
pub stateres_mutex: Arc<Mutex<()>>,
pub server_user: OwnedUserId,
pub admin_alias: OwnedRoomAliasId,
}
pub type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>;
type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
impl crate::Service for Service {
@ -113,10 +103,6 @@ impl crate::Service for Service {
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
roomid_mutex_state: MutexMap::<OwnedRoomId, ()>::new(),
roomid_mutex_insert: MutexMap::<OwnedRoomId, ()>::new(),
roomid_mutex_federation: MutexMap::<OwnedRoomId, ()>::new(),
roomid_federationhandletime: RwLock::new(HashMap::new()),
updates_handle: Mutex::new(None),
stateres_mutex: Arc::new(Mutex::new(())),
admin_alias: RoomAliasId::parse(format!("#admins:{}", &config.server_name))

View file

@ -4,13 +4,14 @@ mod signing_keys;
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet},
pin::Pin,
sync::Arc,
sync::{Arc, RwLock as StdRwLock},
time::Instant,
};
use conduit::{
debug, debug_error, debug_info, error, info, trace, utils::math::continue_exponential_backoff_secs, warn, Error,
Result,
debug, debug_error, debug_info, error, info, trace,
utils::{math::continue_exponential_backoff_secs, MutexMap},
warn, Error, Result,
};
use futures_util::Future;
pub use parse_incoming_pdu::parse_incoming_pdu;
@ -28,14 +29,21 @@ use ruma::{
int,
serde::Base64,
state_res::{self, RoomVersion, StateMap},
uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, RoomVersionId, ServerName,
uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId,
RoomVersionId, ServerName,
};
use tokio::sync::RwLock;
use super::state_compressor::CompressedStateEvent;
use crate::{pdu, services, PduEvent};
pub struct Service;
pub struct Service {
pub federation_handletime: StdRwLock<HandleTimeMap>,
pub mutex_federation: RoomMutexMap,
}
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
type HandleTimeMap = HashMap<OwnedRoomId, (OwnedEventId, Instant)>;
// We use some AsyncRecursiveType hacks here so we can call async funtion
// recursively.
@ -46,7 +54,12 @@ type AsyncRecursiveCanonicalJsonResult<'a> =
AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>>;
impl crate::Service for Service {
fn build(_args: crate::Args<'_>) -> Result<Arc<Self>> { Ok(Arc::new(Self {})) }
fn build(_args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
federation_handletime: HandleTimeMap::new().into(),
mutex_federation: RoomMutexMap::new(),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
@ -200,9 +213,7 @@ impl Service {
// Done with prev events, now handling the incoming event
let start_time = Instant::now();
services()
.globals
.roomid_federationhandletime
self.federation_handletime
.write()
.expect("locked")
.insert(room_id.to_owned(), (event_id.to_owned(), start_time));
@ -211,9 +222,7 @@ impl Service {
.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id, pub_key_map)
.await;
services()
.globals
.roomid_federationhandletime
self.federation_handletime
.write()
.expect("locked")
.remove(&room_id.to_owned());
@ -272,9 +281,7 @@ impl Service {
}
let start_time = Instant::now();
services()
.globals
.roomid_federationhandletime
self.federation_handletime
.write()
.expect("locked")
.insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time));
@ -282,9 +289,7 @@ impl Service {
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id, pub_key_map)
.await?;
services()
.globals
.roomid_federationhandletime
self.federation_handletime
.write()
.expect("locked")
.remove(&room_id.to_owned());
@ -579,7 +584,7 @@ impl Service {
// We start looking at current room state now, so lets lock the room
trace!("Locking the room");
let state_lock = services().globals.roomid_mutex_state.lock(room_id).await;
let state_lock = services().rooms.state.mutex.lock(room_id).await;
// Now we calculate the set of extremities this room has after the incoming
// event has been applied. We start with the previous extremities (aka leaves)

View file

@ -4,7 +4,7 @@ use conduit::{utils, Error, Result};
use database::{Database, Map};
use ruma::{EventId, OwnedEventId, RoomId};
use crate::globals::RoomMutexGuard;
use super::RoomMutexGuard;
pub(super) struct Data {
shorteventid_shortstatehash: Arc<Map>,

View file

@ -5,7 +5,10 @@ use std::{
sync::Arc,
};
use conduit::{utils::calculate_hash, warn, Error, Result};
use conduit::{
utils::{calculate_hash, MutexMap, MutexMapGuard},
warn, Error, Result,
};
use data::Data;
use ruma::{
api::client::error::ErrorKind,
@ -15,20 +18,25 @@ use ruma::{
},
serde::Raw,
state_res::{self, StateMap},
EventId, OwnedEventId, RoomId, RoomVersionId, UserId,
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
};
use super::state_compressor::CompressedStateEvent;
use crate::{globals::RoomMutexGuard, services, PduEvent};
use crate::{services, PduEvent};
pub struct Service {
db: Data,
pub mutex: RoomMutexMap,
}
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>;
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
mutex: RoomMutexMap::new(),
}))
}

View file

@ -33,7 +33,7 @@ use ruma::{
};
use serde_json::value::to_raw_value;
use crate::{globals::RoomMutexGuard, pdu::PduBuilder, services, PduEvent};
use crate::{pdu::PduBuilder, rooms::state::RoomMutexGuard, services, PduEvent};
pub struct Service {
db: Data,

View file

@ -6,7 +6,11 @@ use std::{
sync::Arc,
};
use conduit::{debug, error, info, utils, validated, warn, Error, Result};
use conduit::{
debug, error, info, utils,
utils::{MutexMap, MutexMapGuard},
validated, warn, Error, Result,
};
use data::Data;
use itertools::Itertools;
use ruma::{
@ -26,8 +30,8 @@ use ruma::{
push::{Action, Ruleset, Tweak},
serde::Base64,
state_res::{self, Event, RoomVersion},
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedServerName, RoomId,
RoomVersionId, ServerName, UserId,
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName,
RoomId, RoomVersionId, ServerName, UserId,
};
use serde::Deserialize;
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
@ -36,7 +40,6 @@ use tokio::sync::RwLock;
use crate::{
admin,
appservice::NamespaceRegex,
globals::RoomMutexGuard,
pdu::{EventHash, PduBuilder},
rooms::{event_handler::parse_incoming_pdu, state_compressor::CompressedStateEvent},
server_is_ours, services, PduCount, PduEvent,
@ -66,12 +69,17 @@ struct ExtractBody {
pub struct Service {
db: Data,
pub mutex_insert: RoomMutexMap,
}
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>;
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
mutex_insert: RoomMutexMap::new(),
}))
}
@ -269,11 +277,7 @@ impl Service {
.state
.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
let insert_lock = services()
.globals
.roomid_mutex_insert
.lock(&pdu.room_id)
.await;
let insert_lock = self.mutex_insert.lock(&pdu.room_id).await;
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if
@ -1154,8 +1158,9 @@ impl Service {
// Lock so we cannot backfill the same pdu twice at the same time
let mutex_lock = services()
.globals
.roomid_mutex_federation
.rooms
.event_handler
.mutex_federation
.lock(&room_id)
.await;
@ -1187,7 +1192,7 @@ impl Service {
.get_shortroomid(&room_id)?
.expect("room exists");
let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await;
let insert_lock = self.mutex_insert.lock(&room_id).await;
let max = u64::MAX;
let count = services().globals.next_count()?;