refactor account_data.changes_since to stream

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-19 03:08:09 +00:00
parent fd4c447a2d
commit 5da42fb859
4 changed files with 39 additions and 61 deletions

View file

@ -1,5 +1,6 @@
use clap::Subcommand; use clap::Subcommand;
use conduit::Result; use conduit::Result;
use futures::StreamExt;
use ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId}; use ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId};
use crate::Command; use crate::Command;
@ -39,10 +40,11 @@ pub(super) async fn process(subcommand: AccountDataCommand, context: &Command<'_
room_id, room_id,
} => { } => {
let timer = tokio::time::Instant::now(); let timer = tokio::time::Instant::now();
let results = services let results: Vec<_> = services
.account_data .account_data
.changes_since(room_id.as_deref(), &user_id, since) .changes_since(room_id.as_deref(), &user_id, since)
.await?; .collect()
.await;
let query_time = timer.elapsed(); let query_time = timer.elapsed();
Ok(RoomMessageEventContent::notice_markdown(format!( Ok(RoomMessageEventContent::notice_markdown(format!(

View file

@ -275,10 +275,9 @@ pub(crate) async fn sync_events_route(
events: services events: services
.account_data .account_data
.changes_since(None, &sender_user, since) .changes_since(None, &sender_user, since)
.await? .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.into_iter() .collect()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .await,
.collect(),
}, },
device_lists: DeviceLists { device_lists: DeviceLists {
changed: device_list_updates.into_iter().collect(), changed: device_list_updates.into_iter().collect(),
@ -1023,10 +1022,9 @@ async fn load_joined_room(
events: services events: services
.account_data .account_data
.changes_since(Some(room_id), sender_user, since) .changes_since(Some(room_id), sender_user, since)
.await? .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.into_iter() .collect()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .await,
.collect(),
}, },
summary: RoomSummary { summary: RoomSummary {
heroes, heroes,

View file

@ -136,10 +136,9 @@ pub(crate) async fn sync_events_v4_route(
account_data.global = services account_data.global = services
.account_data .account_data
.changes_since(None, sender_user, globalsince) .changes_since(None, sender_user, globalsince)
.await? .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.into_iter() .collect()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .await;
.collect();
if let Some(rooms) = body.extensions.account_data.rooms { if let Some(rooms) = body.extensions.account_data.rooms {
for room in rooms { for room in rooms {
@ -148,10 +147,9 @@ pub(crate) async fn sync_events_v4_route(
services services
.account_data .account_data
.changes_since(Some(&room), sender_user, globalsince) .changes_since(Some(&room), sender_user, globalsince)
.await? .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.into_iter() .collect()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .await,
.collect(),
); );
} }
} }
@ -487,10 +485,9 @@ pub(crate) async fn sync_events_v4_route(
services services
.account_data .account_data
.changes_since(Some(room_id), sender_user, *roomsince) .changes_since(Some(room_id), sender_user, *roomsince)
.await? .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.into_iter() .collect()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .await,
.collect(),
); );
let vector: Vec<_> = services let vector: Vec<_> = services

View file

@ -1,12 +1,12 @@
use std::{collections::HashMap, sync::Arc}; use std::sync::Arc;
use conduit::{ use conduit::{
implement, err, implement,
utils::{stream::TryIgnore, ReadyExt}, utils::{result::LogErr, stream::TryIgnore, ReadyExt},
Err, Error, Result, Err, Result,
}; };
use database::{Deserialized, Handle, Json, Map}; use database::{Deserialized, Handle, Interfix, Json, Map};
use futures::{StreamExt, TryFutureExt}; use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{ use ruma::{
events::{ events::{
AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent, GlobalAccountDataEventType, AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent, GlobalAccountDataEventType,
@ -112,46 +112,27 @@ pub async fn get_raw(&self, room_id: Option<&RoomId>, user_id: &UserId, kind: &s
/// Returns all changes to the account data that happened after `since`. /// Returns all changes to the account data that happened after `since`.
#[implement(Service)] #[implement(Service)]
pub async fn changes_since( pub fn changes_since<'a>(
&self, room_id: Option<&RoomId>, user_id: &UserId, since: u64, &'a self, room_id: Option<&'a RoomId>, user_id: &'a UserId, since: u64,
) -> Result<Vec<AnyRawAccountDataEvent>> { ) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
let mut userdata = HashMap::new(); let prefix = (room_id, user_id, Interfix);
let prefix = database::serialize_to_vec(prefix).expect("failed to serialize prefix");
let mut prefix = room_id
.map(ToString::to_string)
.unwrap_or_default()
.as_bytes()
.to_vec();
prefix.push(0xFF);
prefix.extend_from_slice(user_id.as_bytes());
prefix.push(0xFF);
// Skip the data that's exactly at since, because we sent that last time // Skip the data that's exactly at since, because we sent that last time
let mut first_possible = prefix.clone(); let first_possible = (room_id, user_id, since.saturating_add(1));
first_possible.extend_from_slice(&(since.saturating_add(1)).to_be_bytes());
self.db self.db
.roomuserdataid_accountdata .roomuserdataid_accountdata
.raw_stream_from(&first_possible) .stream_from_raw(&first_possible)
.ignore_err() .ignore_err()
.ready_take_while(move |(k, _)| k.starts_with(&prefix)) .ready_take_while(move |(k, _)| k.starts_with(&prefix))
.map(|(k, v)| { .map(move |(_, v)| {
let v = match room_id { match room_id {
None => serde_json::from_slice::<Raw<AnyGlobalAccountDataEvent>>(v) Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v).map(AnyRawAccountDataEvent::Room),
.map(AnyRawAccountDataEvent::Global) None => serde_json::from_slice::<Raw<AnyGlobalAccountDataEvent>>(v).map(AnyRawAccountDataEvent::Global),
.map_err(|_| Error::bad_database("Database contains invalid account data."))?, }
Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v) .map_err(|e| err!(Database("Database contains invalid account data: {e}")))
.map(AnyRawAccountDataEvent::Room) .log_err()
.map_err(|_| Error::bad_database("Database contains invalid account data."))?,
};
Ok((k.to_owned(), v))
}) })
.ignore_err() .ignore_err()
.ready_for_each(|(kind, data)| {
userdata.insert(kind, data);
})
.await;
Ok(userdata.into_values().collect())
} }