From 925061b92de1a60c6d7af9d28bdd158b7cc9901b Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 4 Jan 2025 04:12:50 +0000 Subject: [PATCH] flatten timeline pdus iterations; increase concurrency Signed-off-by: Jason Volk --- src/admin/query/room_timeline.rs | 61 ++++++++ src/api/client/context.rs | 67 ++++----- src/api/client/membership.rs | 2 + src/api/client/message.rs | 6 +- src/api/client/room/initial_sync.rs | 15 +- src/api/client/sync/mod.rs | 20 ++- src/api/server/backfill.rs | 30 ++-- src/api/server/send.rs | 1 + .../rooms/event_handler/handle_prev_pdu.rs | 2 +- src/service/rooms/timeline/data.rs | 136 +++++++++--------- src/service/rooms/timeline/mod.rs | 135 ++++++----------- 11 files changed, 238 insertions(+), 237 deletions(-) create mode 100644 src/admin/query/room_timeline.rs diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs new file mode 100644 index 00000000..3fe653e3 --- /dev/null +++ b/src/admin/query/room_timeline.rs @@ -0,0 +1,61 @@ +use clap::Subcommand; +use conduwuit::{utils::stream::TryTools, PduCount, Result}; +use futures::TryStreamExt; +use ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}; + +use crate::{admin_command, admin_command_dispatch}; + +#[admin_command_dispatch] +#[derive(Debug, Subcommand)] +/// Query tables from database +pub(crate) enum RoomTimelineCommand { + Pdus { + room_id: OwnedRoomOrAliasId, + + from: Option, + + #[arg(short, long)] + limit: Option, + }, + + Last { + room_id: OwnedRoomOrAliasId, + }, +} + +#[admin_command] +pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result { + let room_id = self.services.rooms.alias.resolve(&room_id).await?; + + let result = self + .services + .rooms + .timeline + .last_timeline_count(None, &room_id) + .await?; + + Ok(RoomMessageEventContent::notice_markdown(format!("{result:#?}"))) +} + +#[admin_command] +pub(super) async fn pdus( + &self, + room_id: OwnedRoomOrAliasId, + from: Option, + limit: Option, +) -> Result { + let room_id = self.services.rooms.alias.resolve(&room_id).await?; + + let from: Option = from.as_deref().map(str::parse).transpose()?; + + let result: Vec<_> = self + .services + .rooms + .timeline + .pdus_rev(None, &room_id, from) + .try_take(limit.unwrap_or(3)) + .try_collect() + .await?; + + Ok(RoomMessageEventContent::notice_markdown(format!("{result:#?}"))) +} diff --git a/src/api/client/context.rs b/src/api/client/context.rs index 30ba170d..b957561c 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -1,14 +1,12 @@ -use std::iter::once; - use axum::extract::State; use conduwuit::{ at, err, ref_at, utils::{ future::TryExtExt, - stream::{BroadbandExt, ReadyExt, WidebandExt}, + stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt}, IterStream, }, - Err, Result, + Err, PduEvent, Result, }; use futures::{join, try_join, FutureExt, StreamExt, TryFutureExt}; use ruma::{ @@ -59,13 +57,13 @@ pub(crate) async fn get_context_route( false }; - let base_token = services + let base_id = services .rooms .timeline - .get_pdu_count(&body.event_id) + .get_pdu_id(&body.event_id) .map_err(|_| err!(Request(NotFound("Event not found.")))); - let base_event = services + let base_pdu = services .rooms .timeline .get_pdu(&body.event_id) @@ -77,48 +75,44 @@ pub(crate) async fn get_context_route( .user_can_see_event(sender_user, &body.room_id, &body.event_id) .map(Ok); - let (base_token, base_event, visible) = try_join!(base_token, base_event, visible)?; + let (base_id, base_pdu, visible) = try_join!(base_id, base_pdu, visible)?; - if base_event.room_id != body.room_id || base_event.event_id != body.event_id { + if base_pdu.room_id != body.room_id || base_pdu.event_id != body.event_id { return Err!(Request(NotFound("Base event not found."))); } - if !visible - || ignored_filter(&services, (base_token, base_event.clone()), sender_user) - .await - .is_none() - { + if !visible { return Err!(Request(Forbidden("You don't have permission to view this event."))); } - let events_before = - services - .rooms - .timeline - .pdus_rev(Some(sender_user), room_id, Some(base_token)); + let base_count = base_id.pdu_count(); + + let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user); + + let events_before = services + .rooms + .timeline + .pdus_rev(Some(sender_user), room_id, Some(base_count)) + .ignore_err() + .ready_filter_map(|item| event_filter(item, filter)) + .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) + .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) + .take(limit / 2) + .collect(); let events_after = services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(base_token)); - - let (events_before, events_after) = try_join!(events_before, events_after)?; - - let events_before = events_before + .pdus(Some(sender_user), room_id, Some(base_count)) + .ignore_err() .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit / 2) .collect(); - let events_after = events_after - .ready_filter_map(|item| event_filter(item, filter)) - .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) - .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) - .take(limit / 2) - .collect(); - - let (events_before, events_after): (Vec<_>, Vec<_>) = join!(events_before, events_after); + let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) = + join!(base_event, events_before, events_after); let state_at = events_after .last() @@ -134,7 +128,8 @@ pub(crate) async fn get_context_route( .map_err(|e| err!(Database("State not found: {e}"))) .await?; - let lazy = once(&(base_token, base_event.clone())) + let lazy = base_event + .iter() .chain(events_before.iter()) .chain(events_after.iter()) .stream() @@ -175,19 +170,19 @@ pub(crate) async fn get_context_route( .await; Ok(get_context::v3::Response { - event: Some(base_event.to_room_event()), + event: base_event.map(at!(1)).as_ref().map(PduEvent::to_room_event), start: events_before .last() .map(at!(0)) - .or(Some(base_token)) + .or(Some(base_count)) .as_ref() .map(ToString::to_string), end: events_after .last() .map(at!(0)) - .or(Some(base_token)) + .or(Some(base_count)) .as_ref() .map(ToString::to_string), diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 3eb52138..4046b493 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -1314,6 +1314,7 @@ async fn join_room_by_id_helper_local( .rooms .event_handler .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) + .boxed() .await?; } else { return Err(error); @@ -1491,6 +1492,7 @@ pub(crate) async fn invite_helper( .rooms .event_handler .handle_incoming_pdu(&origin, room_id, &event_id, value, true) + .boxed() .await? .ok_or_else(|| { err!(Request(InvalidParam("Could not accept incoming PDU as timeline event."))) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 58f4f916..ec9a14d5 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -5,7 +5,7 @@ use conduwuit::{ at, is_equal_to, utils::{ result::{FlatOk, LogErr}, - stream::{BroadbandExt, WidebandExt}, + stream::{BroadbandExt, TryIgnore, WidebandExt}, IterStream, ReadyExt, }, Event, PduCount, Result, @@ -107,14 +107,14 @@ pub(crate) async fn get_message_events_route( .rooms .timeline .pdus(Some(sender_user), room_id, Some(from)) - .await? + .ignore_err() .boxed(), | Direction::Backward => services .rooms .timeline .pdus_rev(Some(sender_user), room_id, Some(from)) - .await? + .ignore_err() .boxed(), }; diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index cc3c9420..301b6e8d 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,6 +1,10 @@ use axum::extract::State; -use conduwuit::{at, utils::BoolExt, Err, Result}; -use futures::StreamExt; +use conduwuit::{ + at, + utils::{stream::TryTools, BoolExt}, + Err, Result, +}; +use futures::TryStreamExt; use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response}; use crate::Ruma; @@ -27,10 +31,9 @@ pub(crate) async fn room_initial_sync_route( .rooms .timeline .pdus_rev(None, room_id, None) - .await? - .take(limit) - .collect() - .await; + .try_take(limit) + .try_collect() + .await?; let state: Vec<_> = services .rooms diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index b772fbf1..79e4b1ca 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -2,10 +2,10 @@ mod v3; mod v4; use conduwuit::{ - utils::stream::{BroadbandExt, ReadyExt}, + utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, PduCount, }; -use futures::StreamExt; +use futures::{pin_mut, StreamExt}; use ruma::{RoomId, UserId}; pub(crate) use self::{v3::sync_events_route, v4::sync_events_v4_route}; @@ -29,23 +29,19 @@ async fn load_timeline( return Ok((Vec::new(), false)); } - let mut non_timeline_pdus = services + let non_timeline_pdus = services .rooms .timeline .pdus_rev(Some(sender_user), room_id, None) - .await? + .ignore_err() .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) .ready_take_while(|&(pducount, _)| pducount > roomsincecount); // Take the last events for the timeline - let timeline_pdus: Vec<_> = non_timeline_pdus - .by_ref() - .take(limit) - .collect::>() - .await - .into_iter() - .rev() - .collect(); + pin_mut!(non_timeline_pdus); + let timeline_pdus: Vec<_> = non_timeline_pdus.by_ref().take(limit).collect().await; + + let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect(); // They /sync response doesn't always return all messages, so we say the output // is limited unless there are events in non_timeline_pdus diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index fac0e540..b44db67c 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -2,10 +2,10 @@ use std::cmp; use axum::extract::State; use conduwuit::{ - utils::{IterStream, ReadyExt}, + utils::{stream::TryTools, IterStream, ReadyExt}, PduCount, Result, }; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use ruma::{api::federation::backfill::get_backfill, uint, MilliSecondsSinceUnixEpoch}; use super::AccessCheck; @@ -57,26 +57,30 @@ pub(crate) async fn get_backfill_route( .rooms .timeline .pdus_rev(None, &body.room_id, Some(from.saturating_add(1))) - .await? - .take(limit) - .filter_map(|(_, pdu)| async move { - services + .try_take(limit) + .try_filter_map(|(_, pdu)| async move { + Ok(services .rooms .state_accessor .server_can_see_event(body.origin(), &pdu.room_id, &pdu.event_id) .await - .then_some(pdu) + .then_some(pdu)) }) - .filter_map(|pdu| async move { - services + .try_filter_map(|pdu| async move { + Ok(services .rooms .timeline .get_pdu_json(&pdu.event_id) .await - .ok() + .ok()) }) - .then(|pdu| services.sending.convert_to_outgoing_federation_event(pdu)) - .collect() - .await, + .and_then(|pdu| { + services + .sending + .convert_to_outgoing_federation_event(pdu) + .map(Ok) + }) + .try_collect() + .await?, }) } diff --git a/src/api/server/send.rs b/src/api/server/send.rs index dbe0108f..c0c8a0c9 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -135,6 +135,7 @@ async fn handle_pdus( .rooms .event_handler .handle_incoming_pdu(origin, &room_id, &event_id, value, true) + .boxed() .await .map(|_| ()); diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 0a5295dc..9bd4450e 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -26,7 +26,7 @@ pub(super) async fn handle_prev_pdu<'a>( (Arc, BTreeMap), >, create_event: &Arc, - first_pdu_in_room: &Arc, + first_pdu_in_room: &PduEvent, prev_id: &EventId, ) -> Result { // Check for disabled again because it might have changed diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 0be8aa52..457c1e8d 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,22 +1,15 @@ -use std::{ - borrow::Borrow, - collections::{hash_map, HashMap}, - sync::Arc, -}; +use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ at, err, result::{LogErr, NotFound}, utils, - utils::{future::TryExtExt, stream::TryIgnore, ReadyExt}, + utils::stream::TryReadyExt, Err, PduCount, PduEvent, Result, }; use database::{Database, Deserialized, Json, KeyVal, Map}; -use futures::{future::select_ok, FutureExt, Stream, StreamExt}; -use ruma::{ - api::Direction, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId, -}; -use tokio::sync::Mutex; +use futures::{future::select_ok, pin_mut, FutureExt, Stream, TryFutureExt, TryStreamExt}; +use ruma::{api::Direction, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId}; use super::{PduId, RawPduId}; use crate::{rooms, rooms::short::ShortRoomId, Dep}; @@ -27,7 +20,6 @@ pub(super) struct Data { pduid_pdu: Arc, userroomid_highlightcount: Arc, userroomid_notificationcount: Arc, - pub(super) lasttimelinecount_cache: LastTimelineCountCache, pub(super) db: Arc, services: Services, } @@ -37,7 +29,6 @@ struct Services { } pub type PdusIterItem = (PduCount, PduEvent); -type LastTimelineCountCache = Mutex>; impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { @@ -48,7 +39,6 @@ impl Data { pduid_pdu: db["pduid_pdu"].clone(), userroomid_highlightcount: db["userroomid_highlightcount"].clone(), userroomid_notificationcount: db["userroomid_notificationcount"].clone(), - lasttimelinecount_cache: Mutex::new(HashMap::new()), db: args.db.clone(), services: Services { short: args.depend::("rooms::short"), @@ -56,27 +46,39 @@ impl Data { } } + #[inline] pub(super) async fn last_timeline_count( &self, sender_user: Option<&UserId>, room_id: &RoomId, ) -> Result { - match self - .lasttimelinecount_cache - .lock() - .await - .entry(room_id.into()) - { - | hash_map::Entry::Occupied(o) => Ok(*o.get()), - | hash_map::Entry::Vacant(v) => Ok(self - .pdus_rev(sender_user, room_id, PduCount::max()) - .await? - .next() - .await - .map(at!(0)) - .filter(|&count| matches!(count, PduCount::Normal(_))) - .map_or_else(PduCount::max, |count| *v.insert(count))), - } + let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + + pin_mut!(pdus_rev); + let last_count = pdus_rev + .try_next() + .await? + .map(at!(0)) + .filter(|&count| matches!(count, PduCount::Normal(_))) + .unwrap_or_else(PduCount::max); + + Ok(last_count) + } + + #[inline] + pub(super) async fn latest_pdu_in_room( + &self, + sender_user: Option<&UserId>, + room_id: &RoomId, + ) -> Result { + let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + + pin_mut!(pdus_rev); + pdus_rev + .try_next() + .await? + .map(at!(1)) + .ok_or_else(|| err!(Request(NotFound("no PDU's found in room")))) } /// Returns the `count` of this pdu's id. @@ -129,7 +131,7 @@ impl Data { pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result { let pduid = self.get_pdu_id(event_id).await?; - self.pduid_pdu.get(&pduid).await.map(|_| ()) + self.pduid_pdu.exists(&pduid).await } /// Returns the pdu. @@ -148,17 +150,17 @@ impl Data { /// Like get_non_outlier_pdu(), but without the expense of fetching and /// parsing the PduEvent + #[inline] pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result { - self.eventid_outlierpdu.get(event_id).await.map(|_| ()) + self.eventid_outlierpdu.exists(event_id).await } /// Like get_pdu(), but without the expense of fetching and parsing the data - pub(super) async fn pdu_exists(&self, event_id: &EventId) -> bool { - let non_outlier = self.non_outlier_pdu_exists(event_id).is_ok(); - let outlier = self.outlier_pdu_exists(event_id).is_ok(); + pub(super) async fn pdu_exists(&self, event_id: &EventId) -> Result { + let non_outlier = self.non_outlier_pdu_exists(event_id).boxed(); + let outlier = self.outlier_pdu_exists(event_id).boxed(); - //TODO: parallelize - non_outlier.await || outlier.await + select_ok([non_outlier, outlier]).await.map(at!(0)) } /// Returns the pdu. @@ -186,11 +188,6 @@ impl Data { debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal"); self.pduid_pdu.raw_put(pdu_id, Json(json)); - self.lasttimelinecount_cache - .lock() - .await - .insert(pdu.room_id.clone(), count); - self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id); self.eventid_outlierpdu.remove(pdu.event_id.as_bytes()); } @@ -225,49 +222,44 @@ impl Data { /// Returns an iterator over all events and their tokens in a room that /// happened before the event with id `until` in reverse-chronological /// order. - pub(super) async fn pdus_rev<'a>( + pub(super) fn pdus_rev<'a>( &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, until: PduCount, - ) -> Result + Send + 'a> { - let current = self - .count_to_id(room_id, until, Direction::Backward) - .await?; - let prefix = current.shortroomid(); - let stream = self - .pduid_pdu - .rev_raw_stream_from(¤t) - .ignore_err() - .ready_take_while(move |(key, _)| key.starts_with(&prefix)) - .map(move |item| Self::each_pdu(item, user_id)); - - Ok(stream) + ) -> impl Stream> + Send + 'a { + self.count_to_id(room_id, until, Direction::Backward) + .map_ok(move |current| { + let prefix = current.shortroomid(); + self.pduid_pdu + .rev_raw_stream_from(¤t) + .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) + .ready_and_then(move |item| Self::each_pdu(item, user_id)) + }) + .try_flatten_stream() } - pub(super) async fn pdus<'a>( + pub(super) fn pdus<'a>( &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, - ) -> Result + Send + Unpin + 'a> { - let current = self.count_to_id(room_id, from, Direction::Forward).await?; - let prefix = current.shortroomid(); - let stream = self - .pduid_pdu - .raw_stream_from(¤t) - .ignore_err() - .ready_take_while(move |(key, _)| key.starts_with(&prefix)) - .map(move |item| Self::each_pdu(item, user_id)); - - Ok(stream) + ) -> impl Stream> + Send + 'a { + self.count_to_id(room_id, from, Direction::Forward) + .map_ok(move |current| { + let prefix = current.shortroomid(); + self.pduid_pdu + .raw_stream_from(¤t) + .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) + .ready_and_then(move |item| Self::each_pdu(item, user_id)) + }) + .try_flatten_stream() } - fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> PdusIterItem { + fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { let pdu_id: RawPduId = pdu_id.into(); - let mut pdu = serde_json::from_slice::(pdu) - .expect("PduEvent in pduid_pdu database column is invalid JSON"); + let mut pdu = serde_json::from_slice::(pdu)?; if Some(pdu.sender.borrow()) != user_id { pdu.remove_transaction_id().log_err().ok(); @@ -275,7 +267,7 @@ impl Data { pdu.add_age().log_err().ok(); - (pdu_id.pdu_count(), pdu) + Ok((pdu_id.pdu_count(), pdu)) } pub(super) fn increment_notification_counts( diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 2a272c38..fe7f885a 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -9,14 +9,16 @@ use std::{ }; use conduwuit::{ - debug, debug_warn, err, error, implement, info, + at, debug, debug_warn, err, error, implement, info, pdu::{gen_event_id, EventHash, PduBuilder, PduCount, PduEvent}, - utils::{self, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt}, + utils::{ + self, future::TryExtExt, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt, + }, validated, warn, Err, Error, Result, Server, }; pub use conduwuit::{PduId, RawPduId}; use futures::{ - future, future::ready, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, + future, future::ready, pin_mut, Future, FutureExt, Stream, StreamExt, TryStreamExt, }; use ruma::{ api::federation, @@ -34,7 +36,7 @@ use ruma::{ }, push::{Action, Ruleset, Tweak}, state_res::{self, Event, RoomVersion}, - uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, + uint, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, }; use serde::Deserialize; @@ -139,53 +141,34 @@ impl crate::Service for Service { } fn memory_usage(&self, out: &mut dyn Write) -> Result<()> { - /* - let lasttimelinecount_cache = self - .db - .lasttimelinecount_cache - .lock() - .expect("locked") - .len(); - writeln!(out, "lasttimelinecount_cache: {lasttimelinecount_cache}")?; - */ - let mutex_insert = self.mutex_insert.len(); writeln!(out, "insert_mutex: {mutex_insert}")?; Ok(()) } - fn clear_cache(&self) { - /* - self.db - .lasttimelinecount_cache - .lock() - .expect("locked") - .clear(); - */ - } - fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } impl Service { #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result> { - self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id) - .next() - .await - .map(|(_, p)| Arc::new(p)) + pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { + self.first_item_in_room(room_id).await.map(at!(1)) + } + + #[tracing::instrument(skip(self), level = "debug")] + pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { + let pdus = self.pdus(None, room_id, None); + + pin_mut!(pdus); + pdus.try_next() + .await? .ok_or_else(|| err!(Request(NotFound("No PDU found in room")))) } #[tracing::instrument(skip(self), level = "debug")] - pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result> { - self.pdus_rev(None, room_id, None) - .await? - .next() - .await - .map(|(_, p)| Arc::new(p)) - .ok_or_else(|| err!(Request(NotFound("No PDU found in room")))) + pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + self.db.latest_pdu_in_room(None, room_id).await } #[tracing::instrument(skip(self), level = "debug")] @@ -202,29 +185,6 @@ impl Service { self.db.get_pdu_count(event_id).await } - // TODO Is this the same as the function above? - /* - #[tracing::instrument(skip(self))] - pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result { - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - - self.pduid_pdu - .iter_from(&last_possible_key, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .next() - .map(|b| self.pdu_count(&b.0)) - .transpose() - .map(|op| op.unwrap_or_default()) - } - */ - /// Returns the json of a pdu. pub async fn get_pdu_json(&self, event_id: &EventId) -> Result { self.db.get_pdu_json(event_id).await @@ -260,16 +220,6 @@ impl Service { self.db.get_pdu(event_id).await } - /// Checks if pdu exists - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub fn pdu_exists<'a>( - &'a self, - event_id: &'a EventId, - ) -> impl Future + Send + 'a { - self.db.pdu_exists(event_id) - } - /// Returns the pdu. /// /// This does __NOT__ check the outliers `Tree`. @@ -282,6 +232,16 @@ impl Service { self.db.get_pdu_json_from_id(pdu_id).await } + /// Checks if pdu exists + /// + /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + pub fn pdu_exists<'a>( + &'a self, + event_id: &'a EventId, + ) -> impl Future + Send + 'a { + self.db.pdu_exists(event_id).is_ok() + } + /// Removes a pdu and creates a new one with the same id. #[tracing::instrument(skip(self), level = "debug")] pub async fn replace_pdu( @@ -1027,38 +987,32 @@ impl Service { &'a self, user_id: &'a UserId, room_id: &'a RoomId, - ) -> impl Stream + Send + Unpin + 'a { - self.pdus(Some(user_id), room_id, None) - .map_ok(|stream| stream.map(Ok)) - .try_flatten_stream() - .ignore_err() - .boxed() + ) -> impl Stream + Send + 'a { + self.pdus(Some(user_id), room_id, None).ignore_err() } /// Reverse iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] - pub async fn pdus_rev<'a>( + pub fn pdus_rev<'a>( &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, until: Option, - ) -> Result + Send + 'a> { + ) -> impl Stream> + Send + 'a { self.db .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) - .await } /// Forward iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] - pub async fn pdus<'a>( + pub fn pdus<'a>( &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: Option, - ) -> Result + Send + 'a> { + ) -> impl Stream> + Send + 'a { self.db .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) - .await } /// Replace a PDU with the redacted form. @@ -1117,8 +1071,7 @@ impl Service { } let first_pdu = self - .all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id) - .next() + .first_item_in_room(room_id) .await .expect("Room is not empty"); @@ -1232,20 +1185,14 @@ impl Service { self.services .event_handler .handle_incoming_pdu(origin, &room_id, &event_id, value, false) + .boxed() .await?; - let value = self - .get_pdu_json(&event_id) - .await - .expect("We just created it"); - let pdu = self.get_pdu(&event_id).await.expect("We just created it"); + let value = self.get_pdu_json(&event_id).await?; - let shortroomid = self - .services - .short - .get_shortroomid(&room_id) - .await - .expect("room exists"); + let pdu = self.get_pdu(&event_id).await?; + + let shortroomid = self.services.short.get_shortroomid(&room_id).await?; let insert_lock = self.mutex_insert.lock(&room_id).await;