From c903a718075ddef637529294ccd45554d11d767d Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 25 Nov 2024 01:55:57 +0000 Subject: [PATCH] refactor and optimize receipt service data Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 6 ++-- src/api/client/sync/v4.rs | 8 ++--- src/service/rooms/read_receipt/data.rs | 43 ++++++++------------------ src/service/rooms/read_receipt/mod.rs | 8 ++--- src/service/sending/sender.rs | 6 ++-- 5 files changed, 26 insertions(+), 45 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 80aa8184..b69cbc87 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -559,8 +559,6 @@ async fn load_joined_room( .lazy_loading .lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount); - // Database queries: - let current_shortstatehash = services .rooms .state @@ -983,9 +981,9 @@ async fn load_joined_room( .filter_map(|(read_user, _, edu)| async move { services .users - .user_is_ignored(&read_user, sender_user) + .user_is_ignored(read_user, sender_user) .await - .or_some((read_user, edu)) + .or_some((read_user.to_owned(), edu)) }) .collect() .await; diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index 78b0b277..0913336d 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -495,11 +495,11 @@ pub(crate) async fn sync_events_v4_route( .read_receipt .readreceipts_since(room_id, *roomsince) .filter_map(|(read_user, ts, v)| async move { - (!services + services .users - .user_is_ignored(&read_user, sender_user) - .await) - .then_some((read_user, ts, v)) + .user_is_ignored(read_user, sender_user) + .await + .or_some((read_user.to_owned(), ts, v)) }) .collect() .await; diff --git a/src/service/rooms/read_receipt/data.rs b/src/service/rooms/read_receipt/data.rs index 1194598d..34639e27 100644 --- a/src/service/rooms/read_receipt/data.rs +++ b/src/service/rooms/read_receipt/data.rs @@ -1,16 +1,15 @@ -use std::{mem::size_of, sync::Arc}; +use std::sync::Arc; use conduit::{ - utils, utils::{stream::TryIgnore, ReadyExt}, - Error, Result, + Result, }; use database::{Deserialized, Json, Map}; use futures::{Stream, StreamExt}; use ruma::{ events::{receipt::ReceiptEvent, AnySyncEphemeralRoomEvent}, serde::Raw, - CanonicalJsonObject, OwnedUserId, RoomId, UserId, + CanonicalJsonObject, RoomId, UserId, }; use crate::{globals, Dep}; @@ -26,7 +25,7 @@ struct Services { globals: Dep, } -pub(super) type ReceiptItem = (OwnedUserId, u64, Raw); +pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw); impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { @@ -59,39 +58,23 @@ impl Data { pub(super) fn readreceipts_since<'a>( &'a self, room_id: &'a RoomId, since: u64, - ) -> impl Stream + Send + 'a { + ) -> impl Stream> + Send + 'a { + type Key<'a> = (&'a RoomId, u64, &'a UserId); + type KeyVal<'a> = (Key<'a>, CanonicalJsonObject); + let after_since = since.saturating_add(1); // +1 so we don't send the event at since let first_possible_edu = (room_id, after_since); - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - let prefix2 = prefix.clone(); - self.readreceiptid_readreceipt - .stream_from_raw(&first_possible_edu) + .stream_from(&first_possible_edu) .ignore_err() - .ready_take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(move |(k, v)| { - let count_offset = prefix.len().saturating_add(size_of::()); - let user_id_offset = count_offset.saturating_add(1); - - let count = utils::u64_from_bytes(&k[prefix.len()..count_offset]) - .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; - - let user_id_str = utils::string_from_bytes(&k[user_id_offset..]) - .map_err(|_| Error::bad_database("Invalid readreceiptid userid bytes in db."))?; - - let user_id = UserId::parse(user_id_str) - .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; - - let mut json = serde_json::from_slice::(v) - .map_err(|_| Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json."))?; - + .ready_take_while(move |((r, ..), _): &KeyVal<'_>| *r == room_id) + .map(move |((_, count, user_id), mut json): KeyVal<'_>| { json.remove("room_id"); - let event = Raw::from_json(serde_json::value::to_raw_value(&json)?); + let event = serde_json::value::to_raw_value(&json)?; - Ok((user_id, count, event)) + Ok((user_id, count, Raw::from_json(event))) }) .ignore_err() } diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index ec34361e..e089d369 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -7,10 +7,10 @@ use futures::Stream; use ruma::{ events::{ receipt::{ReceiptEvent, ReceiptEventContent}, - SyncEphemeralRoomEvent, + AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, }, serde::Raw, - RoomId, UserId, + OwnedUserId, RoomId, UserId, }; use self::data::{Data, ReceiptItem}; @@ -55,7 +55,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub fn readreceipts_since<'a>( &'a self, room_id: &'a RoomId, since: u64, - ) -> impl Stream + Send + 'a { + ) -> impl Stream> + Send + 'a { self.db.readreceipts_since(room_id, since) } @@ -83,7 +83,7 @@ impl Service { #[must_use] pub fn pack_receipts(receipts: I) -> Raw> where - I: Iterator, + I: Iterator)>, { let mut json = BTreeMap::new(); for (_, _, value) in receipts { diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 0a0aae39..3c544725 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -376,7 +376,7 @@ impl Service { let mut read = BTreeMap::::new(); while let Some((user_id, count, read_receipt)) = receipts.next().await { *max_edu_count = cmp::max(count, *max_edu_count); - if !self.services.globals.user_is_local(&user_id) { + if !self.services.globals.user_is_local(user_id) { continue; } @@ -400,7 +400,7 @@ impl Service { let receipt = receipt .remove(&ReceiptType::Read) .expect("our read receipts always set this") - .remove(&user_id) + .remove(user_id) .expect("our read receipts always have the user here"); let receipt_data = ReceiptData { @@ -408,7 +408,7 @@ impl Service { event_ids: vec![event_id.clone()], }; - if read.insert(user_id, receipt_data).is_none() { + if read.insert(user_id.to_owned(), receipt_data).is_none() { *num = num.saturating_add(1); if *num >= SELECT_RECEIPT_LIMIT { break;