flatten timeline pdus iterations; increase concurrency

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2025-01-04 04:12:50 +00:00
parent 27328cbc01
commit 925061b92d
11 changed files with 238 additions and 237 deletions
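
The commit title's "flatten" refers to the shape change running through most of these hunks: the timeline accessors move from an async fn returning Result<impl Stream<Item = T>> to a plain fn returning impl Stream<Item = Result<T>>, so the fallible setup step is folded into the stream instead of being awaited separately. A minimal sketch of the two shapes, using only the futures crate; the function names and item types below are invented for illustration and are not conduwuit's API:

    use futures::{executor::block_on, stream, Stream, StreamExt, TryStreamExt};

    type Error = std::io::Error;

    // Old shape: the constructor itself is async and fallible; callers must
    // await (and unwrap) it before they even have a stream to poll.
    async fn pdus_old() -> Result<impl Stream<Item = u64>, Error> {
        Ok(stream::iter(0..3u64))
    }

    // New shape: a plain constructor; fallibility lives in the items, so
    // callers deal with exactly one stream.
    fn pdus_new() -> impl Stream<Item = Result<u64, Error>> {
        stream::iter(0..3u64).map(Ok)
    }

    fn main() -> Result<(), Error> {
        block_on(async {
            let old: Vec<u64> = pdus_old().await?.collect().await;
            let new: Vec<u64> = pdus_new().try_collect().await?;
            assert_eq!(old, new);
            Ok(())
        })
    }

Dropping the outer Result is also what lets the constructors stop being async, which is why pdus_rev and pdus further down lose their await points.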

@@ -26,7 +26,7 @@ pub(super) async fn handle_prev_pdu<'a>(
(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>),
>,
create_event: &Arc<PduEvent>,
first_pdu_in_room: &Arc<PduEvent>,
first_pdu_in_room: &PduEvent,
prev_id: &EventId,
) -> Result {
// Check for disabled again because it might have changed
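
The parameter change above, from &Arc<PduEvent> to &PduEvent, only loosens the borrow: callers holding an Arc still pass it unchanged thanks to deref coercion, and callers that never had an Arc no longer need to create one. A tiny illustration, with a hypothetical Pdu type standing in for PduEvent:

    use std::sync::Arc;

    struct Pdu {
        event_id: String,
    }

    // After the change: any &Pdu works, shared or not.
    fn handle(pdu: &Pdu) -> &str {
        &pdu.event_id
    }

    fn main() {
        let shared = Arc::new(Pdu { event_id: "$abc".into() });
        let plain = Pdu { event_id: "$def".into() };

        // &Arc<Pdu> deref-coerces to &Pdu at the call site...
        assert_eq!(handle(&shared), "$abc");
        // ...and plain owners can call it too.
        assert_eq!(handle(&plain), "$def");
    }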

@@ -1,22 +1,15 @@
use std::{
borrow::Borrow,
collections::{hash_map, HashMap},
sync::Arc,
};
use std::{borrow::Borrow, sync::Arc};
use conduwuit::{
at, err,
result::{LogErr, NotFound},
utils,
utils::{future::TryExtExt, stream::TryIgnore, ReadyExt},
utils::stream::TryReadyExt,
Err, PduCount, PduEvent, Result,
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{future::select_ok, FutureExt, Stream, StreamExt};
use ruma::{
api::Direction, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use tokio::sync::Mutex;
use futures::{future::select_ok, pin_mut, FutureExt, Stream, TryFutureExt, TryStreamExt};
use ruma::{api::Direction, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
use super::{PduId, RawPduId};
use crate::{rooms, rooms::short::ShortRoomId, Dep};
@@ -27,7 +20,6 @@ pub(super) struct Data {
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
pub(super) lasttimelinecount_cache: LastTimelineCountCache,
pub(super) db: Arc<Database>,
services: Services,
}
@@ -37,7 +29,6 @@ struct Services {
}
pub type PdusIterItem = (PduCount, PduEvent);
type LastTimelineCountCache = Mutex<HashMap<OwnedRoomId, PduCount>>;
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
@@ -48,7 +39,6 @@ impl Data {
pduid_pdu: db["pduid_pdu"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
lasttimelinecount_cache: Mutex::new(HashMap::new()),
db: args.db.clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -56,27 +46,39 @@ impl Data {
}
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
match self
.lasttimelinecount_cache
.lock()
.await
.entry(room_id.into())
{
| hash_map::Entry::Occupied(o) => Ok(*o.get()),
| hash_map::Entry::Vacant(v) => Ok(self
.pdus_rev(sender_user, room_id, PduCount::max())
.await?
.next()
.await
.map(at!(0))
.filter(|&count| matches!(count, PduCount::Normal(_)))
.map_or_else(PduCount::max, |count| *v.insert(count))),
}
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
.try_next()
.await?
.map(at!(0))
.filter(|&count| matches!(count, PduCount::Normal(_)))
.unwrap_or_else(PduCount::max);
Ok(last_count)
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
.try_next()
.await?
.map(at!(1))
.ok_or_else(|| err!(Request(NotFound("no PDU's found in room"))))
}
/// Returns the `count` of this pdu's id.
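
With the lasttimelinecount_cache gone, last_timeline_count and latest_pdu_in_room above simply pin the reverse stream on the stack and take its first successful item. Roughly, the access pattern looks like the following; counts_rev is a stand-in for Data::pdus_rev, and the pin_mut! mirrors the real code, since the flattened stream is generally not Unpin:

    use futures::{executor::block_on, pin_mut, stream, Stream, TryStreamExt};
    use std::io::{Error, ErrorKind};

    // Stand-in for Data::pdus_rev(): newest-first, fallible items.
    fn counts_rev() -> impl Stream<Item = Result<u64, Error>> {
        stream::iter([Ok(42), Ok(41), Ok(40)])
    }

    async fn last_count() -> Result<u64, Error> {
        let rev = counts_rev();
        // Pin on the stack before polling.
        pin_mut!(rev);
        rev.try_next() // newest entry, or Err if the scan itself failed
            .await?
            .ok_or_else(|| Error::new(ErrorKind::NotFound, "room has no PDUs"))
    }

    fn main() {
        assert_eq!(block_on(last_count()).unwrap(), 42);
    }

The trade is an extra point query per call in exchange for dropping the per-room Mutex<HashMap> and its locking.
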
@@ -129,7 +131,7 @@ impl Data {
pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result {
let pduid = self.get_pdu_id(event_id).await?;
self.pduid_pdu.get(&pduid).await.map(|_| ())
self.pduid_pdu.exists(&pduid).await
}
/// Returns the pdu.
@@ -148,17 +150,17 @@ impl Data {
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
#[inline]
pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result {
self.eventid_outlierpdu.get(event_id).await.map(|_| ())
self.eventid_outlierpdu.exists(event_id).await
}
/// Like get_pdu(), but without the expense of fetching and parsing the data
pub(super) async fn pdu_exists(&self, event_id: &EventId) -> bool {
let non_outlier = self.non_outlier_pdu_exists(event_id).is_ok();
let outlier = self.outlier_pdu_exists(event_id).is_ok();
pub(super) async fn pdu_exists(&self, event_id: &EventId) -> Result {
let non_outlier = self.non_outlier_pdu_exists(event_id).boxed();
let outlier = self.outlier_pdu_exists(event_id).boxed();
//TODO: parallelize
non_outlier.await || outlier.await
select_ok([non_outlier, outlier]).await.map(at!(0))
}
/// Returns the pdu.
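
The new pdu_exists boxes both lookups and races them with futures::future::select_ok, which resolves with the first future to return Ok and only fails once every future has failed, so a hit in either table answers as soon as it is found. A self-contained sketch of that pattern; the two async fns below are placeholders for the timeline and outlier checks:

    use futures::{executor::block_on, future::select_ok, FutureExt};
    use std::io::{Error, ErrorKind};

    // Placeholder for the timeline (non-outlier) lookup.
    async fn in_timeline(id: &str) -> Result<(), Error> {
        (id == "a").then_some(()).ok_or_else(|| ErrorKind::NotFound.into())
    }

    // Placeholder for the eventid_outlierpdu lookup.
    async fn in_outliers(id: &str) -> Result<(), Error> {
        (id == "b").then_some(()).ok_or_else(|| ErrorKind::NotFound.into())
    }

    // Race the two checks; whichever succeeds first wins and the rest are
    // dropped, corresponding to select_ok([non_outlier, outlier]) above.
    async fn exists(id: &str) -> Result<(), Error> {
        let non_outlier = in_timeline(id).boxed();
        let outlier = in_outliers(id).boxed();
        select_ok([non_outlier, outlier]).await.map(|(ok, _rest)| ok)
    }

    fn main() {
        assert!(block_on(exists("b")).is_ok());
        assert!(block_on(exists("c")).is_err());
    }

Further down in the commit, the service-level pdu_exists wrapper turns this Result future into the bool future its callers expect via the is_ok() helper from the newly imported future::TryExtExt.
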
@@ -186,11 +188,6 @@ impl Data {
debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal");
self.pduid_pdu.raw_put(pdu_id, Json(json));
self.lasttimelinecount_cache
.lock()
.await
.insert(pdu.room_id.clone(), count);
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id);
self.eventid_outlierpdu.remove(pdu.event_id.as_bytes());
}
@@ -225,49 +222,44 @@ impl Data {
/// Returns an iterator over all events and their tokens in a room that
/// happened before the event with id `until` in reverse-chronological
/// order.
pub(super) async fn pdus_rev<'a>(
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> {
let current = self
.count_to_id(room_id, until, Direction::Backward)
.await?;
let prefix = current.shortroomid();
let stream = self
.pduid_pdu
.rev_raw_stream_from(&current)
.ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix))
.map(move |item| Self::each_pdu(item, user_id));
Ok(stream)
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.count_to_id(room_id, until, Direction::Backward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
pub(super) async fn pdus<'a>(
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> Result<impl Stream<Item = PdusIterItem> + Send + Unpin + 'a> {
let current = self.count_to_id(room_id, from, Direction::Forward).await?;
let prefix = current.shortroomid();
let stream = self
.pduid_pdu
.raw_stream_from(&current)
.ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix))
.map(move |item| Self::each_pdu(item, user_id));
Ok(stream)
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.count_to_id(room_id, from, Direction::Forward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> PdusIterItem {
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)
.expect("PduEvent in pduid_pdu database column is invalid JSON");
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
@@ -275,7 +267,7 @@ impl Data {
pdu.add_age().log_err().ok();
(pdu_id.pdu_count(), pdu)
Ok((pdu_id.pdu_count(), pdu))
}
pub(super) fn increment_notification_counts(
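
On the construction side, pdus_rev and pdus stop being async: the count_to_id lookup is chained in with map_ok, and try_flatten_stream collapses the resulting Future<Output = Result<Stream>> into a single Stream of Result items. A reduced sketch of that construction with invented stand-ins for the key lookup and the raw range scan:

    use futures::{executor::block_on, stream, Stream, StreamExt, TryFutureExt, TryStreamExt};

    type Error = std::io::Error;

    // Stand-in for count_to_id(): resolve the starting key, possibly failing.
    async fn starting_key(from: u64) -> Result<u64, Error> {
        Ok(from)
    }

    // Stand-in for pduid_pdu.raw_stream_from(): scan keys from `key` onward.
    fn scan_from(key: u64) -> impl Stream<Item = Result<u64, Error>> {
        stream::iter(key..key + 3).map(Ok)
    }

    // One plain constructor: a failed key lookup becomes the stream's
    // first (and only) Err item instead of a separate Result wrapper.
    fn pdus_from(from: u64) -> impl Stream<Item = Result<u64, Error>> {
        starting_key(from).map_ok(scan_from).try_flatten_stream()
    }

    fn main() -> Result<(), Error> {
        let keys: Vec<u64> = block_on(pdus_from(10).try_collect())?;
        assert_eq!(keys, vec![10, 11, 12]);
        Ok(())
    }

The prefix filtering and PDU parsing in the real hunk (ready_try_take_while and ready_and_then) come from conduwuit's TryReadyExt import added at the top of this file and are omitted from the sketch.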

@@ -9,14 +9,16 @@ use std::{
};
use conduwuit::{
debug, debug_warn, err, error, implement, info,
at, debug, debug_warn, err, error, implement, info,
pdu::{gen_event_id, EventHash, PduBuilder, PduCount, PduEvent},
utils::{self, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt},
utils::{
self, future::TryExtExt, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt,
},
validated, warn, Err, Error, Result, Server,
};
pub use conduwuit::{PduId, RawPduId};
use futures::{
future, future::ready, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
future, future::ready, pin_mut, Future, FutureExt, Stream, StreamExt, TryStreamExt,
};
use ruma::{
api::federation,
@@ -34,7 +36,7 @@ use ruma::{
},
push::{Action, Ruleset, Tweak},
state_res::{self, Event, RoomVersion},
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
uint, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedServerName, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
};
use serde::Deserialize;
@@ -139,53 +141,34 @@ impl crate::Service for Service {
}
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
/*
let lasttimelinecount_cache = self
.db
.lasttimelinecount_cache
.lock()
.expect("locked")
.len();
writeln!(out, "lasttimelinecount_cache: {lasttimelinecount_cache}")?;
*/
let mutex_insert = self.mutex_insert.len();
writeln!(out, "insert_mutex: {mutex_insert}")?;
Ok(())
}
fn clear_cache(&self) {
/*
self.db
.lasttimelinecount_cache
.lock()
.expect("locked")
.clear();
*/
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Arc<PduEvent>> {
self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)
.next()
.await
.map(|(_, p)| Arc::new(p))
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.first_item_in_room(room_id).await.map(at!(1))
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
let pdus = self.pdus(None, room_id, None);
pin_mut!(pdus);
pdus.try_next()
.await?
.ok_or_else(|| err!(Request(NotFound("No PDU found in room"))))
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<Arc<PduEvent>> {
self.pdus_rev(None, room_id, None)
.await?
.next()
.await
.map(|(_, p)| Arc::new(p))
.ok_or_else(|| err!(Request(NotFound("No PDU found in room"))))
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.db.latest_pdu_in_room(None, room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
@@ -202,29 +185,6 @@ impl Service {
self.db.get_pdu_count(event_id).await
}
// TODO Is this the same as the function above?
/*
#[tracing::instrument(skip(self))]
pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result<u64> {
let prefix = self
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
.to_vec();
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
self.pduid_pdu
.iter_from(&last_possible_key, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.next()
.map(|b| self.pdu_count(&b.0))
.transpose()
.map(|op| op.unwrap_or_default())
}
*/
/// Returns the json of a pdu.
pub async fn get_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
self.db.get_pdu_json(event_id).await
@@ -260,16 +220,6 @@ impl Service {
self.db.get_pdu(event_id).await
}
/// Checks if pdu exists
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn pdu_exists<'a>(
&'a self,
event_id: &'a EventId,
) -> impl Future<Output = bool> + Send + 'a {
self.db.pdu_exists(event_id)
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
@@ -282,6 +232,16 @@ impl Service {
self.db.get_pdu_json_from_id(pdu_id).await
}
/// Checks if pdu exists
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn pdu_exists<'a>(
&'a self,
event_id: &'a EventId,
) -> impl Future<Output = bool> + Send + 'a {
self.db.pdu_exists(event_id).is_ok()
}
/// Removes a pdu and creates a new one with the same id.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn replace_pdu(
@@ -1027,38 +987,32 @@ impl Service {
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + Unpin + 'a {
self.pdus(Some(user_id), room_id, None)
.map_ok(|stream| stream.map(Ok))
.try_flatten_stream()
.ignore_err()
.boxed()
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None).ignore_err()
}
/// Reverse iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn pdus_rev<'a>(
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> {
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
.await
}
/// Forward iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn pdus<'a>(
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> {
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
.await
}
/// Replace a PDU with the redacted form.
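
all_pdus keeps its infallible item type by discarding errored items with ignore_err(), conduwuit's TryIgnore helper that was already imported in this file. With plain futures combinators the same effect is roughly a filter_map that keeps only the Ok values, as in this sketch (drop_errs is an invented name):

    use futures::{executor::block_on, future::ready, stream, Stream, StreamExt};
    use std::io::{Error, ErrorKind};

    // Roughly what an "ignore errors" stream adapter amounts to.
    fn drop_errs<T>(s: impl Stream<Item = Result<T, Error>>) -> impl Stream<Item = T> {
        s.filter_map(|item| ready(item.ok()))
    }

    fn main() {
        let input = stream::iter([Ok(1u64), Err(Error::new(ErrorKind::Other, "boom")), Ok(3)]);
        let kept: Vec<u64> = block_on(drop_errs(input).collect());
        assert_eq!(kept, vec![1, 3]);
    }
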
@@ -1117,8 +1071,7 @@ impl Service {
}
let first_pdu = self
.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)
.next()
.first_item_in_room(room_id)
.await
.expect("Room is not empty");
@@ -1232,20 +1185,14 @@ impl Service {
self.services
.event_handler
.handle_incoming_pdu(origin, &room_id, &event_id, value, false)
.boxed()
.await?;
let value = self
.get_pdu_json(&event_id)
.await
.expect("We just created it");
let pdu = self.get_pdu(&event_id).await.expect("We just created it");
let value = self.get_pdu_json(&event_id).await?;
let shortroomid = self
.services
.short
.get_shortroomid(&room_id)
.await
.expect("room exists");
let pdu = self.get_pdu(&event_id).await?;
let shortroomid = self.services.short.get_shortroomid(&room_id).await?;
let insert_lock = self.mutex_insert.lock(&room_id).await;