From a774afe8370bd6eed3deed6e663229e8457d73c7 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 2 Feb 2025 08:59:14 +0000 Subject: [PATCH] modernize remove_to_device_events Signed-off-by: Jason Volk --- src/service/users/mod.rs | 43 ++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index b2d3a94a..e5caed47 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -1,12 +1,12 @@ -use std::{collections::BTreeMap, mem, mem::size_of, sync::Arc}; +use std::{collections::BTreeMap, mem, sync::Arc}; use conduwuit::{ debug_warn, err, trace, utils::{self, stream::TryIgnore, string::Unquoted, ReadyExt}, Err, Error, Result, Server, }; -use database::{Database, Deserialized, Ignore, Interfix, Json, Map}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; +use database::{Deserialized, Ignore, Interfix, Json, Map}; +use futures::{Stream, StreamExt, TryFutureExt}; use ruma::{ api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, @@ -28,7 +28,6 @@ pub struct Service { struct Services { server: Arc, - db: Arc, account_data: Dep, admin: Dep, globals: Dep, @@ -64,7 +63,6 @@ impl crate::Service for Service { Ok(Arc::new(Self { services: Services { server: args.server.clone(), - db: args.db.clone(), account_data: args.depend::("account_data"), admin: args.depend::("admin"), globals: args.depend::("globals"), @@ -801,35 +799,28 @@ impl Service { .map(|(_, val): (Ignore, Raw)| val) } - pub async fn remove_to_device_events( + pub async fn remove_to_device_events( &self, user_id: &UserId, device_id: &DeviceId, - until: u64, - ) { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xFF); - prefix.extend_from_slice(device_id.as_bytes()); - prefix.push(0xFF); + until: Until, + ) where + Until: Into> + Send, + { + type Key<'a> = (&'a UserId, &'a DeviceId, u64); - let mut last = prefix.clone(); - last.extend_from_slice(&until.to_be_bytes()); - - let _cork = self.services.db.cork_and_flush(); + let until = until.into().unwrap_or(u64::MAX); + let from = (user_id, device_id, until); self.db .todeviceid_events - .rev_raw_keys_from(&last) // this includes last + .rev_keys_from(&from) .ignore_err() - .ready_take_while(move |key| key.starts_with(&prefix)) - .map(|key| { - let len = key.len(); - let start = len.saturating_sub(size_of::()); - let count = utils::u64_from_u8(&key[start..len]); - (key, count) + .ready_take_while(move |(user_id_, device_id_, _): &Key<'_>| { + user_id == *user_id_ && device_id == *device_id_ + }) + .ready_for_each(|key: Key<'_>| { + self.db.todeviceid_events.del(key); }) - .ready_take_while(move |(_, count)| *count <= until) - .ready_for_each(|(key, _)| self.db.todeviceid_events.remove(&key)) - .boxed() .await; }