snapshot sync results at next_batch upper-bound

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-31 13:51:39 +00:00
parent a774afe837
commit 5e59ce37c4
7 changed files with 41 additions and 24 deletions

View file

@ -5,7 +5,7 @@ use conduwuit::{
utils::{result::LogErr, stream::TryIgnore, ReadyExt},
Err, Result,
};
use database::{Deserialized, Handle, Interfix, Json, Map};
use database::{Deserialized, Handle, Ignore, Json, Map};
use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{
events::{
@ -131,18 +131,20 @@ pub fn changes_since<'a>(
room_id: Option<&'a RoomId>,
user_id: &'a UserId,
since: u64,
to: Option<u64>,
) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
let prefix = (room_id, user_id, Interfix);
let prefix = database::serialize_key(prefix).expect("failed to serialize prefix");
type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
// Skip the data that's exactly at since, because we sent that last time
let first_possible = (room_id, user_id, since.saturating_add(1));
self.db
.roomuserdataid_accountdata
.stream_from_raw(&first_possible)
.stream_from(&first_possible)
.ignore_err()
.ready_take_while(move |(k, _)| k.starts_with(&prefix))
.ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
})
.map(move |(_, v)| {
match room_id {
| Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)

View file

@ -1,7 +1,7 @@
use std::{collections::BTreeMap, mem, sync::Arc};
use conduwuit::{
debug_warn, err, trace,
at, debug_warn, err, trace,
utils::{self, stream::TryIgnore, string::Unquoted, ReadyExt},
Err, Error, Result, Server,
};
@ -790,13 +790,23 @@ impl Service {
&'a self,
user_id: &'a UserId,
device_id: &'a DeviceId,
since: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = Raw<AnyToDeviceEvent>> + Send + 'a {
let prefix = (user_id, device_id, Interfix);
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
self.db
.todeviceid_events
.stream_prefix(&prefix)
.stream_from(&from)
.ignore_err()
.map(|(_, val): (Ignore, Raw<AnyToDeviceEvent>)| val)
.ready_take_while(move |((user_id_, device_id_, count), _): &(Key<'_>, _)| {
user_id == *user_id_
&& device_id == *device_id_
&& to.is_none_or(|to| *count <= to)
})
.map(at!(1))
}
pub async fn remove_to_device_events<Until>(