From 2e4d9cb37cf7d47a9506ee3697775ddfadcb1d56 Mon Sep 17 00:00:00 2001 From: Kirill Hmelnitski Date: Thu, 31 Oct 2024 23:39:20 +0300 Subject: [PATCH] fix thread pagination refactor logic increase fetch limit for first relates apply other format Co-authored-by: Jason Volk Signed-off-by: Jason Volk --- src/api/client/relations.rs | 12 +- src/service/rooms/pdu_metadata/data.rs | 39 ++--- src/service/rooms/pdu_metadata/mod.rs | 197 +++++++++++++------------ 3 files changed, 125 insertions(+), 123 deletions(-) diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index d4384730..0456924c 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -20,8 +20,8 @@ pub(crate) async fn get_relating_events_with_rel_type_and_event_type_route( &body.event_id, body.event_type.clone().into(), body.rel_type.clone().into(), - body.from.as_ref(), - body.to.as_ref(), + body.from.as_deref(), + body.to.as_deref(), body.limit, body.recurse, body.dir, @@ -51,8 +51,8 @@ pub(crate) async fn get_relating_events_with_rel_type_route( &body.event_id, None, body.rel_type.clone().into(), - body.from.as_ref(), - body.to.as_ref(), + body.from.as_deref(), + body.to.as_deref(), body.limit, body.recurse, body.dir, @@ -82,8 +82,8 @@ pub(crate) async fn get_relating_events_route( &body.event_id, None, None, - body.from.as_ref(), - body.to.as_ref(), + body.from.as_deref(), + body.to.as_deref(), body.limit, body.recurse, body.dir, diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 4d570e6d..51a43714 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -8,7 +8,7 @@ use conduit::{ }; use database::Map; use futures::{Stream, StreamExt}; -use ruma::{EventId, RoomId, UserId}; +use ruma::{api::Direction, EventId, RoomId, UserId}; use crate::{rooms, Dep}; @@ -45,9 +45,9 @@ impl Data { self.tofrom_relation.aput_raw::(key, []); } - pub(super) fn relations_until<'a>( - &'a self, 
user_id: &'a UserId, shortroomid: u64, target: u64, until: PduCount, - ) -> impl Stream + Send + 'a + '_ { + pub(super) fn get_relations<'a>( + &'a self, user_id: &'a UserId, shortroomid: u64, target: u64, until: PduCount, dir: Direction, + ) -> impl Stream + Send + '_ { let prefix = target.to_be_bytes().to_vec(); let mut current = prefix.clone(); let count_raw = match until { @@ -59,22 +59,23 @@ impl Data { }; current.extend_from_slice(&count_raw.to_be_bytes()); - self.tofrom_relation - .rev_raw_keys_from(&current) - .ignore_err() - .ready_take_while(move |key| key.starts_with(&prefix)) - .map(|to_from| utils::u64_from_u8(&to_from[(size_of::())..])) - .filter_map(move |from| async move { - let mut pduid = shortroomid.to_be_bytes().to_vec(); - pduid.extend_from_slice(&from.to_be_bytes()); - let mut pdu = self.services.timeline.get_pdu_from_id(&pduid).await.ok()?; + match dir { + Direction::Forward => self.tofrom_relation.raw_keys_from(&current).boxed(), + Direction::Backward => self.tofrom_relation.rev_raw_keys_from(&current).boxed(), + } + .ignore_err() + .ready_take_while(move |key| key.starts_with(&prefix)) + .map(|to_from| utils::u64_from_u8(&to_from[(size_of::())..])) + .filter_map(move |from| async move { + let mut pduid = shortroomid.to_be_bytes().to_vec(); + pduid.extend_from_slice(&from.to_be_bytes()); + let mut pdu = self.services.timeline.get_pdu_from_id(&pduid).await.ok()?; + if pdu.sender != user_id { + pdu.remove_transaction_id().log_err().ok(); + } - if pdu.sender != user_id { - pdu.remove_transaction_id().log_err().ok(); - } - - Some((PduCount::Normal(from), pdu)) - }) + Some((PduCount::Normal(from), pdu)) + }) } pub(super) fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc]) { diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index fb85d031..b1cf2049 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,12 +1,16 @@ mod data; use std::sync::Arc; -use
conduit::{utils::stream::IterStream, PduCount, Result}; -use futures::StreamExt; +use conduit::{ + at, + utils::{result::FlatOk, stream::ReadyExt, IterStream}, + PduCount, Result, +}; +use futures::{FutureExt, StreamExt}; use ruma::{ api::{client::relations::get_relating_events, Direction}, events::{relation::RelationType, TimelineEventType}, - uint, EventId, RoomId, UInt, UserId, + EventId, RoomId, UInt, UserId, }; use serde::Deserialize; @@ -63,24 +67,24 @@ impl Service { #[allow(clippy::too_many_arguments)] pub async fn paginate_relations_with_filter( &self, sender_user: &UserId, room_id: &RoomId, target: &EventId, filter_event_type: Option, - filter_rel_type: Option, from: Option<&String>, to: Option<&String>, limit: Option, + filter_rel_type: Option, from: Option<&str>, to: Option<&str>, limit: Option, recurse: bool, dir: Direction, ) -> Result { - let from = match from { - Some(from) => PduCount::try_from_string(from)?, - None => match dir { + let from = from + .map(PduCount::try_from_string) + .transpose()? 
+ .unwrap_or_else(|| match dir { Direction::Forward => PduCount::min(), Direction::Backward => PduCount::max(), - }, - }; + }); - let to = to.and_then(|t| PduCount::try_from_string(t).ok()); + let to = to.map(PduCount::try_from_string).flat_ok(); - // Use limit or else 10, with maximum 100 - let limit = limit - .unwrap_or_else(|| uint!(10)) - .try_into() - .unwrap_or(10) + // Use limit or else 30, with maximum 100 + let limit: usize = limit + .map(TryInto::try_into) + .flat_ok() + .unwrap_or(30) .min(100); // Spec (v1.10) recommends depth of at least 3 @@ -90,53 +94,96 @@ impl Service { 1 }; - let relations_until: Vec = self - .relations_until(sender_user, room_id, target, from, depth) - .await?; - - // TODO: should be relations_after - let events: Vec<_> = relations_until + let events: Vec = self + .get_relations(sender_user, room_id, target, from, limit, depth, dir) + .await .into_iter() - .filter(move |(_, pdu): &PdusIterItem| { - if !filter_event_type.as_ref().map_or(true, |t| pdu.kind == *t) { - return false; - } - - let Ok(content) = pdu.get_content::() else { - return false; - }; - - filter_rel_type + .filter(|(_, pdu)| { + filter_event_type .as_ref() - .map_or(true, |r| *r == content.relates_to.rel_type) + .is_none_or(|kind| *kind == pdu.kind) + }) + .filter(|(_, pdu)| { + filter_rel_type.as_ref().is_none_or(|rel_type| { + pdu.get_content() + .map(|c: ExtractRelatesToEventId| c.relates_to.rel_type) + .is_ok_and(|r| r == *rel_type) + }) }) - .take(limit) - .take_while(|(k, _)| Some(*k) != to) .stream() .filter_map(|item| self.visibility_filter(sender_user, item)) + .ready_take_while(|(count, _)| Some(*count) != to) + .take(limit) + .collect() + .boxed() + .await; + + let next_batch = match dir { + Direction::Backward => events.first(), + Direction::Forward => events.last(), + } + .map(at!(0)) + .map(|t| t.stringify()); + + Ok(get_relating_events::v1::Response { + next_batch, + prev_batch: Some(from.stringify()), + recursion_depth: 
recurse.then_some(depth.into()), + chunk: events + .into_iter() + .map(at!(1)) + .map(|pdu| pdu.to_message_like_event()) + .collect(), + }) + } + + #[allow(clippy::too_many_arguments)] + pub async fn get_relations( + &self, user_id: &UserId, room_id: &RoomId, target: &EventId, until: PduCount, limit: usize, max_depth: u8, + dir: Direction, + ) -> Vec { + let room_id = self.services.short.get_or_create_shortroomid(room_id).await; + + let target = match self.services.timeline.get_pdu_count(target).await { + Ok(PduCount::Normal(c)) => c, + // TODO: Support backfilled relations + _ => 0, // This will result in an empty iterator + }; + + let mut pdus: Vec<_> = self + .db + .get_relations(user_id, room_id, target, until, dir) .collect() .await; - let next_token = events.last().map(|(count, _)| count).copied(); + let mut stack: Vec<_> = pdus.iter().map(|pdu| (pdu.clone(), 1)).collect(); - let events_chunk: Vec<_> = match dir { - Direction::Forward => events - .into_iter() - .map(|(_, pdu)| pdu.to_message_like_event()) - .collect(), - Direction::Backward => events - .into_iter() - .rev() // relations are always most recent first - .map(|(_, pdu)| pdu.to_message_like_event()) - .collect(), - }; + 'limit: while let Some(stack_pdu) = stack.pop() { + let target = match stack_pdu.0 .0 { + PduCount::Normal(c) => c, + // TODO: Support backfilled relations + PduCount::Backfilled(_) => 0, // This will result in an empty iterator + }; - Ok(get_relating_events::v1::Response { - chunk: events_chunk, - next_batch: next_token.map(|t| t.stringify()), - prev_batch: Some(from.stringify()), - recursion_depth: recurse.then_some(depth.into()), - }) + let relations: Vec<_> = self + .db + .get_relations(user_id, room_id, target, until, dir) + .collect() + .await; + + for relation in relations { + if stack_pdu.1 < max_depth { + stack.push((relation.clone(), stack_pdu.1.saturating_add(1))); + } + + pdus.push(relation); + if pdus.len() >= limit { + break 'limit; + } + } + } + + pdus } async fn 
visibility_filter(&self, sender_user: &UserId, item: PdusIterItem) -> Option { @@ -149,52 +196,6 @@ impl Service { .then_some(item) } - pub async fn relations_until( - &self, user_id: &UserId, room_id: &RoomId, target: &EventId, until: PduCount, max_depth: u8, - ) -> Result> { - let room_id = self.services.short.get_or_create_shortroomid(room_id).await; - - let target = match self.services.timeline.get_pdu_count(target).await { - Ok(PduCount::Normal(c)) => c, - // TODO: Support backfilled relations - _ => 0, // This will result in an empty iterator - }; - - let mut pdus: Vec = self - .db - .relations_until(user_id, room_id, target, until) - .collect() - .await; - - let mut stack: Vec<_> = pdus.clone().into_iter().map(|pdu| (pdu, 1)).collect(); - - while let Some(stack_pdu) = stack.pop() { - let target = match stack_pdu.0 .0 { - PduCount::Normal(c) => c, - // TODO: Support backfilled relations - PduCount::Backfilled(_) => 0, // This will result in an empty iterator - }; - - let relations: Vec = self - .db - .relations_until(user_id, room_id, target, until) - .collect() - .await; - - for relation in relations { - if stack_pdu.1 < max_depth { - stack.push((relation.clone(), stack_pdu.1.saturating_add(1))); - } - - pdus.push(relation); - } - } - - pdus.sort_by(|a, b| a.0.cmp(&b.0)); - - Ok(pdus) - } - #[inline] #[tracing::instrument(skip_all, level = "debug")] pub fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc]) {