From 106bcd30b75b6846be197fc5431063b0b82c4336 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 2 Feb 2025 07:40:08 +0000 Subject: [PATCH] optimize incremental sync state diff Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 366 +++++++++-------- src/service/rooms/state_accessor/mod.rs | 523 +++++++++++++----------- 2 files changed, 474 insertions(+), 415 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index b548aa23..a97e4329 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -7,13 +7,13 @@ use std::{ use axum::extract::State; use conduwuit::{ at, err, error, extract_variant, is_equal_to, pair_of, - pdu::EventHash, + pdu::{Event, EventHash}, + ref_at, result::FlatOk, utils::{ self, - future::OptionExt, math::ruma_from_u64, - stream::{BroadbandExt, Tools, WidebandExt}, + stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, BoolExt, IterStream, ReadyExt, TryFutureExtExt, }, PduCount, PduEvent, Result, @@ -53,19 +53,16 @@ use ruma::{ serde::Raw, uint, DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, }; +use service::rooms::short::{ShortEventId, ShortStateKey}; use super::{load_timeline, share_encrypted_room}; -use crate::{ - client::{ignored_filter, lazy_loading_witness}, - Ruma, RumaResponse, -}; +use crate::{client::ignored_filter, Ruma, RumaResponse}; #[derive(Default)] struct StateChanges { heroes: Option>, joined_member_count: Option, invited_member_count: Option, - joined_since_last_sync: bool, state_events: Vec, device_list_updates: HashSet, left_encrypted_users: HashSet, @@ -625,6 +622,40 @@ async fn load_joined_room( .await?; let (timeline_pdus, limited) = timeline; + let initial = since_shortstatehash.is_none(); + let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() + || filter.room.timeline.lazy_load_options.is_enabled(); + + let lazy_loading_context = &lazy_loading::Context { + user_id: sender_user, + device_id: sender_device, + room_id, + token: Some(since), + options: Some(&filter.room.state.lazy_load_options), + }; + + // Reset lazy loading because this is an initial sync + let lazy_load_reset: OptionFuture<_> = initial + .then(|| services.rooms.lazy_loading.reset(lazy_loading_context)) + .into(); + + lazy_load_reset.await; + let witness: OptionFuture<_> = lazy_loading_enabled + .then(|| { + let witness: Witness = timeline_pdus + .iter() + .map(ref_at!(1)) + .map(Event::sender) + .map(Into::into) + .chain(receipt_events.keys().map(Into::into)) + .collect(); + + services + .rooms + .lazy_loading + .witness_retain(witness, lazy_loading_context) + }) + .into(); let last_notification_read: OptionFuture<_> = timeline_pdus .is_empty() @@ -646,41 +677,20 @@ async fn load_joined_room( }) .into(); + let (last_notification_read, since_sender_member, witness) = + join3(last_notification_read, since_sender_member, witness).await; + let joined_since_last_sync = since_sender_member - .await .flatten() .is_none_or(|content: RoomMemberEventContent| { content.membership != MembershipState::Join }); - let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() - || filter.room.timeline.lazy_load_options.is_enabled(); - - let lazy_reset = since_shortstatehash.is_none(); - let lazy_loading_context = &lazy_loading::Context { - user_id: sender_user, - device_id: sender_device, - room_id, - token: None, - options: Some(&filter.room.state.lazy_load_options), - }; - - // Reset lazy loading because this is an initial sync - let lazy_load_reset: OptionFuture<_> = lazy_reset - .then(|| services.rooms.lazy_loading.reset(lazy_loading_context)) - .into(); - - lazy_load_reset.await; - let witness: OptionFuture<_> = lazy_loading_enabled - .then(|| lazy_loading_witness(services, lazy_loading_context, timeline_pdus.iter())) - .into(); - let StateChanges { heroes, joined_member_count, invited_member_count, - joined_since_last_sync, state_events, mut device_list_updates, left_encrypted_users, @@ -693,7 +703,7 @@ async fn load_joined_room( since_shortstatehash, current_shortstatehash, joined_since_last_sync, - witness.await.as_ref(), + witness.as_ref(), ) .boxed() .await?; @@ -719,28 +729,7 @@ async fn load_joined_room( .map(|(_, pdu)| pdu.to_sync_room_event()) .collect(); - let typing_events = services - .rooms - .typing - .last_typing_update(room_id) - .and_then(|count| async move { - if count <= since { - return Ok(Vec::>::new()); - } - - let typings = services - .rooms - .typing - .typings_all(room_id, sender_user) - .await?; - - Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) - }) - .unwrap_or(Vec::new()); - - let send_notification_counts = last_notification_read - .is_none_or(|&count| count > since) - .await; + let send_notification_counts = last_notification_read.is_none_or(|count| count > since); let notification_count: OptionFuture<_> = send_notification_counts .then(|| { @@ -764,8 +753,27 @@ async fn load_joined_room( }) .into(); - let events = join3(room_events, account_data_events, typing_events); + let typing_events = services + .rooms + .typing + .last_typing_update(room_id) + .and_then(|count| async move { + if count <= since { + return Ok(Vec::>::new()); + } + + let typings = services + .rooms + .typing + .typings_all(room_id, sender_user) + .await?; + + Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) + }) + .unwrap_or(Vec::new()); + let unread_notifications = join(notification_count, highlight_count); + let events = join3(room_events, account_data_events, typing_events); let (unread_notifications, events, device_updates) = join3(unread_notifications, events, device_updates) .boxed() @@ -942,7 +950,6 @@ async fn calculate_state_initial( heroes, joined_member_count, invited_member_count, - joined_since_last_sync: true, state_events, ..Default::default() }) @@ -952,7 +959,7 @@ async fn calculate_state_initial( #[allow(clippy::too_many_arguments)] async fn calculate_state_incremental<'a>( services: &Services, - sender_user: &UserId, + sender_user: &'a UserId, room_id: &RoomId, full_state: bool, _filter: &FilterDefinition, @@ -965,102 +972,130 @@ async fn calculate_state_incremental<'a>( let state_changed = since_shortstatehash != current_shortstatehash; - let state_get_id = |user_id: &'a UserId| { - services - .rooms - .state_accessor - .state_get_id(current_shortstatehash, &StateEventType::RoomMember, user_id.as_str()) - .ok() - }; - - let lazy_state_ids: OptionFuture<_> = witness - .map(|witness| { - witness - .iter() - .stream() - .broad_filter_map(|user_id| state_get_id(user_id)) - .collect::>() - }) - .into(); - - let current_state_ids: OptionFuture<_> = state_changed - .then(|| { - services - .rooms - .state_accessor - .state_full_ids(current_shortstatehash) - .collect::>() - }) - .into(); - - let since_state_ids: OptionFuture<_> = (state_changed && !full_state) - .then(|| { - services - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .collect::>() - }) - .into(); - - let lazy_state_ids = lazy_state_ids - .map(Option::into_iter) - .map(|iter| iter.flat_map(Vec::into_iter)) - .map(IterStream::stream) - .flatten_stream(); - - let ref since_state_ids = since_state_ids.shared(); - let delta_state_events = current_state_ids - .map(Option::into_iter) - .map(|iter| iter.flat_map(Vec::into_iter)) - .map(IterStream::stream) - .flatten_stream() - .filter_map(|(shortstatekey, event_id): (u64, OwnedEventId)| async move { - since_state_ids - .clone() - .await - .is_none_or(|since_state| since_state.get(&shortstatekey) != Some(&event_id)) - .then_some(event_id) - }) - .chain(lazy_state_ids) - .broad_filter_map(|event_id: OwnedEventId| async move { - services - .rooms - .timeline - .get_pdu(&event_id) - .await - .map(move |pdu| (event_id, pdu)) - .ok() - }) - .collect::>(); - - let since_encryption = services - .rooms - .state_accessor - .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); - let encrypted_room = services .rooms .state_accessor .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); + .is_ok() + .await; - let (delta_state_events, encrypted_room) = join(delta_state_events, encrypted_room).await; + let state_get_shorteventid = |user_id: &'a UserId| { + services + .rooms + .state_accessor + .state_get_shortid( + current_shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + ) + .ok() + }; - let (mut device_list_updates, left_encrypted_users) = delta_state_events - .values() + let lazy_state_ids: OptionFuture<_> = witness + .filter(|_| !full_state && !encrypted_room) + .map(|witness| { + witness + .iter() + .stream() + .broad_filter_map(|user_id| state_get_shorteventid(user_id)) + .into_future() + }) + .into(); + + let state_diff: OptionFuture<_> = (!full_state && state_changed) + .then(|| { + services + .rooms + .state_accessor + .state_added((since_shortstatehash, current_shortstatehash)) + .boxed() + .into_future() + }) + .into(); + + let current_state_ids: OptionFuture<_> = full_state + .then(|| { + services + .rooms + .state_accessor + .state_full_shortids(current_shortstatehash) + .expect_ok() + .boxed() + .into_future() + }) + .into(); + + let lazy_state_ids = lazy_state_ids + .map(|opt| { + opt.map(|(curr, next)| { + let opt = curr; + let iter = Option::into_iter(opt); + IterStream::stream(iter).chain(next) + }) + }) + .map(Option::into_iter) + .map(IterStream::stream) + .flatten_stream() + .flatten(); + + let state_diff_ids = state_diff + .map(|opt| { + opt.map(|(curr, next)| { + let opt = curr; + let iter = Option::into_iter(opt); + IterStream::stream(iter).chain(next) + }) + }) + .map(Option::into_iter) + .map(IterStream::stream) + .flatten_stream() + .flatten(); + + let state_events = current_state_ids + .map(|opt| { + opt.map(|(curr, next)| { + let opt = curr; + let iter = Option::into_iter(opt); + IterStream::stream(iter).chain(next) + }) + }) + .map(Option::into_iter) + .map(IterStream::stream) + .flatten_stream() + .flatten() + .chain(state_diff_ids) + .broad_filter_map(|(shortstatekey, shorteventid)| async move { + if witness.is_none() || encrypted_room { + return Some(shorteventid); + } + + lazy_filter(services, sender_user, shortstatekey, shorteventid).await + }) + .chain(lazy_state_ids) + .broad_filter_map(|shorteventid| { + services + .rooms + .short + .get_eventid_from_short(shorteventid) + .ok() + }) + .broad_filter_map(|event_id: OwnedEventId| async move { + services.rooms.timeline.get_pdu(&event_id).await.ok() + }) + .collect::>() + .await; + + let (device_list_updates, left_encrypted_users) = state_events + .iter() .stream() .ready_filter(|_| encrypted_room) .ready_filter(|state_event| state_event.kind == RoomMember) .ready_filter_map(|state_event| { - let content = state_event.get_content().ok()?; - let user_id = state_event.state_key.as_ref()?.parse().ok()?; + let content: RoomMemberEventContent = state_event.get_content().ok()?; + let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?; + Some((content, user_id)) }) - .ready_filter(|(_, user_id): &(RoomMemberEventContent, OwnedUserId)| { - user_id != sender_user - }) .fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move { use MembershipState::*; @@ -1068,8 +1103,9 @@ async fn calculate_state_incremental<'a>( |user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id)); match content.membership { - | Join if !shares_encrypted_room(&user_id).await => dlu.insert(user_id), | Leave => leu.insert(user_id), + | Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await => + dlu.insert(user_id), | _ => false, }; @@ -1077,29 +1113,7 @@ async fn calculate_state_incremental<'a>( }) .await; - // If the user is in a new encrypted room, give them all joined users - let new_encrypted_room = encrypted_room && !since_encryption.await; - if joined_since_last_sync && encrypted_room || new_encrypted_room { - services - .rooms - .state_cache - .room_members(room_id) - .ready_filter(|&user_id| sender_user != user_id) - .map(ToOwned::to_owned) - .broad_filter_map(|user_id| async move { - share_encrypted_room(services, sender_user, &user_id, Some(room_id)) - .await - .or_some(user_id) - }) - .ready_for_each(|user_id| { - device_list_updates.insert(user_id); - }) - .await; - } - - let send_member_count = delta_state_events - .values() - .any(|event| event.kind == RoomMember); + let send_member_count = state_events.iter().any(|event| event.kind == RoomMember); let (joined_member_count, invited_member_count, heroes) = if send_member_count { calculate_counts(services, room_id, sender_user).await? @@ -1111,13 +1125,29 @@ async fn calculate_state_incremental<'a>( heroes, joined_member_count, invited_member_count, - joined_since_last_sync, + state_events, device_list_updates, left_encrypted_users, - state_events: delta_state_events.into_values().collect(), }) } +async fn lazy_filter( + services: &Services, + sender_user: &UserId, + shortstatekey: ShortStateKey, + shorteventid: ShortEventId, +) -> Option { + let (event_type, state_key) = services + .rooms + .short + .get_statekey_from_short(shortstatekey) + .await + .ok()?; + + (event_type != StateEventType::RoomMember || state_key == sender_user.as_str()) + .then_some(shorteventid) +} + async fn calculate_counts( services: &Services, room_id: &RoomId, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 8b56c8b6..bed8d210 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -6,7 +6,7 @@ use std::{ }; use conduwuit::{ - at, err, error, + at, err, error, pair_of, pdu::PduBuilder, utils, utils::{ @@ -17,7 +17,7 @@ use conduwuit::{ Err, Error, PduEvent, Result, }; use database::{Deserialized, Map}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; +use futures::{future::try_join, FutureExt, Stream, StreamExt, TryFutureExt}; use lru_cache::LruCache; use ruma::{ events::{ @@ -48,7 +48,7 @@ use crate::{ rooms::{ short::{ShortEventId, ShortStateHash, ShortStateKey}, state::RoomMutexGuard, - state_compressor::{compress_state_event, parse_compressed_state_event}, + state_compressor::{compress_state_event, parse_compressed_state_event, CompressedState}, }, Dep, }; @@ -143,6 +143,256 @@ impl crate::Service for Service { } impl Service { + /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). + pub async fn room_state_get_content( + &self, + room_id: &RoomId, + event_type: &StateEventType, + state_key: &str, + ) -> Result + where + T: for<'de> Deserialize<'de>, + { + self.room_state_get(room_id, event_type, state_key) + .await + .and_then(|event| event.get_content()) + } + + /// Returns the full room state. + #[tracing::instrument(skip(self), level = "debug")] + pub fn room_state_full<'a>( + &'a self, + room_id: &'a RoomId, + ) -> impl Stream> + Send + 'a { + self.services + .state + .get_room_shortstatehash(room_id) + .map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok)) + .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) + .try_flatten_stream() + } + + /// Returns the full room state pdus + #[tracing::instrument(skip(self), level = "debug")] + pub fn room_state_full_pdus<'a>( + &'a self, + room_id: &'a RoomId, + ) -> impl Stream> + Send + 'a { + self.services + .state + .get_room_shortstatehash(room_id) + .map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok)) + .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) + .try_flatten_stream() + } + + /// Returns a single EventId from `room_id` with key (`event_type`, + /// `state_key`). + #[tracing::instrument(skip(self), level = "debug")] + pub async fn room_state_get_id( + &self, + room_id: &RoomId, + event_type: &StateEventType, + state_key: &str, + ) -> Result + where + Id: for<'de> Deserialize<'de> + Sized + ToOwned, + ::Owned: Borrow, + { + self.services + .state + .get_room_shortstatehash(room_id) + .and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key)) + .await + } + + /// Returns a single PDU from `room_id` with key (`event_type`, + /// `state_key`). + #[tracing::instrument(skip(self), level = "debug")] + pub async fn room_state_get( + &self, + room_id: &RoomId, + event_type: &StateEventType, + state_key: &str, + ) -> Result { + self.services + .state + .get_room_shortstatehash(room_id) + .and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key)) + .await + } + + /// The user was a joined member at this state (potentially in the past) + #[inline] + async fn user_was_joined(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool { + self.user_membership(shortstatehash, user_id).await == MembershipState::Join + } + + /// The user was an invited or joined room member at this state (potentially + /// in the past) + #[inline] + async fn user_was_invited(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool { + let s = self.user_membership(shortstatehash, user_id).await; + s == MembershipState::Join || s == MembershipState::Invite + } + + /// Get membership for given user in state + async fn user_membership( + &self, + shortstatehash: ShortStateHash, + user_id: &UserId, + ) -> MembershipState { + self.state_get_content(shortstatehash, &StateEventType::RoomMember, user_id.as_str()) + .await + .map_or(MembershipState::Leave, |c: RoomMemberEventContent| c.membership) + } + + /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). + pub async fn state_get_content( + &self, + shortstatehash: ShortStateHash, + event_type: &StateEventType, + state_key: &str, + ) -> Result + where + T: for<'de> Deserialize<'de>, + { + self.state_get(shortstatehash, event_type, state_key) + .await + .and_then(|event| event.get_content()) + } + + #[tracing::instrument(skip(self), level = "debug")] + pub async fn state_contains( + &self, + shortstatehash: ShortStateHash, + event_type: &StateEventType, + state_key: &str, + ) -> bool { + let Ok(shortstatekey) = self + .services + .short + .get_shortstatekey(event_type, state_key) + .await + else { + return false; + }; + + self.state_contains_shortstatekey(shortstatehash, shortstatekey) + .await + } + + #[tracing::instrument(skip(self), level = "debug")] + pub async fn state_contains_shortstatekey( + &self, + shortstatehash: ShortStateHash, + shortstatekey: ShortStateKey, + ) -> bool { + let start = compress_state_event(shortstatekey, 0); + let end = compress_state_event(shortstatekey, u64::MAX); + + self.load_full_state(shortstatehash) + .map_ok(|full_state| full_state.range(start..end).next().copied()) + .await + .flat_ok() + .is_some() + } + + /// Returns a single PDU from `room_id` with key (`event_type`, + /// `state_key`). + pub async fn state_get( + &self, + shortstatehash: ShortStateHash, + event_type: &StateEventType, + state_key: &str, + ) -> Result { + self.state_get_id(shortstatehash, event_type, state_key) + .and_then(|event_id: OwnedEventId| async move { + self.services.timeline.get_pdu(&event_id).await + }) + .await + } + + /// Returns a single EventId from `room_id` with key (`event_type`, + /// `state_key`). + #[tracing::instrument(skip(self), level = "debug")] + pub async fn state_get_id( + &self, + shortstatehash: ShortStateHash, + event_type: &StateEventType, + state_key: &str, + ) -> Result + where + Id: for<'de> Deserialize<'de> + Sized + ToOwned, + ::Owned: Borrow, + { + let shorteventid = self + .state_get_shortid(shortstatehash, event_type, state_key) + .await?; + + self.services + .short + .get_eventid_from_short(shorteventid) + .await + } + + /// Returns a single EventId from `room_id` with key (`event_type`, + /// `state_key`). + #[tracing::instrument(skip(self), level = "debug")] + pub async fn state_get_shortid( + &self, + shortstatehash: ShortStateHash, + event_type: &StateEventType, + state_key: &str, + ) -> Result { + let shortstatekey = self + .services + .short + .get_shortstatekey(event_type, state_key) + .await?; + + let start = compress_state_event(shortstatekey, 0); + let end = compress_state_event(shortstatekey, u64::MAX); + self.load_full_state(shortstatehash) + .map_ok(|full_state| { + full_state + .range(start..end) + .next() + .copied() + .map(parse_compressed_state_event) + .map(at!(1)) + .ok_or(err!(Request(NotFound("Not found in room state")))) + }) + .await? + } + + /// Returns the state events removed between the interval (present in .0 but + /// not in .1) + #[inline] + pub fn state_removed( + &self, + shortstatehash: pair_of!(ShortStateHash), + ) -> impl Stream + Send + '_ { + self.state_added((shortstatehash.1, shortstatehash.0)) + } + + /// Returns the state events added between the interval (present in .1 but + /// not in .0) + #[tracing::instrument(skip(self), level = "debug")] + pub fn state_added<'a>( + &'a self, + shortstatehash: pair_of!(ShortStateHash), + ) -> impl Stream + Send + 'a { + let a = self.load_full_state(shortstatehash.0); + let b = self.load_full_state(shortstatehash.1); + try_join(a, b) + .map_ok(|(a, b)| b.difference(&a).copied().collect::>()) + .map_ok(IterStream::try_stream) + .try_flatten_stream() + .expect_ok() + .map(parse_compressed_state_event) + } + pub fn state_full( &self, shortstatehash: ShortStateHash, @@ -208,110 +458,11 @@ impl Service { .ready_filter_map(|(event_id, shortstatekey)| Some((shortstatekey, event_id.ok()?))) } - /// Returns a single EventId from `room_id` with key (`event_type`, - /// `state_key`). - #[tracing::instrument(skip(self), level = "debug")] - pub async fn state_get_id( - &self, - shortstatehash: ShortStateHash, - event_type: &StateEventType, - state_key: &str, - ) -> Result - where - Id: for<'de> Deserialize<'de> + Sized + ToOwned, - ::Owned: Borrow, - { - let shorteventid = self - .state_get_shortid(shortstatehash, event_type, state_key) - .await?; - - self.services - .short - .get_eventid_from_short(shorteventid) - .await - } - - /// Returns a single EventId from `room_id` with key (`event_type`, - /// `state_key`). - #[tracing::instrument(skip(self), level = "debug")] - pub async fn state_get_shortid( - &self, - shortstatehash: ShortStateHash, - event_type: &StateEventType, - state_key: &str, - ) -> Result { - let shortstatekey = self - .services - .short - .get_shortstatekey(event_type, state_key) - .await?; - - let start = compress_state_event(shortstatekey, 0); - let end = compress_state_event(shortstatekey, u64::MAX); - self.services - .state_compressor - .load_shortstatehash_info(shortstatehash) - .map_ok(|vec| vec.last().expect("at least one layer").full_state.clone()) - .map_ok(|full_state| { - full_state - .range(start..end) - .next() - .copied() - .map(parse_compressed_state_event) - .map(at!(1)) - .ok_or(err!(Request(NotFound("Not found in room state")))) - }) - .await? - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn state_contains( - &self, - shortstatehash: ShortStateHash, - event_type: &StateEventType, - state_key: &str, - ) -> bool { - let Ok(shortstatekey) = self - .services - .short - .get_shortstatekey(event_type, state_key) - .await - else { - return false; - }; - - self.state_contains_shortstatekey(shortstatehash, shortstatekey) - .await - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn state_contains_shortstatekey( - &self, - shortstatehash: ShortStateHash, - shortstatekey: ShortStateKey, - ) -> bool { - let start = compress_state_event(shortstatekey, 0); - let end = compress_state_event(shortstatekey, u64::MAX); - - self.services - .state_compressor - .load_shortstatehash_info(shortstatehash) - .map_ok(|vec| vec.last().expect("at least one layer").full_state.clone()) - .map_ok(|full_state| full_state.range(start..end).next().copied()) - .await - .flat_ok() - .is_some() - } - pub fn state_full_shortids( &self, shortstatehash: ShortStateHash, ) -> impl Stream> + Send + '_ { - self.services - .state_compressor - .load_shortstatehash_info(shortstatehash) - .map_err(|e| err!(Database("Missing state IDs: {e}"))) - .map_ok(|vec| vec.last().expect("at least one layer").full_state.clone()) + self.load_full_state(shortstatehash) .map_ok(|full_state| { full_state .deref() @@ -324,59 +475,32 @@ impl Service { .try_flatten_stream() } - /// Returns a single PDU from `room_id` with key (`event_type`, - /// `state_key`). - pub async fn state_get( + async fn load_full_state( &self, shortstatehash: ShortStateHash, - event_type: &StateEventType, - state_key: &str, - ) -> Result { - self.state_get_id(shortstatehash, event_type, state_key) - .and_then(|event_id: OwnedEventId| async move { - self.services.timeline.get_pdu(&event_id).await + ) -> Result> { + self.services + .state_compressor + .load_shortstatehash_info(shortstatehash) + .map_err(|e| err!(Database("Missing state IDs: {e}"))) + .map_ok(|vec| vec.last().expect("at least one layer").full_state.clone()) + .await + } + + /// Returns the state hash for this pdu. + pub async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result { + const BUFSIZE: usize = size_of::(); + + self.services + .short + .get_shorteventid(event_id) + .and_then(|shorteventid| { + self.db + .shorteventid_shortstatehash + .aqry::(&shorteventid) }) .await - } - - /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). - pub async fn state_get_content( - &self, - shortstatehash: ShortStateHash, - event_type: &StateEventType, - state_key: &str, - ) -> Result - where - T: for<'de> Deserialize<'de>, - { - self.state_get(shortstatehash, event_type, state_key) - .await - .and_then(|event| event.get_content()) - } - - /// Get membership for given user in state - async fn user_membership( - &self, - shortstatehash: ShortStateHash, - user_id: &UserId, - ) -> MembershipState { - self.state_get_content(shortstatehash, &StateEventType::RoomMember, user_id.as_str()) - .await - .map_or(MembershipState::Leave, |c: RoomMemberEventContent| c.membership) - } - - /// The user was a joined member at this state (potentially in the past) - #[inline] - async fn user_was_joined(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool { - self.user_membership(shortstatehash, user_id).await == MembershipState::Join - } - - /// The user was an invited or joined room member at this state (potentially - /// in the past) - #[inline] - async fn user_was_invited(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool { - let s = self.user_membership(shortstatehash, user_id).await; - s == MembershipState::Join || s == MembershipState::Invite + .deserialized() } /// Whether a server is allowed to see an event through federation, based on @@ -521,101 +645,6 @@ impl Service { } } - /// Returns the state hash for this pdu. - pub async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result { - const BUFSIZE: usize = size_of::(); - - self.services - .short - .get_shorteventid(event_id) - .and_then(|shorteventid| { - self.db - .shorteventid_shortstatehash - .aqry::(&shorteventid) - }) - .await - .deserialized() - } - - /// Returns the full room state. - #[tracing::instrument(skip(self), level = "debug")] - pub fn room_state_full<'a>( - &'a self, - room_id: &'a RoomId, - ) -> impl Stream> + Send + 'a { - self.services - .state - .get_room_shortstatehash(room_id) - .map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok)) - .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) - .try_flatten_stream() - } - - /// Returns the full room state pdus - #[tracing::instrument(skip(self), level = "debug")] - pub fn room_state_full_pdus<'a>( - &'a self, - room_id: &'a RoomId, - ) -> impl Stream> + Send + 'a { - self.services - .state - .get_room_shortstatehash(room_id) - .map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok)) - .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) - .try_flatten_stream() - } - - /// Returns a single EventId from `room_id` with key (`event_type`, - /// `state_key`). - #[tracing::instrument(skip(self), level = "debug")] - pub async fn room_state_get_id( - &self, - room_id: &RoomId, - event_type: &StateEventType, - state_key: &str, - ) -> Result - where - Id: for<'de> Deserialize<'de> + Sized + ToOwned, - ::Owned: Borrow, - { - self.services - .state - .get_room_shortstatehash(room_id) - .and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key)) - .await - } - - /// Returns a single PDU from `room_id` with key (`event_type`, - /// `state_key`). - #[tracing::instrument(skip(self), level = "debug")] - pub async fn room_state_get( - &self, - room_id: &RoomId, - event_type: &StateEventType, - state_key: &str, - ) -> Result { - self.services - .state - .get_room_shortstatehash(room_id) - .and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key)) - .await - } - - /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). - pub async fn room_state_get_content( - &self, - room_id: &RoomId, - event_type: &StateEventType, - state_key: &str, - ) -> Result - where - T: for<'de> Deserialize<'de>, - { - self.room_state_get(room_id, event_type, state_key) - .await - .and_then(|event| event.get_content()) - } - pub async fn get_name(&self, room_id: &RoomId) -> Result { self.room_state_get_content(room_id, &StateEventType::RoomName, "") .await