merge rooms threads data and service

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-06 21:02:23 +00:00
parent 9da523c004
commit 137e3008ea
5 changed files with 91 additions and 124 deletions

View file

@ -97,7 +97,7 @@ async fn paginate_relations_with_filter(
filter_event_type: Option<TimelineEventType>, filter_rel_type: Option<RelationType>, from: Option<&str>, filter_event_type: Option<TimelineEventType>, filter_rel_type: Option<RelationType>, from: Option<&str>,
to: Option<&str>, limit: Option<UInt>, recurse: bool, dir: Direction, to: Option<&str>, limit: Option<UInt>, recurse: bool, dir: Direction,
) -> Result<get_relating_events::v1::Response> { ) -> Result<get_relating_events::v1::Response> {
let from: PduCount = from let start: PduCount = from
.map(str::parse) .map(str::parse)
.transpose()? .transpose()?
.unwrap_or_else(|| match dir { .unwrap_or_else(|| match dir {
@ -124,7 +124,7 @@ async fn paginate_relations_with_filter(
let events: Vec<PdusIterItem> = services let events: Vec<PdusIterItem> = services
.rooms .rooms
.pdu_metadata .pdu_metadata
.get_relations(sender_user, room_id, target, from, limit, depth, dir) .get_relations(sender_user, room_id, target, start, limit, depth, dir)
.await .await
.into_iter() .into_iter()
.filter(|(_, pdu)| { .filter(|(_, pdu)| {
@ -146,16 +146,20 @@ async fn paginate_relations_with_filter(
.await; .await;
let next_batch = match dir { let next_batch = match dir {
Direction::Backward => events.first(),
Direction::Forward => events.last(), Direction::Forward => events.last(),
Direction::Backward => events.first(),
} }
.map(at!(0)) .map(at!(0))
.map(|count| match dir {
Direction::Forward => count.saturating_add(1),
Direction::Backward => count.saturating_sub(1),
})
.as_ref() .as_ref()
.map(ToString::to_string); .map(ToString::to_string);
Ok(get_relating_events::v1::Response { Ok(get_relating_events::v1::Response {
next_batch, next_batch,
prev_batch: Some(from.to_string()), prev_batch: from.map(Into::into),
recursion_depth: recurse.then_some(depth.into()), recursion_depth: recurse.then_some(depth.into()),
chunk: events chunk: events
.into_iter() .into_iter()

View file

@ -1,5 +1,5 @@
use axum::extract::State; use axum::extract::State;
use conduit::{PduCount, PduEvent}; use conduit::{at, PduCount, PduEvent};
use futures::StreamExt; use futures::StreamExt;
use ruma::{api::client::threads::get_threads, uint}; use ruma::{api::client::threads::get_threads, uint};
@ -44,12 +44,16 @@ pub(crate) async fn get_threads_route(
Ok(get_threads::v1::Response { Ok(get_threads::v1::Response {
next_batch: threads next_batch: threads
.last() .last()
.map(|(count, _)| count) .filter(|_| threads.len() >= limit)
.map(at!(0))
.map(|count| count.saturating_sub(1))
.as_ref()
.map(ToString::to_string), .map(ToString::to_string),
chunk: threads chunk: threads
.into_iter() .into_iter()
.map(|(_, pdu)| pdu.to_room_event()) .map(at!(1))
.map(|pdu| pdu.to_room_event())
.collect(), .collect(),
}) })
} }

View file

@ -1,5 +1,6 @@
use std::{mem::size_of, sync::Arc}; use std::{mem::size_of, sync::Arc};
use arrayvec::ArrayVec;
use conduit::{ use conduit::{
result::LogErr, result::LogErr,
utils::{stream::TryIgnore, u64_from_u8, ReadyExt}, utils::{stream::TryIgnore, u64_from_u8, ReadyExt},
@ -54,15 +55,13 @@ impl Data {
pub(super) fn get_relations<'a>( pub(super) fn get_relations<'a>(
&'a self, user_id: &'a UserId, shortroomid: ShortRoomId, target: ShortEventId, from: PduCount, dir: Direction, &'a self, user_id: &'a UserId, shortroomid: ShortRoomId, target: ShortEventId, from: PduCount, dir: Direction,
) -> impl Stream<Item = PdusIterItem> + Send + '_ { ) -> impl Stream<Item = PdusIterItem> + Send + '_ {
let current: RawPduId = PduId { let mut current = ArrayVec::<u8, 16>::new();
shortroomid, current.extend(target.to_be_bytes());
shorteventid: from, current.extend(from.into_unsigned().to_be_bytes());
} let current = current.as_slice();
.into();
match dir { match dir {
Direction::Forward => self.tofrom_relation.raw_keys_from(&current).boxed(), Direction::Forward => self.tofrom_relation.raw_keys_from(current).boxed(),
Direction::Backward => self.tofrom_relation.rev_raw_keys_from(&current).boxed(), Direction::Backward => self.tofrom_relation.rev_raw_keys_from(current).boxed(),
} }
.ignore_err() .ignore_err()
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes())) .ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))

View file

@ -1,90 +0,0 @@
use std::sync::Arc;
use conduit::{
result::LogErr,
utils::{stream::TryIgnore, ReadyExt},
PduCount, PduEvent, Result,
};
use database::{Deserialized, Map};
use futures::{Stream, StreamExt};
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
use crate::{
rooms,
rooms::{
short::ShortRoomId,
timeline::{PduId, RawPduId},
},
Dep,
};
pub(super) struct Data {
threadid_userids: Arc<Map>,
services: Services,
}
struct Services {
short: Dep<rooms::short::Service>,
timeline: Dep<rooms::timeline::Service>,
}
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = &args.db;
Self {
threadid_userids: db["threadid_userids"].clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
},
}
}
#[inline]
pub(super) async fn threads_until<'a>(
&'a self, user_id: &'a UserId, room_id: &'a RoomId, until: PduCount, _include: &'a IncludeThreads,
) -> Result<impl Stream<Item = (PduCount, PduEvent)> + Send + 'a> {
let shortroomid: ShortRoomId = self.services.short.get_shortroomid(room_id).await?;
let current: RawPduId = PduId {
shortroomid,
shorteventid: until.saturating_sub(1),
}
.into();
let stream = self
.threadid_userids
.rev_raw_keys_from(&current)
.ignore_err()
.map(RawPduId::from)
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
.filter_map(move |pdu_id| async move {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
pdu.remove_transaction_id().log_err().ok();
}
Some((pdu_id.shorteventid, pdu))
});
Ok(stream)
}
pub(super) fn update_participants(&self, root_id: &RawPduId, participants: &[OwnedUserId]) -> Result {
let users = participants
.iter()
.map(|user| user.as_bytes())
.collect::<Vec<_>>()
.join(&[0xFF][..]);
self.threadid_userids.insert(root_id, &users);
Ok(())
}
pub(super) async fn get_participants(&self, root_id: &RawPduId) -> Result<Vec<OwnedUserId>> {
self.threadid_userids.get(root_id).await.deserialized()
}
}

View file

@ -1,34 +1,44 @@
mod data;
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use conduit::{err, PduCount, PduEvent, Result}; use conduit::{
use data::Data; err,
use futures::Stream; utils::{stream::TryIgnore, ReadyExt},
PduCount, PduEvent, PduId, RawPduId, Result,
};
use database::{Deserialized, Map};
use futures::{Stream, StreamExt};
use ruma::{ use ruma::{
api::client::threads::get_threads::v1::IncludeThreads, events::relation::BundledThread, uint, CanonicalJsonValue, api::client::threads::get_threads::v1::IncludeThreads, events::relation::BundledThread, uint, CanonicalJsonValue,
EventId, RoomId, UserId, EventId, OwnedUserId, RoomId, UserId,
}; };
use serde_json::json; use serde_json::json;
use crate::{rooms, Dep}; use crate::{rooms, rooms::short::ShortRoomId, Dep};
pub struct Service { pub struct Service {
services: Services,
db: Data, db: Data,
services: Services,
} }
struct Services { struct Services {
short: Dep<rooms::short::Service>,
timeline: Dep<rooms::timeline::Service>, timeline: Dep<rooms::timeline::Service>,
} }
pub(super) struct Data {
threadid_userids: Arc<Map>,
}
impl crate::Service for Service { impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> { fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self { Ok(Arc::new(Self {
db: Data {
threadid_userids: args.db["threadid_userids"].clone(),
},
services: Services { services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"), timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
}, },
db: Data::new(&args),
})) }))
} }
@ -36,14 +46,6 @@ impl crate::Service for Service {
} }
impl Service { impl Service {
pub async fn threads_until<'a>(
&'a self, user_id: &'a UserId, room_id: &'a RoomId, until: PduCount, include: &'a IncludeThreads,
) -> Result<impl Stream<Item = (PduCount, PduEvent)> + Send + 'a> {
self.db
.threads_until(user_id, room_id, until, include)
.await
}
pub async fn add_to_thread(&self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> { pub async fn add_to_thread(&self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
let root_id = self let root_id = self
.services .services
@ -113,13 +115,61 @@ impl Service {
} }
let mut users = Vec::new(); let mut users = Vec::new();
if let Ok(userids) = self.db.get_participants(&root_id).await { if let Ok(userids) = self.get_participants(&root_id).await {
users.extend_from_slice(&userids); users.extend_from_slice(&userids);
} else { } else {
users.push(root_pdu.sender); users.push(root_pdu.sender);
} }
users.push(pdu.sender.clone()); users.push(pdu.sender.clone());
self.db.update_participants(&root_id, &users) self.update_participants(&root_id, &users)
}
pub async fn threads_until<'a>(
&'a self, user_id: &'a UserId, room_id: &'a RoomId, shorteventid: PduCount, _inc: &'a IncludeThreads,
) -> Result<impl Stream<Item = (PduCount, PduEvent)> + Send + 'a> {
let shortroomid: ShortRoomId = self.services.short.get_shortroomid(room_id).await?;
let current: RawPduId = PduId {
shortroomid,
shorteventid,
}
.into();
let stream = self
.db
.threadid_userids
.rev_raw_keys_from(&current)
.ignore_err()
.map(RawPduId::from)
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
.filter_map(move |pdu_id| async move {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
pdu.remove_transaction_id().ok();
}
Some((pdu_id.shorteventid, pdu))
});
Ok(stream)
}
pub(super) fn update_participants(&self, root_id: &RawPduId, participants: &[OwnedUserId]) -> Result {
let users = participants
.iter()
.map(|user| user.as_bytes())
.collect::<Vec<_>>()
.join(&[0xFF][..]);
self.db.threadid_userids.insert(root_id, &users);
Ok(())
}
pub(super) async fn get_participants(&self, root_id: &RawPduId) -> Result<Vec<OwnedUserId>> {
self.db.threadid_userids.get(root_id).await.deserialized()
} }
} }