de-global services() from api

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-16 08:05:25 +00:00
parent 463f1a1287
commit 8b6018d77d
61 changed files with 1485 additions and 1320 deletions

View file

@ -5,6 +5,7 @@ use std::{
time::Duration,
};
use axum::extract::State;
use conduit::{
error,
utils::math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
@ -17,7 +18,7 @@ use ruma::{
self,
v3::{
Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom, Presence,
RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
RoomAccountData, RoomSummary, Rooms, State as RoomState, Timeline, ToDevice,
},
v4::SlidingOp,
DeviceLists, UnreadNotificationsCount,
@ -34,7 +35,10 @@ use ruma::{
};
use tracing::{Instrument as _, Span};
use crate::{service::pdu::EventHash, services, utils, Error, PduEvent, Result, Ruma, RumaResponse};
use crate::{
service::{pdu::EventHash, Services},
utils, Error, PduEvent, Result, Ruma, RumaResponse,
};
/// # `GET /_matrix/client/r0/sync`
///
@ -72,23 +76,23 @@ use crate::{service::pdu::EventHash, services, utils, Error, PduEvent, Result, R
/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
/// subset of the state at the point of the leave)
pub(crate) async fn sync_events_route(
body: Ruma<sync_events::v3::Request>,
State(services): State<crate::State>, body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let body = body.body;
// Presence update
if services().globals.allow_local_presence() {
services()
if services.globals.allow_local_presence() {
services
.presence
.ping_presence(&sender_user, &body.set_presence)?;
}
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);
let watcher = services.globals.watch(&sender_user, &sender_device);
let next_batch = services().globals.current_count()?;
let next_batch = services.globals.current_count()?;
let next_batchcount = PduCount::Normal(next_batch);
let next_batch_string = next_batch.to_string();
@ -96,7 +100,7 @@ pub(crate) async fn sync_events_route(
let filter = match body.filter {
None => FilterDefinition::default(),
Some(Filter::FilterDefinition(filter)) => filter,
Some(Filter::FilterId(filter_id)) => services()
Some(Filter::FilterId(filter_id)) => services
.users
.get_filter(&sender_user, &filter_id)?
.unwrap_or_default(),
@ -126,28 +130,29 @@ pub(crate) async fn sync_events_route(
// Look for device list updates of this account
device_list_updates.extend(
services()
services
.users
.keys_changed(sender_user.as_ref(), since, None)
.filter_map(Result::ok),
);
if services().globals.allow_local_presence() {
process_presence_updates(&mut presence_updates, since, &sender_user).await?;
if services.globals.allow_local_presence() {
process_presence_updates(services, &mut presence_updates, since, &sender_user).await?;
}
let all_joined_rooms = services()
let all_joined_rooms = services
.rooms
.state_cache
.rooms_joined(&sender_user)
.collect::<Vec<_>>();
// Coalesce database writes for the remainder of this scope.
let _cork = services().db.cork_and_flush();
let _cork = services.db.cork_and_flush();
for room_id in all_joined_rooms {
let room_id = room_id?;
if let Ok(joined_room) = load_joined_room(
services,
&sender_user,
&sender_device,
&room_id,
@ -170,13 +175,14 @@ pub(crate) async fn sync_events_route(
}
let mut left_rooms = BTreeMap::new();
let all_left_rooms: Vec<_> = services()
let all_left_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_left(&sender_user)
.collect();
for result in all_left_rooms {
handle_left_room(
services,
since,
&result?.0,
&sender_user,
@ -190,7 +196,7 @@ pub(crate) async fn sync_events_route(
}
let mut invited_rooms = BTreeMap::new();
let all_invited_rooms: Vec<_> = services()
let all_invited_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_invited(&sender_user)
@ -199,10 +205,10 @@ pub(crate) async fn sync_events_route(
let (room_id, invite_state_events) = result?;
// Get and drop the lock to wait for remaining operations to finish
let insert_lock = services().rooms.timeline.mutex_insert.lock(&room_id).await;
let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
drop(insert_lock);
let invite_count = services()
let invite_count = services
.rooms
.state_cache
.get_invite_count(&room_id, &sender_user)?;
@ -223,14 +229,14 @@ pub(crate) async fn sync_events_route(
}
for user_id in left_encrypted_users {
let dont_share_encrypted_room = services()
let dont_share_encrypted_room = services
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
.filter_map(Result::ok)
.filter_map(|other_room_id| {
Some(
services()
services
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
@ -247,7 +253,7 @@ pub(crate) async fn sync_events_route(
}
// Remove all to-device events the device received *last time*
services()
services
.users
.remove_to_device_events(&sender_user, &sender_device, since)?;
@ -266,7 +272,7 @@ pub(crate) async fn sync_events_route(
.collect(),
},
account_data: GlobalAccountData {
events: services()
events: services
.account_data
.changes_since(None, &sender_user, since)?
.into_iter()
@ -281,11 +287,11 @@ pub(crate) async fn sync_events_route(
changed: device_list_updates.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: services()
device_one_time_keys_count: services
.users
.count_one_time_keys(&sender_user, &sender_device)?,
to_device: ToDevice {
events: services()
events: services
.users
.get_to_device_events(&sender_user, &sender_device)?,
},
@ -311,16 +317,18 @@ pub(crate) async fn sync_events_route(
Ok(response)
}
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, fields(user_id = %sender_user, room_id = %room_id), name = "left_room")]
async fn handle_left_room(
since: u64, room_id: &RoomId, sender_user: &UserId, left_rooms: &mut BTreeMap<ruma::OwnedRoomId, LeftRoom>,
next_batch_string: &str, full_state: bool, lazy_load_enabled: bool,
services: &Services, since: u64, room_id: &RoomId, sender_user: &UserId,
left_rooms: &mut BTreeMap<ruma::OwnedRoomId, LeftRoom>, next_batch_string: &str, full_state: bool,
lazy_load_enabled: bool,
) -> Result<()> {
// Get and drop the lock to wait for remaining operations to finish
let insert_lock = services().rooms.timeline.mutex_insert.lock(room_id).await;
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
drop(insert_lock);
let left_count = services()
let left_count = services
.rooms
.state_cache
.get_left_count(room_id, sender_user)?;
@ -330,11 +338,11 @@ async fn handle_left_room(
return Ok(());
}
if !services().rooms.metadata.exists(room_id)? {
if !services.rooms.metadata.exists(room_id)? {
// This is just a rejected invite, not a room we know
// Insert a leave event anyways
let event = PduEvent {
event_id: EventId::new(services().globals.server_name()).into(),
event_id: EventId::new(services.globals.server_name()).into(),
sender: sender_user.to_owned(),
origin: None,
origin_server_ts: utils::millis_since_unix_epoch()
@ -367,7 +375,7 @@ async fn handle_left_room(
prev_batch: Some(next_batch_string.to_owned()),
events: Vec::new(),
},
state: State {
state: RoomState {
events: vec![event.to_sync_state_event()],
},
},
@ -377,27 +385,27 @@ async fn handle_left_room(
let mut left_state_events = Vec::new();
let since_shortstatehash = services()
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, since)?;
let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
Some(s) => services.rooms.state_accessor.state_full_ids(s).await?,
None => HashMap::new(),
};
let Some(left_event_id) = services().rooms.state_accessor.room_state_get_id(
room_id,
&StateEventType::RoomMember,
sender_user.as_str(),
)?
let Some(left_event_id) =
services
.rooms
.state_accessor
.room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str())?
else {
error!("Left room but no left state event");
return Ok(());
};
let Some(left_shortstatehash) = services()
let Some(left_shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&left_event_id)?
@ -406,13 +414,13 @@ async fn handle_left_room(
return Ok(());
};
let mut left_state_ids = services()
let mut left_state_ids = services
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
let leave_shortstatekey = services()
let leave_shortstatekey = services
.rooms
.short
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
@ -422,7 +430,7 @@ async fn handle_left_room(
let mut i: u8 = 0;
for (key, id) in left_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) = services().rooms.short.get_statekey_from_short(key)?;
let (event_type, state_key) = services.rooms.short.get_statekey_from_short(key)?;
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
@ -430,7 +438,7 @@ async fn handle_left_room(
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| (cfg!(feature = "element_hacks") && *sender_user == state_key)
{
let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
let Some(pdu) = services.rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
@ -456,7 +464,7 @@ async fn handle_left_room(
prev_batch: Some(next_batch_string.to_owned()),
events: Vec::new(),
},
state: State {
state: RoomState {
events: left_state_events,
},
},
@ -465,13 +473,13 @@ async fn handle_left_room(
}
async fn process_presence_updates(
presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>, since: u64, syncing_user: &UserId,
services: &Services, presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>, since: u64, syncing_user: &UserId,
) -> Result<()> {
use crate::service::presence::Presence;
// Take presence updates
for (user_id, _, presence_bytes) in services().presence.presence_since(since) {
if !services()
for (user_id, _, presence_bytes) in services.presence.presence_since(since) {
if !services
.rooms
.state_cache
.user_sees_user(syncing_user, &user_id)?
@ -513,19 +521,20 @@ async fn process_presence_updates(
#[allow(clippy::too_many_arguments)]
async fn load_joined_room(
sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, since: u64, sincecount: PduCount,
next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool, lazy_load_send_redundant: bool,
full_state: bool, device_list_updates: &mut HashSet<OwnedUserId>, left_encrypted_users: &mut HashSet<OwnedUserId>,
services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, since: u64,
sincecount: PduCount, next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool,
lazy_load_send_redundant: bool, full_state: bool, device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
) -> Result<JoinedRoom> {
// Get and drop the lock to wait for remaining operations to finish
// This will make sure the we have all events until next_batch
let insert_lock = services().rooms.timeline.mutex_insert.lock(room_id).await;
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
drop(insert_lock);
let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
let (timeline_pdus, limited) = load_timeline(services, sender_user, room_id, sincecount, 10)?;
let send_notification_counts = !timeline_pdus.is_empty()
|| services()
|| services
.rooms
.user
.last_notification_read(sender_user, room_id)?
@ -536,7 +545,7 @@ async fn load_joined_room(
timeline_users.insert(event.sender.as_str().to_owned());
}
services()
services
.rooms
.lazy_loading
.lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount)
@ -544,11 +553,11 @@ async fn load_joined_room(
// Database queries:
let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? else {
let Some(current_shortstatehash) = services.rooms.state.get_room_shortstatehash(room_id)? else {
return Err!(Database(error!("Room {room_id} has no state")));
};
let since_shortstatehash = services()
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, since)?;
@ -560,12 +569,12 @@ async fn load_joined_room(
} else {
// Calculates joined_member_count, invited_member_count and heroes
let calculate_counts = || {
let joined_member_count = services()
let joined_member_count = services
.rooms
.state_cache
.room_joined_count(room_id)?
.unwrap_or(0);
let invited_member_count = services()
let invited_member_count = services
.rooms
.state_cache
.room_invited_count(room_id)?
@ -578,7 +587,7 @@ async fn load_joined_room(
// Go through all PDUs and for each member event, check if the user is still
// joined or invited until we have 5 or we reach the end
for hero in services()
for hero in services
.rooms
.timeline
.all_pdus(sender_user, room_id)?
@ -594,8 +603,8 @@ async fn load_joined_room(
// The membership was and still is invite or join
if matches!(content.membership, MembershipState::Join | MembershipState::Invite)
&& (services().rooms.state_cache.is_joined(&user_id, room_id)?
|| services().rooms.state_cache.is_invited(&user_id, room_id)?)
&& (services.rooms.state_cache.is_joined(&user_id, room_id)?
|| services.rooms.state_cache.is_invited(&user_id, room_id)?)
{
Ok::<_, Error>(Some(user_id))
} else {
@ -622,7 +631,7 @@ async fn load_joined_room(
let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
.and_then(|shortstatehash| {
services()
services
.rooms
.state_accessor
.state_get(shortstatehash, &StateEventType::RoomMember, sender_user.as_str())
@ -643,7 +652,7 @@ async fn load_joined_room(
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
let current_state_ids = services()
let current_state_ids = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
@ -654,13 +663,13 @@ async fn load_joined_room(
let mut i: u8 = 0;
for (shortstatekey, id) in current_state_ids {
let (event_type, state_key) = services()
let (event_type, state_key) = services
.rooms
.short
.get_statekey_from_short(shortstatekey)?;
if event_type != StateEventType::RoomMember {
let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
let Some(pdu) = services.rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
@ -676,7 +685,7 @@ async fn load_joined_room(
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| (cfg!(feature = "element_hacks") && *sender_user == state_key)
{
let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
let Some(pdu) = services.rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
@ -695,14 +704,14 @@ async fn load_joined_room(
}
// Reset lazy loading because this is an initial sync
services()
services
.rooms
.lazy_loading
.lazy_load_reset(sender_user, sender_device, room_id)?;
// The state_events above should contain all timeline_users, let's mark them as
// lazy loaded.
services()
services
.rooms
.lazy_loading
.lazy_load_mark_sent(sender_user, sender_device, room_id, lazy_loaded, next_batchcount)
@ -716,12 +725,12 @@ async fn load_joined_room(
let mut delta_state_events = Vec::new();
if since_shortstatehash != current_shortstatehash {
let current_state_ids = services()
let current_state_ids = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let since_state_ids = services()
let since_state_ids = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
@ -729,7 +738,7 @@ async fn load_joined_room(
for (key, id) in current_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
let Some(pdu) = services.rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
@ -740,13 +749,13 @@ async fn load_joined_room(
}
}
let encrypted_room = services()
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
.is_some();
let since_encryption = services().rooms.state_accessor.state_get(
let since_encryption = services.rooms.state_accessor.state_get(
since_shortstatehash,
&StateEventType::RoomEncryption,
"",
@ -781,7 +790,7 @@ async fn load_joined_room(
match new_membership {
MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(sender_user, &user_id, room_id)? {
if !share_encrypted_room(services, sender_user, &user_id, room_id)? {
device_list_updates.insert(user_id);
}
},
@ -798,7 +807,7 @@ async fn load_joined_room(
if joined_since_last_sync && encrypted_room || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(
services()
services
.rooms
.state_cache
.room_members(room_id)
@ -810,7 +819,7 @@ async fn load_joined_room(
.filter(|user_id| {
// Only send keys if the sender doesn't share an encrypted room with the target
// already
!share_encrypted_room(sender_user, user_id, room_id).unwrap_or(false)
!share_encrypted_room(services, sender_user, user_id, room_id).unwrap_or(false)
}),
);
}
@ -848,14 +857,14 @@ async fn load_joined_room(
continue;
}
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
if !services.rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
room_id,
&event.sender,
)? || lazy_load_send_redundant
{
if let Some(member_event) = services().rooms.state_accessor.room_state_get(
if let Some(member_event) = services.rooms.state_accessor.room_state_get(
room_id,
&StateEventType::RoomMember,
event.sender.as_str(),
@ -866,7 +875,7 @@ async fn load_joined_room(
}
}
services()
services
.rooms
.lazy_loading
.lazy_load_mark_sent(sender_user, sender_device, room_id, lazy_loaded, next_batchcount)
@ -884,7 +893,7 @@ async fn load_joined_room(
// Look for device list updates in this room
device_list_updates.extend(
services()
services
.users
.keys_changed(room_id.as_ref(), since, None)
.filter_map(Result::ok),
@ -892,7 +901,7 @@ async fn load_joined_room(
let notification_count = if send_notification_counts {
Some(
services()
services
.rooms
.user
.notification_count(sender_user, room_id)?
@ -905,7 +914,7 @@ async fn load_joined_room(
let highlight_count = if send_notification_counts {
Some(
services()
services
.rooms
.user
.highlight_count(sender_user, room_id)?
@ -933,7 +942,7 @@ async fn load_joined_room(
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let mut edus: Vec<_> = services()
let mut edus: Vec<_> = services
.rooms
.read_receipt
.readreceipts_since(room_id, since)
@ -941,10 +950,10 @@ async fn load_joined_room(
.map(|(_, _, v)| v)
.collect();
if services().rooms.typing.last_typing_update(room_id).await? > since {
if services.rooms.typing.last_typing_update(room_id).await? > since {
edus.push(
serde_json::from_str(
&serde_json::to_string(&services().rooms.typing.typings_all(room_id).await?)
&serde_json::to_string(&services.rooms.typing.typings_all(room_id).await?)
.expect("event is valid, we just created it"),
)
.expect("event is valid, we just created it"),
@ -953,14 +962,14 @@ async fn load_joined_room(
// Save the state after this sync so we can send the correct state diff next
// sync
services()
services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)?;
Ok(JoinedRoom {
account_data: RoomAccountData {
events: services()
events: services
.account_data
.changes_since(Some(room_id), sender_user, since)?
.into_iter()
@ -985,7 +994,7 @@ async fn load_joined_room(
prev_batch,
events: room_events,
},
state: State {
state: RoomState {
events: state_events
.iter()
.map(|pdu| pdu.to_sync_state_event())
@ -999,16 +1008,16 @@ async fn load_joined_room(
}
fn load_timeline(
sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64,
services: &Services, sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let timeline_pdus;
let limited = if services()
let limited = if services
.rooms
.timeline
.last_timeline_count(sender_user, room_id)?
> roomsincecount
{
let mut non_timeline_pdus = services()
let mut non_timeline_pdus = services
.rooms
.timeline
.pdus_until(sender_user, room_id, PduCount::max())?
@ -1040,8 +1049,10 @@ fn load_timeline(
Ok((timeline_pdus, limited))
}
fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId) -> Result<bool> {
Ok(services()
fn share_encrypted_room(
services: &Services, sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId,
) -> Result<bool> {
Ok(services
.rooms
.user
.get_shared_rooms(vec![sender_user.to_owned(), user_id.to_owned()])?
@ -1049,7 +1060,7 @@ fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &Ro
.filter(|room_id| room_id != ignore_room)
.filter_map(|other_room_id| {
Some(
services()
services
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
@ -1064,15 +1075,15 @@ fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &Ro
///
/// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
pub(crate) async fn sync_events_v4_route(
body: Ruma<sync_events::v4::Request>,
State(services): State<crate::State>, body: Ruma<sync_events::v4::Request>,
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);
let watcher = services.globals.watch(&sender_user, &sender_device);
let next_batch = services().globals.next_count()?;
let next_batch = services.globals.next_count()?;
let globalsince = body
.pos
@ -1082,21 +1093,19 @@ pub(crate) async fn sync_events_v4_route(
if globalsince == 0 {
if let Some(conn_id) = &body.conn_id {
services().users.forget_sync_request_connection(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
);
services
.users
.forget_sync_request_connection(sender_user.clone(), sender_device.clone(), conn_id.clone());
}
}
// Get sticky parameters from cache
let known_rooms =
services()
services
.users
.update_sync_request_with_cache(sender_user.clone(), sender_device.clone(), &mut body);
let all_joined_rooms = services()
let all_joined_rooms = services
.rooms
.state_cache
.rooms_joined(&sender_user)
@ -1104,7 +1113,7 @@ pub(crate) async fn sync_events_v4_route(
.collect::<Vec<_>>();
if body.extensions.to_device.enabled.unwrap_or(false) {
services()
services
.users
.remove_to_device_events(&sender_user, &sender_device, globalsince)?;
}
@ -1116,26 +1125,26 @@ pub(crate) async fn sync_events_v4_route(
if body.extensions.e2ee.enabled.unwrap_or(false) {
// Look for device list updates of this account
device_list_changes.extend(
services()
services
.users
.keys_changed(sender_user.as_ref(), globalsince, None)
.filter_map(Result::ok),
);
for room_id in &all_joined_rooms {
let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? else {
let Some(current_shortstatehash) = services.rooms.state.get_room_shortstatehash(room_id)? else {
error!("Room {} has no state", room_id);
continue;
};
let since_shortstatehash = services()
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, globalsince)?;
let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
.and_then(|shortstatehash| {
services()
services
.rooms
.state_accessor
.state_get(shortstatehash, &StateEventType::RoomMember, sender_user.as_str())
@ -1148,7 +1157,7 @@ pub(crate) async fn sync_events_v4_route(
.ok()
});
let encrypted_room = services()
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
@ -1160,7 +1169,7 @@ pub(crate) async fn sync_events_v4_route(
continue;
}
let since_encryption = services().rooms.state_accessor.state_get(
let since_encryption = services.rooms.state_accessor.state_get(
since_shortstatehash,
&StateEventType::RoomEncryption,
"",
@ -1171,12 +1180,12 @@ pub(crate) async fn sync_events_v4_route(
let new_encrypted_room = encrypted_room && since_encryption.is_none();
if encrypted_room {
let current_state_ids = services()
let current_state_ids = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let since_state_ids = services()
let since_state_ids = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
@ -1184,7 +1193,7 @@ pub(crate) async fn sync_events_v4_route(
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
let Some(pdu) = services.rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
@ -1205,7 +1214,7 @@ pub(crate) async fn sync_events_v4_route(
match new_membership {
MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(&sender_user, &user_id, room_id)? {
if !share_encrypted_room(services, &sender_user, &user_id, room_id)? {
device_list_changes.insert(user_id);
}
},
@ -1222,7 +1231,7 @@ pub(crate) async fn sync_events_v4_route(
if joined_since_last_sync || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_changes.extend(
services()
services
.rooms
.state_cache
.room_members(room_id)
@ -1234,7 +1243,7 @@ pub(crate) async fn sync_events_v4_route(
.filter(|user_id| {
// Only send keys if the sender doesn't share an encrypted room with the target
// already
!share_encrypted_room(&sender_user, user_id, room_id).unwrap_or(false)
!share_encrypted_room(services, &sender_user, user_id, room_id).unwrap_or(false)
}),
);
}
@ -1242,21 +1251,21 @@ pub(crate) async fn sync_events_v4_route(
}
// Look for device list updates in this room
device_list_changes.extend(
services()
services
.users
.keys_changed(room_id.as_ref(), globalsince, None)
.filter_map(Result::ok),
);
}
for user_id in left_encrypted_users {
let dont_share_encrypted_room = services()
let dont_share_encrypted_room = services
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
.filter_map(Result::ok)
.filter_map(|other_room_id| {
Some(
services()
services
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
@ -1336,7 +1345,7 @@ pub(crate) async fn sync_events_v4_route(
);
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_known_rooms(
services.users.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
@ -1349,7 +1358,7 @@ pub(crate) async fn sync_events_v4_route(
let mut known_subscription_rooms = BTreeSet::new();
for (room_id, room) in &body.room_subscriptions {
if !services().rooms.metadata.exists(room_id)? {
if !services.rooms.metadata.exists(room_id)? {
continue;
}
let todo_room = todo_rooms
@ -1375,7 +1384,7 @@ pub(crate) async fn sync_events_v4_route(
}
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_known_rooms(
services.users.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
@ -1386,7 +1395,7 @@ pub(crate) async fn sync_events_v4_route(
}
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_subscriptions(
services.users.update_sync_subscriptions(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
@ -1398,7 +1407,7 @@ pub(crate) async fn sync_events_v4_route(
for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
let roomsincecount = PduCount::Normal(*roomsince);
let (timeline_pdus, limited) = load_timeline(&sender_user, room_id, roomsincecount, *timeline_limit)?;
let (timeline_pdus, limited) = load_timeline(services, &sender_user, room_id, roomsincecount, *timeline_limit)?;
if roomsince != &0 && timeline_pdus.is_empty() {
continue;
@ -1431,7 +1440,7 @@ pub(crate) async fn sync_events_v4_route(
let required_state = required_state_request
.iter()
.map(|state| {
services()
services
.rooms
.state_accessor
.room_state_get(room_id, &state.0, &state.1)
@ -1442,7 +1451,7 @@ pub(crate) async fn sync_events_v4_route(
.collect();
// Heroes
let heroes = services()
let heroes = services
.rooms
.state_cache
.room_members(room_id)
@ -1450,7 +1459,7 @@ pub(crate) async fn sync_events_v4_route(
.filter(|member| member != &sender_user)
.map(|member| {
Ok::<_, Error>(
services()
services
.rooms
.state_accessor
.get_member(room_id, &member)?
@ -1491,11 +1500,11 @@ pub(crate) async fn sync_events_v4_route(
rooms.insert(
room_id.clone(),
sync_events::v4::SlidingSyncRoom {
name: services().rooms.state_accessor.get_name(room_id)?.or(name),
name: services.rooms.state_accessor.get_name(room_id)?.or(name),
avatar: if let Some(heroes_avatar) = heroes_avatar {
ruma::JsOption::Some(heroes_avatar)
} else {
match services().rooms.state_accessor.get_avatar(room_id)? {
match services.rooms.state_accessor.get_avatar(room_id)? {
ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url),
ruma::JsOption::Null => ruma::JsOption::Null,
ruma::JsOption::Undefined => ruma::JsOption::Undefined,
@ -1506,7 +1515,7 @@ pub(crate) async fn sync_events_v4_route(
invite_state: None,
unread_notifications: UnreadNotificationsCount {
highlight_count: Some(
services()
services
.rooms
.user
.highlight_count(&sender_user, room_id)?
@ -1514,7 +1523,7 @@ pub(crate) async fn sync_events_v4_route(
.expect("notification count can't go that high"),
),
notification_count: Some(
services()
services
.rooms
.user
.notification_count(&sender_user, room_id)?
@ -1527,7 +1536,7 @@ pub(crate) async fn sync_events_v4_route(
prev_batch,
limited,
joined_count: Some(
services()
services
.rooms
.state_cache
.room_joined_count(room_id)?
@ -1536,7 +1545,7 @@ pub(crate) async fn sync_events_v4_route(
.unwrap_or_else(|_| uint!(0)),
),
invited_count: Some(
services()
services
.rooms
.state_cache
.room_invited_count(room_id)?
@ -1571,7 +1580,7 @@ pub(crate) async fn sync_events_v4_route(
extensions: sync_events::v4::Extensions {
to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
Some(sync_events::v4::ToDevice {
events: services()
events: services
.users
.get_to_device_events(&sender_user, &sender_device)?,
next_batch: next_batch.to_string(),
@ -1584,7 +1593,7 @@ pub(crate) async fn sync_events_v4_route(
changed: device_list_changes.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: services()
device_one_time_keys_count: services
.users
.count_one_time_keys(&sender_user, &sender_device)?,
// Fallback keys are not yet supported
@ -1592,7 +1601,7 @@ pub(crate) async fn sync_events_v4_route(
},
account_data: sync_events::v4::AccountData {
global: if body.extensions.account_data.enabled.unwrap_or(false) {
services()
services
.account_data
.changes_since(None, &sender_user, globalsince)?
.into_iter()