refactor and optimize receipt service data
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
343ec59a8b
commit
c903a71807
5 changed files with 26 additions and 45 deletions
|
@ -559,8 +559,6 @@ async fn load_joined_room(
|
|||
.lazy_loading
|
||||
.lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount);
|
||||
|
||||
// Database queries:
|
||||
|
||||
let current_shortstatehash = services
|
||||
.rooms
|
||||
.state
|
||||
|
@ -983,9 +981,9 @@ async fn load_joined_room(
|
|||
.filter_map(|(read_user, _, edu)| async move {
|
||||
services
|
||||
.users
|
||||
.user_is_ignored(&read_user, sender_user)
|
||||
.user_is_ignored(read_user, sender_user)
|
||||
.await
|
||||
.or_some((read_user, edu))
|
||||
.or_some((read_user.to_owned(), edu))
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
|
|
@ -495,11 +495,11 @@ pub(crate) async fn sync_events_v4_route(
|
|||
.read_receipt
|
||||
.readreceipts_since(room_id, *roomsince)
|
||||
.filter_map(|(read_user, ts, v)| async move {
|
||||
(!services
|
||||
services
|
||||
.users
|
||||
.user_is_ignored(&read_user, sender_user)
|
||||
.await)
|
||||
.then_some((read_user, ts, v))
|
||||
.user_is_ignored(read_user, sender_user)
|
||||
.await
|
||||
.or_some((read_user.to_owned(), ts, v))
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
|
|
@ -1,16 +1,15 @@
|
|||
use std::{mem::size_of, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduit::{
|
||||
utils,
|
||||
utils::{stream::TryIgnore, ReadyExt},
|
||||
Error, Result,
|
||||
Result,
|
||||
};
|
||||
use database::{Deserialized, Json, Map};
|
||||
use futures::{Stream, StreamExt};
|
||||
use ruma::{
|
||||
events::{receipt::ReceiptEvent, AnySyncEphemeralRoomEvent},
|
||||
serde::Raw,
|
||||
CanonicalJsonObject, OwnedUserId, RoomId, UserId,
|
||||
CanonicalJsonObject, RoomId, UserId,
|
||||
};
|
||||
|
||||
use crate::{globals, Dep};
|
||||
|
@ -26,7 +25,7 @@ struct Services {
|
|||
globals: Dep<globals::Service>,
|
||||
}
|
||||
|
||||
pub(super) type ReceiptItem = (OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>);
|
||||
pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>);
|
||||
|
||||
impl Data {
|
||||
pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
||||
|
@ -59,39 +58,23 @@ impl Data {
|
|||
|
||||
pub(super) fn readreceipts_since<'a>(
|
||||
&'a self, room_id: &'a RoomId, since: u64,
|
||||
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
|
||||
) -> impl Stream<Item = ReceiptItem<'_>> + Send + 'a {
|
||||
type Key<'a> = (&'a RoomId, u64, &'a UserId);
|
||||
type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
|
||||
|
||||
let after_since = since.saturating_add(1); // +1 so we don't send the event at since
|
||||
let first_possible_edu = (room_id, after_since);
|
||||
|
||||
let mut prefix = room_id.as_bytes().to_vec();
|
||||
prefix.push(0xFF);
|
||||
let prefix2 = prefix.clone();
|
||||
|
||||
self.readreceiptid_readreceipt
|
||||
.stream_from_raw(&first_possible_edu)
|
||||
.stream_from(&first_possible_edu)
|
||||
.ignore_err()
|
||||
.ready_take_while(move |(k, _)| k.starts_with(&prefix2))
|
||||
.map(move |(k, v)| {
|
||||
let count_offset = prefix.len().saturating_add(size_of::<u64>());
|
||||
let user_id_offset = count_offset.saturating_add(1);
|
||||
|
||||
let count = utils::u64_from_bytes(&k[prefix.len()..count_offset])
|
||||
.map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
|
||||
|
||||
let user_id_str = utils::string_from_bytes(&k[user_id_offset..])
|
||||
.map_err(|_| Error::bad_database("Invalid readreceiptid userid bytes in db."))?;
|
||||
|
||||
let user_id = UserId::parse(user_id_str)
|
||||
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
|
||||
|
||||
let mut json = serde_json::from_slice::<CanonicalJsonObject>(v)
|
||||
.map_err(|_| Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json."))?;
|
||||
|
||||
.ready_take_while(move |((r, ..), _): &KeyVal<'_>| *r == room_id)
|
||||
.map(move |((_, count, user_id), mut json): KeyVal<'_>| {
|
||||
json.remove("room_id");
|
||||
|
||||
let event = Raw::from_json(serde_json::value::to_raw_value(&json)?);
|
||||
let event = serde_json::value::to_raw_value(&json)?;
|
||||
|
||||
Ok((user_id, count, event))
|
||||
Ok((user_id, count, Raw::from_json(event)))
|
||||
})
|
||||
.ignore_err()
|
||||
}
|
||||
|
|
|
@ -7,10 +7,10 @@ use futures::Stream;
|
|||
use ruma::{
|
||||
events::{
|
||||
receipt::{ReceiptEvent, ReceiptEventContent},
|
||||
SyncEphemeralRoomEvent,
|
||||
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
|
||||
},
|
||||
serde::Raw,
|
||||
RoomId, UserId,
|
||||
OwnedUserId, RoomId, UserId,
|
||||
};
|
||||
|
||||
use self::data::{Data, ReceiptItem};
|
||||
|
@ -55,7 +55,7 @@ impl Service {
|
|||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn readreceipts_since<'a>(
|
||||
&'a self, room_id: &'a RoomId, since: u64,
|
||||
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
|
||||
) -> impl Stream<Item = ReceiptItem<'_>> + Send + 'a {
|
||||
self.db.readreceipts_since(room_id, since)
|
||||
}
|
||||
|
||||
|
@ -83,7 +83,7 @@ impl Service {
|
|||
#[must_use]
|
||||
pub fn pack_receipts<I>(receipts: I) -> Raw<SyncEphemeralRoomEvent<ReceiptEventContent>>
|
||||
where
|
||||
I: Iterator<Item = ReceiptItem>,
|
||||
I: Iterator<Item = (OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>)>,
|
||||
{
|
||||
let mut json = BTreeMap::new();
|
||||
for (_, _, value) in receipts {
|
||||
|
|
|
@ -376,7 +376,7 @@ impl Service {
|
|||
let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();
|
||||
while let Some((user_id, count, read_receipt)) = receipts.next().await {
|
||||
*max_edu_count = cmp::max(count, *max_edu_count);
|
||||
if !self.services.globals.user_is_local(&user_id) {
|
||||
if !self.services.globals.user_is_local(user_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -400,7 +400,7 @@ impl Service {
|
|||
let receipt = receipt
|
||||
.remove(&ReceiptType::Read)
|
||||
.expect("our read receipts always set this")
|
||||
.remove(&user_id)
|
||||
.remove(user_id)
|
||||
.expect("our read receipts always have the user here");
|
||||
|
||||
let receipt_data = ReceiptData {
|
||||
|
@ -408,7 +408,7 @@ impl Service {
|
|||
event_ids: vec![event_id.clone()],
|
||||
};
|
||||
|
||||
if read.insert(user_id, receipt_data).is_none() {
|
||||
if read.insert(user_id.to_owned(), receipt_data).is_none() {
|
||||
*num = num.saturating_add(1);
|
||||
if *num >= SELECT_RECEIPT_LIMIT {
|
||||
break;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue