feat: lazy loading

This commit is contained in:
Timo Kösters 2022-01-04 14:30:13 +01:00
parent 5bcc1324ed
commit 68e910bb77
No known key found for this signature in database
GPG key ID: 356E705610F626D5
5 changed files with 321 additions and 42 deletions

View file

@ -6,7 +6,11 @@ use ruma::{
},
events::EventType,
};
use std::{collections::BTreeMap, convert::TryInto, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
convert::TryInto,
sync::Arc,
};
#[cfg(feature = "conduit_bin")]
use rocket::{get, put};
@ -117,6 +121,7 @@ pub async fn get_message_events_route(
body: Ruma<get_message_events::Request<'_>>,
) -> ConduitResult<get_message_events::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
if !db.rooms.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
@ -136,6 +141,12 @@ pub async fn get_message_events_route(
// Use limit or else 10
let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize);
let next_token;
let mut resp = get_message_events::Response::new();
let mut lazy_loaded = HashSet::new();
match body.dir {
get_message_events::Direction::Forward => {
let events_after: Vec<_> = db
@ -152,21 +163,27 @@ pub async fn get_message_events_route(
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
.collect();
let end_token = events_after.last().map(|(count, _)| count.to_string());
for (_, event) in &events_after {
if !db.rooms.lazy_load_was_sent_before(
&sender_user,
&sender_device,
&body.room_id,
&event.sender,
)? {
lazy_loaded.insert(event.sender.clone());
}
}
next_token = events_after.last().map(|(count, _)| count).copied();
let events_after: Vec<_> = events_after
.into_iter()
.map(|(_, pdu)| pdu.to_room_event())
.collect();
let resp = get_message_events::Response {
start: body.from.to_owned(),
end: end_token,
chunk: events_after,
state: Vec::new(),
};
Ok(resp.into())
resp.start = body.from.to_owned();
resp.end = next_token.map(|count| count.to_string());
resp.chunk = events_after;
}
get_message_events::Direction::Backward => {
let events_before: Vec<_> = db
@ -183,21 +200,51 @@ pub async fn get_message_events_route(
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
.collect();
let start_token = events_before.last().map(|(count, _)| count.to_string());
for (_, event) in &events_before {
if !db.rooms.lazy_load_was_sent_before(
&sender_user,
&sender_device,
&body.room_id,
&event.sender,
)? {
lazy_loaded.insert(event.sender.clone());
}
}
next_token = events_before.last().map(|(count, _)| count).copied();
let events_before: Vec<_> = events_before
.into_iter()
.map(|(_, pdu)| pdu.to_room_event())
.collect();
let resp = get_message_events::Response {
start: body.from.to_owned(),
end: start_token,
chunk: events_before,
state: Vec::new(),
};
Ok(resp.into())
resp.start = body.from.to_owned();
resp.end = next_token.map(|count| count.to_string());
resp.chunk = events_before;
}
}
db.rooms
.lazy_load_confirm_delivery(&sender_user, &sender_device, &body.room_id, from)?;
resp.state = Vec::new();
for ll_id in &lazy_loaded {
if let Some(member_event) =
db.rooms
.room_state_get(&body.room_id, &EventType::RoomMember, ll_id.as_str())?
{
resp.state.push(member_event.to_state_event());
}
}
if let Some(next_token) = next_token {
db.rooms.lazy_load_mark_sent(
&sender_user,
&sender_device,
&body.room_id,
lazy_loaded.into_iter().collect(),
next_token,
);
}
Ok(resp.into())
}