diff --git a/src/api/client/context.rs b/src/api/client/context.rs index d07f6ac1..f5f981ba 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -82,7 +82,7 @@ pub(crate) async fn get_context_route( let events_before: Vec<_> = services .rooms .timeline - .pdus_rev(sender_user, room_id, base_token.saturating_sub(1)) + .pdus_rev(Some(sender_user), room_id, Some(base_token.saturating_sub(1))) .await? .ready_filter_map(|item| event_filter(item, filter)) .filter_map(|item| ignored_filter(&services, item, sender_user)) @@ -94,7 +94,7 @@ pub(crate) async fn get_context_route( let events_after: Vec<_> = services .rooms .timeline - .pdus(sender_user, room_id, base_token.saturating_add(1)) + .pdus(Some(sender_user), room_id, Some(base_token.saturating_add(1))) .await? .ready_filter_map(|item| event_filter(item, filter)) .filter_map(|item| ignored_filter(&services, item, sender_user)) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index e76325aa..e8306de9 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -100,14 +100,14 @@ pub(crate) async fn get_message_events_route( Direction::Forward => services .rooms .timeline - .pdus(sender_user, room_id, from) + .pdus(Some(sender_user), room_id, Some(from)) .await? .boxed(), Direction::Backward => services .rooms .timeline - .pdus_rev(sender_user, room_id, from) + .pdus_rev(Some(sender_user), room_id, Some(from)) .await? .boxed(), }; diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index f047d176..3201b827 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -14,7 +14,7 @@ async fn load_timeline( let last_timeline_count = services .rooms .timeline - .last_timeline_count(sender_user, room_id) + .last_timeline_count(Some(sender_user), room_id) .await?; if last_timeline_count <= roomsincecount { @@ -24,7 +24,7 @@ async fn load_timeline( let mut non_timeline_pdus = services .rooms .timeline - .pdus_rev(sender_user, room_id, PduCount::max()) + .pdus_rev(Some(sender_user), room_id, None) .await? .ready_take_while(|(pducount, _)| *pducount > roomsincecount); diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index 47f02841..be770ee8 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -6,7 +6,7 @@ use conduit::{ PduCount, Result, }; use futures::{FutureExt, StreamExt}; -use ruma::{api::federation::backfill::get_backfill, uint, user_id, MilliSecondsSinceUnixEpoch}; +use ruma::{api::federation::backfill::get_backfill, uint, MilliSecondsSinceUnixEpoch}; use super::AccessCheck; use crate::Ruma; @@ -51,7 +51,7 @@ pub(crate) async fn get_backfill_route( let pdus = services .rooms .timeline - .pdus_rev(user_id!("@doesntmatter:conduit.rs"), &body.room_id, until) + .pdus_rev(None, &body.room_id, Some(until)) .await? .take(limit) .filter_map(|(_, pdu)| async move { diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index f320e6a0..7f1873ab 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Borrow, collections::{hash_map, HashMap}, sync::Arc, }; @@ -53,7 +54,7 @@ impl Data { } } - pub(super) async fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result { + pub(super) async fn last_timeline_count(&self, sender_user: Option<&UserId>, room_id: &RoomId) -> Result { match self .lasttimelinecount_cache .lock() @@ -202,7 +203,7 @@ impl Data { /// happened before the event with id `until` in reverse-chronological /// order. pub(super) async fn pdus_rev<'a>( - &'a self, user_id: &'a UserId, room_id: &'a RoomId, until: PduCount, + &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, until: PduCount, ) -> Result + Send + 'a> { let current = self.count_to_id(room_id, until).await?; let prefix = current.shortroomid(); @@ -211,13 +212,13 @@ impl Data { .rev_raw_stream_from(¤t) .ignore_err() .ready_take_while(move |(key, _)| key.starts_with(&prefix)) - .map(|item| Self::each_pdu(item, user_id)); + .map(move |item| Self::each_pdu(item, user_id)); Ok(stream) } pub(super) async fn pdus<'a>( - &'a self, user_id: &'a UserId, room_id: &'a RoomId, from: PduCount, + &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, ) -> Result + Send + 'a> { let current = self.count_to_id(room_id, from).await?; let prefix = current.shortroomid(); @@ -231,13 +232,13 @@ impl Data { Ok(stream) } - fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: &UserId) -> PdusIterItem { + fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> PdusIterItem { let pdu_id: RawPduId = pdu_id.into(); let mut pdu = serde_json::from_slice::(pdu).expect("PduEvent in pduid_pdu database column is invalid JSON"); - if pdu.sender != user_id { + if Some(pdu.sender.borrow()) != user_id { pdu.remove_transaction_id().log_err().ok(); } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 81d372d7..281879d2 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -177,7 +177,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result> { - self.pdus_rev(user_id!("@placeholder:conduwuit.placeholder"), room_id, PduCount::max()) + self.pdus_rev(None, room_id, None) .await? .next() .await @@ -186,7 +186,7 @@ impl Service { } #[tracing::instrument(skip(self), level = "debug")] - pub async fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result { + pub async fn last_timeline_count(&self, sender_user: Option<&UserId>, room_id: &RoomId) -> Result { self.db.last_timeline_count(sender_user, room_id).await } @@ -976,23 +976,27 @@ impl Service { pub async fn all_pdus<'a>( &'a self, user_id: &'a UserId, room_id: &'a RoomId, ) -> Result + Send + 'a> { - self.pdus(user_id, room_id, PduCount::min()).await + self.pdus(Some(user_id), room_id, None).await } /// Reverse iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub async fn pdus_rev<'a>( - &'a self, user_id: &'a UserId, room_id: &'a RoomId, until: PduCount, + &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, until: Option, ) -> Result + Send + 'a> { - self.db.pdus_rev(user_id, room_id, until).await + self.db + .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) + .await } /// Forward iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub async fn pdus<'a>( - &'a self, user_id: &'a UserId, room_id: &'a RoomId, from: PduCount, + &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: Option, ) -> Result + Send + 'a> { - self.db.pdus(user_id, room_id, from).await + self.db + .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) + .await } /// Replace a PDU with the redacted form.