reduce roomid_mutex_state

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-06-14 22:08:44 +00:00
parent 539aa27815
commit 08bf074cbb
18 changed files with 93 additions and 269 deletions
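This commit removes the hand-rolled per-room state mutex, an Arc<Mutex<()>> entry kept in a RwLock<HashMap<OwnedRoomId, _>>, and replaces it with the MutexMap utility already used for roomid_mutex_insert and roomid_mutex_federation, so every call site collapses to a single lock(&room_id).await. The conduit::utils::mutex_map implementation itself is not part of this diff; the following is only a rough sketch, under the assumption that it behaves like a keyed mutex whose lock() lazily creates the per-key entry and returns a guard that releases it on drop:

use std::{collections::HashMap, hash::Hash, sync::Arc};

use tokio::sync::{Mutex, OwnedMutexGuard};

// Hypothetical stand-in for conduit::utils::mutex_map::MutexMap; only the
// surface used by the call sites below is modelled.
pub struct MutexMap<K, V> {
    map: Mutex<HashMap<K, Arc<Mutex<V>>>>,
}

// Hypothetical stand-in for conduit::utils::mutex_map::Guard; dropping it
// releases the per-key mutex.
pub struct Guard<V> {
    _inner: OwnedMutexGuard<V>,
}

impl<K: Eq + Hash, V: Default> MutexMap<K, V> {
    pub fn new() -> Self {
        Self { map: Mutex::new(HashMap::new()) }
    }

    // Lock the entry for `key`, creating it on first use.
    pub async fn lock<Q>(&self, key: &Q) -> Guard<V>
    where
        Q: ToOwned<Owned = K> + ?Sized,
    {
        let entry = self
            .map
            .lock()
            .await
            .entry(key.to_owned())
            .or_default()
            .clone();

        Guard { _inner: entry.lock_owned().await }
    }
}

With that shape, the previous Arc::clone / entry / or_default dance at each call site becomes the single lock(&room_id).await seen throughout the hunks below, and the guard type in downstream signatures changes from tokio's MutexGuard<'_, ()> to mutex_map::Guard<()>.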

View file

@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeMap;
use conduit::{Error, Result};
use ruma::{
@@ -32,18 +32,9 @@ use crate::{pdu::PduBuilder, services};
pub async fn create_admin_room() -> Result<()> {
let room_id = RoomId::new(services().globals.server_name());
services().rooms.short.get_or_create_shortroomid(&room_id)?;
let _short_id = services().rooms.short.get_or_create_shortroomid(&room_id)?;
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await;
// Create a user for the server
let server_user = &services().globals.server_user;

View file

@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeMap;
use conduit::Result;
use ruma::{
@@ -22,16 +22,7 @@ use crate::{pdu::PduBuilder, services};
/// In conduit, this is equivalent to granting admin privileges.
pub async fn make_user_admin(user_id: &UserId, displayname: String) -> Result<()> {
if let Some(room_id) = Service::get_admin_room()? {
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await;
// Use the server user to grant the new admin's power level
let server_user = &services().globals.server_user;

View file

@@ -4,7 +4,7 @@ mod grant;
use std::{future::Future, pin::Pin, sync::Arc};
use conduit::{Error, Result};
use conduit::{utils::mutex_map, Error, Result};
pub use create::create_admin_room;
pub use grant::make_user_admin;
use ruma::{
@@ -15,10 +15,7 @@ use ruma::{
EventId, OwnedRoomId, RoomId, UserId,
};
use serde_json::value::to_raw_value;
use tokio::{
sync::{Mutex, MutexGuard},
task::JoinHandle,
};
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::error;
use crate::{pdu::PduBuilder, services, PduEvent};
@@ -218,17 +215,7 @@ async fn respond_to_room(content: &RoomMessageEventContent, room_id: &RoomId, us
"sender is not admin"
);
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
let state_lock = services().globals.roomid_mutex_state.lock(room_id).await;
let response_pdu = PduBuilder {
event_type: TimelineEventType::RoomMessage,
content: to_raw_value(content).expect("event is valid, we just created it"),
@@ -250,7 +237,7 @@ async fn respond_to_room(content: &RoomMessageEventContent, room_id: &RoomId, us
}
async fn handle_response_error(
e: &Error, room_id: &RoomId, user_id: &UserId, state_lock: &MutexGuard<'_, ()>,
e: &Error, room_id: &RoomId, user_id: &UserId, state_lock: &mutex_map::Guard<()>,
) -> Result<()> {
error!("Failed to build and append admin room response PDU: \"{e}\"");
let error_room_message = RoomMessageEventContent::text_plain(format!(

View file

@@ -55,7 +55,7 @@ pub struct Service {
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub roomid_mutex_insert: MutexMap<OwnedRoomId, ()>,
pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
pub roomid_mutex_state: MutexMap<OwnedRoomId, ()>,
pub roomid_mutex_federation: MutexMap<OwnedRoomId, ()>,
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
pub updates_handle: Mutex<Option<JoinHandle<()>>>,
@@ -116,7 +116,7 @@ impl Service {
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
roomid_mutex_state: RwLock::new(HashMap::new()),
roomid_mutex_state: MutexMap::<OwnedRoomId, ()>::new(),
roomid_mutex_insert: MutexMap::<OwnedRoomId, ()>::new(),
roomid_mutex_federation: MutexMap::<OwnedRoomId, ()>::new(),
roomid_federationhandletime: RwLock::new(HashMap::new()),

View file

@@ -530,18 +530,8 @@ impl Service {
// 13. Use state resolution to find new room state
// We start looking at current room state now, so lets lock the room
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(room_id.to_owned())
.or_default(),
);
trace!("Locking the room");
let state_lock = mutex_state.lock().await;
let state_lock = services().globals.roomid_mutex_state.lock(room_id).await;
// Now we calculate the set of extremities this room has after the incoming
// event has been applied. We start with the previous extremities (aka leaves)

View file

@@ -1,7 +1,7 @@
use std::{collections::HashSet, sync::Arc};
use conduit::utils::mutex_map;
use ruma::{EventId, OwnedEventId, RoomId};
use tokio::sync::MutexGuard;
use crate::{utils, Error, KeyValueDatabase, Result};
@@ -14,7 +14,7 @@ pub trait Data: Send + Sync {
&self,
room_id: &RoomId,
new_shortstatehash: u64,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()>;
/// Associates a state with an event.
@@ -28,7 +28,7 @@ pub trait Data: Send + Sync {
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()>;
}
@@ -47,7 +47,7 @@ impl Data for KeyValueDatabase {
&self,
room_id: &RoomId,
new_shortstatehash: u64,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
self.roomid_shortstatehash
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
@@ -80,7 +80,7 @@ impl Data for KeyValueDatabase {
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xFF);

View file

@@ -4,6 +4,7 @@ use std::{
sync::Arc,
};
use conduit::utils::mutex_map;
use data::Data;
use ruma::{
api::client::error::ErrorKind,
@@ -15,7 +16,6 @@ use ruma::{
state_res::{self, StateMap},
EventId, OwnedEventId, RoomId, RoomVersionId, UserId,
};
use tokio::sync::MutexGuard;
use tracing::warn;
use super::state_compressor::CompressedStateEvent;
@@ -33,7 +33,7 @@ impl Service {
shortstatehash: u64,
statediffnew: Arc<HashSet<CompressedStateEvent>>,
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
for event_id in statediffnew.iter().filter_map(|new| {
services()
@@ -299,12 +299,12 @@ impl Service {
}
/// Set the state hash to a new version, but does not update state_cache.
#[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self, mutex_lock))]
pub fn set_room_state(
&self,
room_id: &RoomId,
shortstatehash: u64,
mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
self.db.set_room_state(room_id, shortstatehash, mutex_lock)
}
@@ -343,7 +343,7 @@ impl Service {
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
self.db
.set_forward_extremities(room_id, event_ids, state_lock)
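The _mutex_lock / state_lock parameters keep their old contract: the guard's value is never read, and requiring &mutex_map::Guard<()> merely forces callers to prove they hold the room's state mutex before writing room state. A minimal caller sketch (the wrapper function and its inputs are illustrative, not from this commit, and the services().rooms.state path is assumed from the surrounding codebase):

use conduit::Result;
use ruma::RoomId;

use crate::services;

// Illustrative only: point a room at a new state hash while holding the lock.
async fn replace_room_state_sketch(room_id: &RoomId, new_shortstatehash: u64) -> Result<()> {
    // Take the per-room state mutex first; set_room_state cannot be called
    // without a guard to pass by reference.
    let state_lock = services().globals.roomid_mutex_state.lock(room_id).await;

    services()
        .rooms
        .state
        .set_room_state(room_id, new_shortstatehash, &state_lock)?;

    drop(state_lock); // release the room's state mutex
    Ok(())
}

Keeping the map's value type as () means roomid_mutex_state is purely a keyed lock; the guard exists only to be threaded by reference through these signatures.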

View file

@@ -4,6 +4,7 @@ use std::{
sync::{Arc, Mutex},
};
use conduit::utils::mutex_map;
use data::Data;
use lru_cache::LruCache;
use ruma::{
@@ -22,7 +23,6 @@ use ruma::{
EventId, OwnedRoomAliasId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
};
use serde_json::value::to_raw_value;
use tokio::sync::MutexGuard;
use tracing::{error, warn};
use crate::{service::pdu::PduBuilder, services, Error, PduEvent, Result};
@@ -285,7 +285,7 @@ impl Service {
}
pub async fn user_can_invite(
&self, room_id: &RoomId, sender: &UserId, target_user: &UserId, state_lock: &MutexGuard<'_, ()>,
&self, room_id: &RoomId, sender: &UserId, target_user: &UserId, state_lock: &mutex_map::Guard<()>,
) -> Result<bool> {
let content = to_raw_value(&RoomMemberEventContent::new(MembershipState::Invite))
.expect("Event content always serializes");

View file

@@ -30,7 +30,7 @@ use ruma::{
};
use serde::Deserialize;
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::{Mutex, MutexGuard, RwLock};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
use super::state_compressor::CompressedStateEvent;
@@ -44,7 +44,7 @@ use crate::{
rooms::event_handler::parse_incoming_pdu,
},
services,
utils::{self},
utils::{self, mutex_map},
Error,
PduCount,
PduEvent,
@@ -200,13 +200,13 @@ impl Service {
/// happens in `append_pdu`.
///
/// Returns pdu id
#[tracing::instrument(skip(self, pdu, pdu_json, leaves))]
#[tracing::instrument(skip_all)]
pub async fn append_pdu(
&self,
pdu: &PduEvent,
mut pdu_json: CanonicalJsonObject,
leaves: Vec<OwnedEventId>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<Vec<u8>> {
// Coalesce database writes for the remainder of this scope.
let _cork = services().globals.db.cork_and_flush();
@@ -581,7 +581,7 @@ impl Service {
pdu_builder: PduBuilder,
sender: &UserId,
room_id: &RoomId,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
_mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<(PduEvent, CanonicalJsonObject)> {
let PduBuilder {
event_type,
@@ -768,7 +768,7 @@ impl Service {
pdu_builder: PduBuilder,
sender: &UserId,
room_id: &RoomId,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<Arc<EventId>> {
let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?;
if let Some(admin_room) = admin::Service::get_admin_room()? {
@@ -909,7 +909,7 @@ impl Service {
new_room_leaves: Vec<OwnedEventId>,
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
soft_fail: bool,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<Option<Vec<u8>>> {
// We append to state before appending the pdu, so we don't have a moment in
// time with the pdu without it's state. This is okay because append_pdu can't
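
For an end-to-end picture of the new locking flow, here is an illustrative caller, assuming the same crate context as the admin module above (crate::pdu::PduBuilder, services()) and that the function in the last hunk is the timeline service's build_and_append_pdu, as its visible signature and body suggest:

use std::sync::Arc;

use conduit::Result;
use ruma::{EventId, RoomId, UserId};

use crate::{pdu::PduBuilder, services};

// Illustrative only: one lock() call, the guard threaded by reference into
// the timeline service, then dropped to release the room's state mutex.
async fn send_pdu_sketch(room_id: &RoomId, sender: &UserId, pdu_builder: PduBuilder) -> Result<Arc<EventId>> {
    // Acquire (and lazily create) the per-room state mutex.
    let state_lock = services().globals.roomid_mutex_state.lock(room_id).await;

    let event_id = services()
        .rooms
        .timeline
        .build_and_append_pdu(pdu_builder, sender, room_id, &state_lock)
        .await?;

    drop(state_lock); // release the room's state mutex
    Ok(event_id)
}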