From 5e59ce37c4799c24723997326e1ccc26bb3345b0 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 31 Jan 2025 13:51:39 +0000 Subject: [PATCH] snapshot sync results at next_batch upper-bound Signed-off-by: Jason Volk --- src/admin/query/account_data.rs | 2 +- src/admin/query/users.rs | 2 +- src/api/client/sync/v3.rs | 10 +++++----- src/api/client/sync/v4.rs | 13 +++++++++---- src/api/client/sync/v5.rs | 8 ++++---- src/service/account_data/mod.rs | 12 +++++++----- src/service/users/mod.rs | 18 ++++++++++++++---- 7 files changed, 41 insertions(+), 24 deletions(-) diff --git a/src/admin/query/account_data.rs b/src/admin/query/account_data.rs index b75d8234..bb8ddeff 100644 --- a/src/admin/query/account_data.rs +++ b/src/admin/query/account_data.rs @@ -41,7 +41,7 @@ async fn changes_since( let results: Vec<_> = self .services .account_data - .changes_since(room_id.as_deref(), &user_id, since) + .changes_since(room_id.as_deref(), &user_id, since, None) .collect() .await; let query_time = timer.elapsed(); diff --git a/src/admin/query/users.rs b/src/admin/query/users.rs index 3715ac25..c517d9dd 100644 --- a/src/admin/query/users.rs +++ b/src/admin/query/users.rs @@ -413,7 +413,7 @@ async fn get_to_device_events( let result = self .services .users - .get_to_device_events(&user_id, &device_id) + .get_to_device_events(&user_id, &device_id, None, None) .collect::>() .await; let query_time = timer.elapsed(); diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 49246514..b548aa23 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -290,20 +290,20 @@ pub(crate) async fn build_sync_events( let account_data = services .account_data - .changes_since(None, sender_user, since) + .changes_since(None, sender_user, since, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect(); // Look for device list updates of this account let keys_changed = services .users - .keys_changed(sender_user, since, None) + .keys_changed(sender_user, since, Some(next_batch)) .map(ToOwned::to_owned) .collect::>(); let to_device_events = services .users - .get_to_device_events(sender_user, sender_device) + .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch)) .collect::>(); let device_one_time_keys_count = services @@ -700,14 +700,14 @@ async fn load_joined_room( let account_data_events = services .account_data - .changes_since(Some(room_id), sender_user, since) + .changes_since(Some(room_id), sender_user, since, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect(); // Look for device list updates in this room let device_updates = services .users - .room_keys_changed(room_id, since, None) + .room_keys_changed(room_id, since, Some(next_batch)) .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>(); diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index b7967498..66793ba1 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -153,7 +153,7 @@ pub(crate) async fn sync_events_v4_route( if body.extensions.account_data.enabled.unwrap_or(false) { account_data.global = services .account_data - .changes_since(None, sender_user, globalsince) + .changes_since(None, sender_user, globalsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect() .await; @@ -164,7 +164,7 @@ pub(crate) async fn sync_events_v4_route( room.clone(), services .account_data - .changes_since(Some(&room), sender_user, globalsince) + .changes_since(Some(&room), sender_user, globalsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -531,7 +531,7 @@ pub(crate) async fn sync_events_v4_route( room_id.to_owned(), services .account_data - .changes_since(Some(room_id), sender_user, *roomsince) + .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -779,7 +779,12 @@ pub(crate) async fn sync_events_v4_route( Some(sync_events::v4::ToDevice { events: services .users - .get_to_device_events(sender_user, &sender_device) + .get_to_device_events( + sender_user, + &sender_device, + Some(globalsince), + Some(next_batch), + ) .collect() .await, next_batch: next_batch.to_string(), diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 66647f0e..e7b5fe74 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -390,7 +390,7 @@ async fn process_rooms( room_id.to_owned(), services .account_data - .changes_since(Some(room_id), sender_user, *roomsince) + .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -644,7 +644,7 @@ async fn collect_account_data( account_data.global = services .account_data - .changes_since(None, sender_user, globalsince) + .changes_since(None, sender_user, globalsince, None) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect() .await; @@ -655,7 +655,7 @@ async fn collect_account_data( room.clone(), services .account_data - .changes_since(Some(room), sender_user, globalsince) + .changes_since(Some(room), sender_user, globalsince, None) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -876,7 +876,7 @@ async fn collect_to_device( next_batch: next_batch.to_string(), events: services .users - .get_to_device_events(sender_user, sender_device) + .get_to_device_events(sender_user, sender_device, None, Some(next_batch)) .collect() .await, }) diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index ddbc15a4..5a943f88 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -5,7 +5,7 @@ use conduwuit::{ utils::{result::LogErr, stream::TryIgnore, ReadyExt}, Err, Result, }; -use database::{Deserialized, Handle, Interfix, Json, Map}; +use database::{Deserialized, Handle, Ignore, Json, Map}; use futures::{Stream, StreamExt, TryFutureExt}; use ruma::{ events::{ @@ -131,18 +131,20 @@ pub fn changes_since<'a>( room_id: Option<&'a RoomId>, user_id: &'a UserId, since: u64, + to: Option, ) -> impl Stream + Send + 'a { - let prefix = (room_id, user_id, Interfix); - let prefix = database::serialize_key(prefix).expect("failed to serialize prefix"); + type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore); // Skip the data that's exactly at since, because we sent that last time let first_possible = (room_id, user_id, since.saturating_add(1)); self.db .roomuserdataid_accountdata - .stream_from_raw(&first_possible) + .stream_from(&first_possible) .ignore_err() - .ready_take_while(move |(k, _)| k.starts_with(&prefix)) + .ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| { + room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to) + }) .map(move |(_, v)| { match room_id { | Some(_) => serde_json::from_slice::>(v) diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index e5caed47..68b87541 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeMap, mem, sync::Arc}; use conduwuit::{ - debug_warn, err, trace, + at, debug_warn, err, trace, utils::{self, stream::TryIgnore, string::Unquoted, ReadyExt}, Err, Error, Result, Server, }; @@ -790,13 +790,23 @@ impl Service { &'a self, user_id: &'a UserId, device_id: &'a DeviceId, + since: Option, + to: Option, ) -> impl Stream> + Send + 'a { - let prefix = (user_id, device_id, Interfix); + type Key<'a> = (&'a UserId, &'a DeviceId, u64); + + let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1))); + self.db .todeviceid_events - .stream_prefix(&prefix) + .stream_from(&from) .ignore_err() - .map(|(_, val): (Ignore, Raw)| val) + .ready_take_while(move |((user_id_, device_id_, count), _): &(Key<'_>, _)| { + user_id == *user_id_ + && device_id == *device_id_ + && to.is_none_or(|to| *count <= to) + }) + .map(at!(1)) } pub async fn remove_to_device_events(