additional database stream deserializations for serde_json::from_ elim

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-03 10:02:24 +00:00 committed by strawberry
parent d526db681f
commit 57e0a5f65d
4 changed files with 57 additions and 82 deletions

View file

@ -1,9 +1,9 @@
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use conduit::{ use conduit::{
err, implement, utils, err, implement,
utils::stream::{ReadyExt, TryIgnore}, utils::stream::{ReadyExt, TryIgnore},
Err, Error, Result, Err, Result,
}; };
use database::{Deserialized, Ignore, Interfix, Map}; use database::{Deserialized, Ignore, Interfix, Map};
use futures::StreamExt; use futures::StreamExt;
@ -110,57 +110,35 @@ pub async fn update_backup(
#[implement(Service)] #[implement(Service)]
pub async fn get_latest_backup_version(&self, user_id: &UserId) -> Result<String> { pub async fn get_latest_backup_version(&self, user_id: &UserId) -> Result<String> {
let mut prefix = user_id.as_bytes().to_vec(); type Key<'a> = (&'a UserId, &'a str);
prefix.push(0xFF);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
let last_possible_key = (user_id, u64::MAX);
self.db self.db
.backupid_algorithm .backupid_algorithm
.rev_raw_keys_from(&last_possible_key) .rev_keys_from(&last_possible_key)
.ignore_err() .ignore_err()
.ready_take_while(move |key| key.starts_with(&prefix)) .ready_take_while(|(user_id_, _): &Key<'_>| *user_id_ == user_id)
.map(|(_, version): Key<'_>| version.to_owned())
.next() .next()
.await .await
.ok_or_else(|| err!(Request(NotFound("No backup versions found")))) .ok_or_else(|| err!(Request(NotFound("No backup versions found"))))
.and_then(|key| {
utils::string_from_bytes(
key.rsplit(|&b| b == 0xFF)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))
})
} }
#[implement(Service)] #[implement(Service)]
pub async fn get_latest_backup(&self, user_id: &UserId) -> Result<(String, Raw<BackupAlgorithm>)> { pub async fn get_latest_backup(&self, user_id: &UserId) -> Result<(String, Raw<BackupAlgorithm>)> {
let mut prefix = user_id.as_bytes().to_vec(); type Key<'a> = (&'a UserId, &'a str);
prefix.push(0xFF); type KeyVal<'a> = (Key<'a>, Raw<BackupAlgorithm>);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
let last_possible_key = (user_id, u64::MAX);
self.db self.db
.backupid_algorithm .backupid_algorithm
.rev_raw_stream_from(&last_possible_key) .rev_stream_from(&last_possible_key)
.ignore_err() .ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix)) .ready_take_while(|((user_id_, _), _): &KeyVal<'_>| *user_id_ == user_id)
.map(|((_, version), algorithm): KeyVal<'_>| (version.to_owned(), algorithm))
.next() .next()
.await .await
.ok_or_else(|| err!(Request(NotFound("No backup found")))) .ok_or_else(|| err!(Request(NotFound("No backup found"))))
.and_then(|(key, val)| {
let version = utils::string_from_bytes(
key.rsplit(|&b| b == 0xFF)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?;
let algorithm = serde_json::from_slice(val)
.map_err(|_| Error::bad_database("Algorithm in backupid_algorithm is invalid."))?;
Ok((version, algorithm))
})
} }
#[implement(Service)] #[implement(Service)]
@ -223,7 +201,8 @@ pub async fn get_etag(&self, user_id: &UserId, version: &str) -> String {
#[implement(Service)] #[implement(Service)]
pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRoomId, RoomKeyBackup> { pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRoomId, RoomKeyBackup> {
type KeyVal<'a> = ((Ignore, Ignore, &'a RoomId, &'a str), &'a [u8]); type Key<'a> = (Ignore, Ignore, &'a RoomId, &'a str);
type KeyVal<'a> = (Key<'a>, Raw<KeyBackupData>);
let mut rooms = BTreeMap::<OwnedRoomId, RoomKeyBackup>::new(); let mut rooms = BTreeMap::<OwnedRoomId, RoomKeyBackup>::new();
let default = || RoomKeyBackup { let default = || RoomKeyBackup {
@ -235,13 +214,12 @@ pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRo
.backupkeyid_backup .backupkeyid_backup
.stream_prefix(&prefix) .stream_prefix(&prefix)
.ignore_err() .ignore_err()
.ready_for_each(|((_, _, room_id, session_id), value): KeyVal<'_>| { .ready_for_each(|((_, _, room_id, session_id), key_backup_data): KeyVal<'_>| {
let key_data = serde_json::from_slice(value).expect("Invalid KeyBackupData JSON");
rooms rooms
.entry(room_id.into()) .entry(room_id.into())
.or_insert_with(default) .or_insert_with(default)
.sessions .sessions
.insert(session_id.into(), key_data); .insert(session_id.into(), key_backup_data);
}) })
.await; .await;
@ -252,18 +230,14 @@ pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap<OwnedRo
pub async fn get_room( pub async fn get_room(
&self, user_id: &UserId, version: &str, room_id: &RoomId, &self, user_id: &UserId, version: &str, room_id: &RoomId,
) -> BTreeMap<String, Raw<KeyBackupData>> { ) -> BTreeMap<String, Raw<KeyBackupData>> {
type KeyVal<'a> = ((Ignore, Ignore, Ignore, &'a str), &'a [u8]); type KeyVal<'a> = ((Ignore, Ignore, Ignore, &'a str), Raw<KeyBackupData>);
let prefix = (user_id, version, room_id, Interfix); let prefix = (user_id, version, room_id, Interfix);
self.db self.db
.backupkeyid_backup .backupkeyid_backup
.stream_prefix(&prefix) .stream_prefix(&prefix)
.ignore_err() .ignore_err()
.map(|((.., session_id), value): KeyVal<'_>| { .map(|((.., session_id), key_backup_data): KeyVal<'_>| (session_id.to_owned(), key_backup_data))
let session_id = session_id.to_owned();
let key_backup_data = serde_json::from_slice(value).expect("Invalid KeyBackupData JSON");
(session_id, key_backup_data)
})
.collect() .collect()
.await .await
} }

View file

@ -99,7 +99,7 @@ impl Service {
.senderkey_pusher .senderkey_pusher
.stream_prefix(&prefix) .stream_prefix(&prefix)
.ignore_err() .ignore_err()
.map(|(_, val): (Ignore, &[u8])| serde_json::from_slice(val).expect("Invalid Pusher in db.")) .map(|(_, pusher): (Ignore, Pusher)| pusher)
.collect() .collect()
.await .await
} }

View file

@ -3,7 +3,7 @@ use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use conduit::{utils, utils::stream::TryIgnore, Error, Result}; use conduit::{utils::stream::TryIgnore, Result};
use database::{Deserialized, Interfix, Map}; use database::{Deserialized, Interfix, Map};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use ruma::{ use ruma::{
@ -135,20 +135,31 @@ impl Data {
pub(super) fn rooms_invited<'a>( pub(super) fn rooms_invited<'a>(
&'a self, user_id: &'a UserId, &'a self, user_id: &'a UserId,
) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a { ) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a {
type Key<'a> = (&'a UserId, &'a RoomId);
type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
let prefix = (user_id, Interfix); let prefix = (user_id, Interfix);
self.userroomid_invitestate self.userroomid_invitestate
.stream_raw_prefix(&prefix) .stream_prefix(&prefix)
.ignore_err() .ignore_err()
.map(|(key, val)| { .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
let room_id = key.rsplit(|&b| b == 0xFF).next().unwrap(); .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?)))
let room_id = utils::string_from_bytes(room_id).unwrap(); .ignore_err()
let room_id = RoomId::parse(room_id).unwrap(); }
let state = serde_json::from_slice(val)
.map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))
.unwrap();
(room_id, state) /// Returns an iterator over all rooms a user left.
}) #[inline]
pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
type Key<'a> = (&'a UserId, &'a RoomId);
type KeyVal<'a> = (Key<'a>, Raw<Vec<Raw<AnySyncStateEvent>>>);
let prefix = (user_id, Interfix);
self.userroomid_leftstate
.stream_prefix(&prefix)
.ignore_err()
.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
.map(|(room_id, state)| Ok((room_id, state.deserialize_as()?)))
.ignore_err()
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug")]
@ -156,7 +167,11 @@ impl Data {
&self, user_id: &UserId, room_id: &RoomId, &self, user_id: &UserId, room_id: &RoomId,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> { ) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let key = (user_id, room_id); let key = (user_id, room_id);
self.userroomid_invitestate.qry(&key).await.deserialized() self.userroomid_invitestate
.qry(&key)
.await
.deserialized()
.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| val.deserialize_as().map_err(Into::into))
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug")]
@ -164,25 +179,10 @@ impl Data {
&self, user_id: &UserId, room_id: &RoomId, &self, user_id: &UserId, room_id: &RoomId,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> { ) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let key = (user_id, room_id); let key = (user_id, room_id);
self.userroomid_leftstate.qry(&key).await.deserialized()
}
/// Returns an iterator over all rooms a user left.
#[inline]
pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
let prefix = (user_id, Interfix);
self.userroomid_leftstate self.userroomid_leftstate
.stream_raw_prefix(&prefix) .qry(&key)
.ignore_err() .await
.map(|(key, val)| { .deserialized()
let room_id = key.rsplit(|&b| b == 0xFF).next().unwrap(); .and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| val.deserialize_as().map_err(Into::into))
let room_id = utils::string_from_bytes(room_id).unwrap();
let room_id = RoomId::parse(room_id).unwrap();
let state = serde_json::from_slice(val)
.map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))
.unwrap();
(room_id, state)
})
} }
} }

View file

@ -2,7 +2,7 @@ use std::{collections::BTreeMap, mem, mem::size_of, sync::Arc};
use conduit::{ use conduit::{
debug_warn, err, utils, debug_warn, err, utils,
utils::{stream::TryIgnore, string::Unquoted, ReadyExt, TryReadyExt}, utils::{stream::TryIgnore, string::Unquoted, ReadyExt},
warn, Err, Error, Result, Server, warn, Err, Error, Result, Server,
}; };
use database::{Deserialized, Ignore, Interfix, Map}; use database::{Deserialized, Ignore, Interfix, Map};
@ -749,9 +749,9 @@ impl Service {
let prefix = (user_id, device_id, Interfix); let prefix = (user_id, device_id, Interfix);
self.db self.db
.todeviceid_events .todeviceid_events
.stream_raw_prefix(&prefix) .stream_prefix(&prefix)
.ready_and_then(|(_, val)| serde_json::from_slice(val).map_err(Into::into))
.ignore_err() .ignore_err()
.map(|(_, val): (Ignore, Raw<AnyToDeviceEvent>)| val)
} }
pub async fn remove_to_device_events(&self, user_id: &UserId, device_id: &DeviceId, until: u64) { pub async fn remove_to_device_events(&self, user_id: &UserId, device_id: &DeviceId, until: u64) {
@ -812,11 +812,12 @@ impl Service {
} }
pub fn all_devices_metadata<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = Device> + Send + 'a { pub fn all_devices_metadata<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = Device> + Send + 'a {
let key = (user_id, Interfix);
self.db self.db
.userdeviceid_metadata .userdeviceid_metadata
.stream_raw_prefix(&(user_id, Interfix)) .stream_prefix(&key)
.ready_and_then(|(_, val)| serde_json::from_slice::<Device>(val).map_err(Into::into))
.ignore_err() .ignore_err()
.map(|(_, val): (Ignore, Device)| val)
} }
/// Creates a new sync filter. Returns the filter id. /// Creates a new sync filter. Returns the filter id.