diff --git a/src/admin/query/account_data.rs b/src/admin/query/account_data.rs index ea45eb16..91217334 100644 --- a/src/admin/query/account_data.rs +++ b/src/admin/query/account_data.rs @@ -1,5 +1,6 @@ use clap::Subcommand; use conduit::Result; +use futures::StreamExt; use ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId}; use crate::Command; @@ -39,10 +40,11 @@ pub(super) async fn process(subcommand: AccountDataCommand, context: &Command<'_ room_id, } => { let timer = tokio::time::Instant::now(); - let results = services + let results: Vec<_> = services .account_data .changes_since(room_id.as_deref(), &user_id, since) - .await?; + .collect() + .await; let query_time = timer.elapsed(); Ok(RoomMessageEventContent::notice_markdown(format!( diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 7a78ea74..614970f0 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -275,10 +275,9 @@ pub(crate) async fn sync_events_route( events: services .account_data .changes_since(None, &sender_user, since) - .await? - .into_iter() - .filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) - .collect(), + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) + .collect() + .await, }, device_lists: DeviceLists { changed: device_list_updates.into_iter().collect(), @@ -1023,10 +1022,9 @@ async fn load_joined_room( events: services .account_data .changes_since(Some(room_id), sender_user, since) - .await? - .into_iter() - .filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect(), + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await, }, summary: RoomSummary { heroes, diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index 57edc953..78b0b277 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -136,10 +136,9 @@ pub(crate) async fn sync_events_v4_route( account_data.global = services .account_data .changes_since(None, sender_user, globalsince) - .await? - .into_iter() - .filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) - .collect(); + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) + .collect() + .await; if let Some(rooms) = body.extensions.account_data.rooms { for room in rooms { @@ -148,10 +147,9 @@ pub(crate) async fn sync_events_v4_route( services .account_data .changes_since(Some(&room), sender_user, globalsince) - .await? - .into_iter() - .filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect(), + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await, ); } } @@ -487,10 +485,9 @@ pub(crate) async fn sync_events_v4_route( services .account_data .changes_since(Some(room_id), sender_user, *roomsince) - .await? - .into_iter() - .filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect(), + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await, ); let vector: Vec<_> = services diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index ac3f5f83..b752f9b8 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -1,12 +1,12 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use conduit::{ - implement, - utils::{stream::TryIgnore, ReadyExt}, - Err, Error, Result, + err, implement, + utils::{result::LogErr, stream::TryIgnore, ReadyExt}, + Err, Result, }; -use database::{Deserialized, Handle, Json, Map}; -use futures::{StreamExt, TryFutureExt}; +use database::{Deserialized, Handle, Interfix, Json, Map}; +use futures::{Stream, StreamExt, TryFutureExt}; use ruma::{ events::{ AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent, GlobalAccountDataEventType, @@ -112,46 +112,27 @@ pub async fn get_raw(&self, room_id: Option<&RoomId>, user_id: &UserId, kind: &s /// Returns all changes to the account data that happened after `since`. #[implement(Service)] -pub async fn changes_since( - &self, room_id: Option<&RoomId>, user_id: &UserId, since: u64, -) -> Result> { - let mut userdata = HashMap::new(); - - let mut prefix = room_id - .map(ToString::to_string) - .unwrap_or_default() - .as_bytes() - .to_vec(); - prefix.push(0xFF); - prefix.extend_from_slice(user_id.as_bytes()); - prefix.push(0xFF); +pub fn changes_since<'a>( + &'a self, room_id: Option<&'a RoomId>, user_id: &'a UserId, since: u64, +) -> impl Stream + Send + 'a { + let prefix = (room_id, user_id, Interfix); + let prefix = database::serialize_to_vec(prefix).expect("failed to serialize prefix"); // Skip the data that's exactly at since, because we sent that last time - let mut first_possible = prefix.clone(); - first_possible.extend_from_slice(&(since.saturating_add(1)).to_be_bytes()); + let first_possible = (room_id, user_id, since.saturating_add(1)); self.db .roomuserdataid_accountdata - .raw_stream_from(&first_possible) + .stream_from_raw(&first_possible) .ignore_err() .ready_take_while(move |(k, _)| k.starts_with(&prefix)) - .map(|(k, v)| { - let v = match room_id { - None => serde_json::from_slice::>(v) - .map(AnyRawAccountDataEvent::Global) - .map_err(|_| Error::bad_database("Database contains invalid account data."))?, - Some(_) => serde_json::from_slice::>(v) - .map(AnyRawAccountDataEvent::Room) - .map_err(|_| Error::bad_database("Database contains invalid account data."))?, - }; - - Ok((k.to_owned(), v)) + .map(move |(_, v)| { + match room_id { + Some(_) => serde_json::from_slice::>(v).map(AnyRawAccountDataEvent::Room), + None => serde_json::from_slice::>(v).map(AnyRawAccountDataEvent::Global), + } + .map_err(|e| err!(Database("Database contains invalid account data: {e}"))) + .log_err() }) .ignore_err() - .ready_for_each(|(kind, data)| { - userdata.insert(kind, data); - }) - .await; - - Ok(userdata.into_values().collect()) }