reduce roomid_mutex_insert
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
8b68d6306c
commit
22272bdc16
3 changed files with 20 additions and 66 deletions
|
@ -1,7 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
cmp::Ordering,
|
cmp::Ordering,
|
||||||
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
|
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
|
||||||
sync::Arc,
|
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -197,20 +196,9 @@ pub(crate) async fn sync_events_route(
|
||||||
for result in all_invited_rooms {
|
for result in all_invited_rooms {
|
||||||
let (room_id, invite_state_events) = result?;
|
let (room_id, invite_state_events) = result?;
|
||||||
|
|
||||||
{
|
// Get and drop the lock to wait for remaining operations to finish
|
||||||
// Get and drop the lock to wait for remaining operations to finish
|
let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await;
|
||||||
let mutex_insert = Arc::clone(
|
drop(insert_lock);
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_insert
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(room_id.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().await;
|
|
||||||
drop(insert_lock);
|
|
||||||
};
|
|
||||||
|
|
||||||
let invite_count = services()
|
let invite_count = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -332,20 +320,9 @@ async fn handle_left_room(
|
||||||
since: u64, room_id: &RoomId, sender_user: &UserId, left_rooms: &mut BTreeMap<ruma::OwnedRoomId, LeftRoom>,
|
since: u64, room_id: &RoomId, sender_user: &UserId, left_rooms: &mut BTreeMap<ruma::OwnedRoomId, LeftRoom>,
|
||||||
next_batch_string: &str, full_state: bool, lazy_load_enabled: bool,
|
next_batch_string: &str, full_state: bool, lazy_load_enabled: bool,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
{
|
// Get and drop the lock to wait for remaining operations to finish
|
||||||
// Get and drop the lock to wait for remaining operations to finish
|
let insert_lock = services().globals.roomid_mutex_insert.lock(room_id).await;
|
||||||
let mutex_insert = Arc::clone(
|
drop(insert_lock);
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_insert
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(room_id.to_owned())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().await;
|
|
||||||
drop(insert_lock);
|
|
||||||
};
|
|
||||||
|
|
||||||
let left_count = services()
|
let left_count = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -544,21 +521,10 @@ async fn load_joined_room(
|
||||||
next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool, lazy_load_send_redundant: bool,
|
next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool, lazy_load_send_redundant: bool,
|
||||||
full_state: bool, device_list_updates: &mut HashSet<OwnedUserId>, left_encrypted_users: &mut HashSet<OwnedUserId>,
|
full_state: bool, device_list_updates: &mut HashSet<OwnedUserId>, left_encrypted_users: &mut HashSet<OwnedUserId>,
|
||||||
) -> Result<JoinedRoom> {
|
) -> Result<JoinedRoom> {
|
||||||
{
|
// Get and drop the lock to wait for remaining operations to finish
|
||||||
// Get and drop the lock to wait for remaining operations to finish
|
// This will make sure the we have all events until next_batch
|
||||||
// This will make sure the we have all events until next_batch
|
let insert_lock = services().globals.roomid_mutex_insert.lock(room_id).await;
|
||||||
let mutex_insert = Arc::clone(
|
drop(insert_lock);
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_insert
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(room_id.to_owned())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().await;
|
|
||||||
drop(insert_lock);
|
|
||||||
};
|
|
||||||
|
|
||||||
let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
|
let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use base64::{engine::general_purpose, Engine as _};
|
use base64::{engine::general_purpose, Engine as _};
|
||||||
|
use conduit::utils;
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use hickory_resolver::TokioAsyncResolver;
|
use hickory_resolver::TokioAsyncResolver;
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
|
@ -33,6 +34,7 @@ use tokio::{
|
||||||
};
|
};
|
||||||
use tracing::{error, trace};
|
use tracing::{error, trace};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
use utils::MutexMap;
|
||||||
|
|
||||||
use crate::{services, Config, Result};
|
use crate::{services, Config, Result};
|
||||||
|
|
||||||
|
@ -52,7 +54,7 @@ pub struct Service {
|
||||||
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
||||||
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
|
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
|
||||||
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
|
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
|
||||||
pub roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
|
pub roomid_mutex_insert: MutexMap<OwnedRoomId, ()>,
|
||||||
pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
|
pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
|
||||||
pub roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, // this lock will be held longer
|
pub roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, // this lock will be held longer
|
||||||
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
|
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
|
||||||
|
@ -115,7 +117,7 @@ impl Service {
|
||||||
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
|
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
|
||||||
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
|
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
|
||||||
roomid_mutex_state: RwLock::new(HashMap::new()),
|
roomid_mutex_state: RwLock::new(HashMap::new()),
|
||||||
roomid_mutex_insert: RwLock::new(HashMap::new()),
|
roomid_mutex_insert: MutexMap::<OwnedRoomId, ()>::new(),
|
||||||
roomid_mutex_federation: RwLock::new(HashMap::new()),
|
roomid_mutex_federation: RwLock::new(HashMap::new()),
|
||||||
roomid_federationhandletime: RwLock::new(HashMap::new()),
|
roomid_federationhandletime: RwLock::new(HashMap::new()),
|
||||||
updates_handle: Mutex::new(None),
|
updates_handle: Mutex::new(None),
|
||||||
|
|
|
@ -271,16 +271,11 @@ impl Service {
|
||||||
.state
|
.state
|
||||||
.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
|
.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
|
||||||
|
|
||||||
let mutex_insert = Arc::clone(
|
let insert_lock = services()
|
||||||
services()
|
.globals
|
||||||
.globals
|
.roomid_mutex_insert
|
||||||
.roomid_mutex_insert
|
.lock(&pdu.room_id)
|
||||||
.write()
|
.await;
|
||||||
.await
|
|
||||||
.entry(pdu.room_id.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().await;
|
|
||||||
|
|
||||||
let count1 = services().globals.next_count()?;
|
let count1 = services().globals.next_count()?;
|
||||||
// Mark as read first so the sending client doesn't get a notification even if
|
// Mark as read first so the sending client doesn't get a notification even if
|
||||||
|
@ -1165,16 +1160,7 @@ impl Service {
|
||||||
.get_shortroomid(&room_id)?
|
.get_shortroomid(&room_id)?
|
||||||
.expect("room exists");
|
.expect("room exists");
|
||||||
|
|
||||||
let mutex_insert = Arc::clone(
|
let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await;
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_insert
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(room_id.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().await;
|
|
||||||
|
|
||||||
let count = services().globals.next_count()?;
|
let count = services().globals.next_count()?;
|
||||||
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue