From 57e0a5f65dce2be514d0bc45dbfb26b5c5b0cd00 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 3 Oct 2024 10:02:24 +0000 Subject: [PATCH] additional database stream deserializations for serde_json::from_ elim Signed-off-by: Jason Volk --- src/service/key_backups/mod.rs | 64 ++++++++------------------- src/service/pusher/mod.rs | 2 +- src/service/rooms/state_cache/data.rs | 62 +++++++++++++------------- src/service/users/mod.rs | 11 ++--- 4 files changed, 57 insertions(+), 82 deletions(-) diff --git a/src/service/key_backups/mod.rs b/src/service/key_backups/mod.rs index decf32f7..55263eeb 100644 --- a/src/service/key_backups/mod.rs +++ b/src/service/key_backups/mod.rs @@ -1,9 +1,9 @@ use std::{collections::BTreeMap, sync::Arc}; use conduit::{ - err, implement, utils, + err, implement, utils::stream::{ReadyExt, TryIgnore}, - Err, Error, Result, + Err, Result, }; use database::{Deserialized, Ignore, Interfix, Map}; use futures::StreamExt; @@ -110,57 +110,35 @@ pub async fn update_backup( #[implement(Service)] pub async fn get_latest_backup_version(&self, user_id: &UserId) -> Result { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xFF); - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + type Key<'a> = (&'a UserId, &'a str); + let last_possible_key = (user_id, u64::MAX); self.db .backupid_algorithm - .rev_raw_keys_from(&last_possible_key) + .rev_keys_from(&last_possible_key) .ignore_err() - .ready_take_while(move |key| key.starts_with(&prefix)) + .ready_take_while(|(user_id_, _): &Key<'_>| *user_id_ == user_id) + .map(|(_, version): Key<'_>| version.to_owned()) .next() .await .ok_or_else(|| err!(Request(NotFound("No backup versions found")))) - .and_then(|key| { - utils::string_from_bytes( - key.rsplit(|&b| b == 0xFF) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("backupid_algorithm key is invalid.")) - }) } #[implement(Service)] pub async fn get_latest_backup(&self, user_id: &UserId) -> Result<(String, Raw)> { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xFF); - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + type Key<'a> = (&'a UserId, &'a str); + type KeyVal<'a> = (Key<'a>, Raw); + let last_possible_key = (user_id, u64::MAX); self.db .backupid_algorithm - .rev_raw_stream_from(&last_possible_key) + .rev_stream_from(&last_possible_key) .ignore_err() - .ready_take_while(move |(key, _)| key.starts_with(&prefix)) + .ready_take_while(|((user_id_, _), _): &KeyVal<'_>| *user_id_ == user_id) + .map(|((_, version), algorithm): KeyVal<'_>| (version.to_owned(), algorithm)) .next() .await .ok_or_else(|| err!(Request(NotFound("No backup found")))) - .and_then(|(key, val)| { - let version = utils::string_from_bytes( - key.rsplit(|&b| b == 0xFF) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?; - - let algorithm = serde_json::from_slice(val) - .map_err(|_| Error::bad_database("Algorithm in backupid_algorithm is invalid."))?; - - Ok((version, algorithm)) - }) } #[implement(Service)] @@ -223,7 +201,8 @@ pub async fn get_etag(&self, user_id: &UserId, version: &str) -> String { #[implement(Service)] pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap { - type KeyVal<'a> = ((Ignore, Ignore, &'a RoomId, &'a str), &'a [u8]); + type Key<'a> = (Ignore, Ignore, &'a RoomId, &'a str); + type KeyVal<'a> = (Key<'a>, Raw); let mut rooms = BTreeMap::::new(); let default = || RoomKeyBackup { @@ -235,13 +214,12 @@ pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap| { - let key_data = serde_json::from_slice(value).expect("Invalid KeyBackupData JSON"); + .ready_for_each(|((_, _, room_id, session_id), key_backup_data): KeyVal<'_>| { rooms .entry(room_id.into()) .or_insert_with(default) .sessions - .insert(session_id.into(), key_data); + .insert(session_id.into(), key_backup_data); }) .await; @@ -252,18 +230,14 @@ pub async fn get_all(&self, user_id: &UserId, version: &str) -> BTreeMap BTreeMap> { - type KeyVal<'a> = ((Ignore, Ignore, Ignore, &'a str), &'a [u8]); + type KeyVal<'a> = ((Ignore, Ignore, Ignore, &'a str), Raw); let prefix = (user_id, version, room_id, Interfix); self.db .backupkeyid_backup .stream_prefix(&prefix) .ignore_err() - .map(|((.., session_id), value): KeyVal<'_>| { - let session_id = session_id.to_owned(); - let key_backup_data = serde_json::from_slice(value).expect("Invalid KeyBackupData JSON"); - (session_id, key_backup_data) - }) + .map(|((.., session_id), key_backup_data): KeyVal<'_>| (session_id.to_owned(), key_backup_data)) .collect() .await } diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 8d8b553f..e7b1824a 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -99,7 +99,7 @@ impl Service { .senderkey_pusher .stream_prefix(&prefix) .ignore_err() - .map(|(_, val): (Ignore, &[u8])| serde_json::from_slice(val).expect("Invalid Pusher in db.")) + .map(|(_, pusher): (Ignore, Pusher)| pusher) .collect() .await } diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs index f3ccaf10..6e01e49d 100644 --- a/src/service/rooms/state_cache/data.rs +++ b/src/service/rooms/state_cache/data.rs @@ -3,7 +3,7 @@ use std::{ sync::{Arc, RwLock}, }; -use conduit::{utils, utils::stream::TryIgnore, Error, Result}; +use conduit::{utils::stream::TryIgnore, Result}; use database::{Deserialized, Interfix, Map}; use futures::{Stream, StreamExt}; use ruma::{ @@ -135,20 +135,31 @@ impl Data { pub(super) fn rooms_invited<'a>( &'a self, user_id: &'a UserId, ) -> impl Stream + Send + 'a { + type Key<'a> = (&'a UserId, &'a RoomId); + type KeyVal<'a> = (Key<'a>, Raw>); + let prefix = (user_id, Interfix); self.userroomid_invitestate - .stream_raw_prefix(&prefix) + .stream_prefix(&prefix) .ignore_err() - .map(|(key, val)| { - let room_id = key.rsplit(|&b| b == 0xFF).next().unwrap(); - let room_id = utils::string_from_bytes(room_id).unwrap(); - let room_id = RoomId::parse(room_id).unwrap(); - let state = serde_json::from_slice(val) - .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate.")) - .unwrap(); + .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) + .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) + .ignore_err() + } - (room_id, state) - }) + /// Returns an iterator over all rooms a user left. + #[inline] + pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { + type Key<'a> = (&'a UserId, &'a RoomId); + type KeyVal<'a> = (Key<'a>, Raw>>); + + let prefix = (user_id, Interfix); + self.userroomid_leftstate + .stream_prefix(&prefix) + .ignore_err() + .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) + .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) + .ignore_err() } #[tracing::instrument(skip(self), level = "debug")] @@ -156,7 +167,11 @@ impl Data { &self, user_id: &UserId, room_id: &RoomId, ) -> Result>> { let key = (user_id, room_id); - self.userroomid_invitestate.qry(&key).await.deserialized() + self.userroomid_invitestate + .qry(&key) + .await + .deserialized() + .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) } #[tracing::instrument(skip(self), level = "debug")] @@ -164,25 +179,10 @@ impl Data { &self, user_id: &UserId, room_id: &RoomId, ) -> Result>> { let key = (user_id, room_id); - self.userroomid_leftstate.qry(&key).await.deserialized() - } - - /// Returns an iterator over all rooms a user left. - #[inline] - pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { - let prefix = (user_id, Interfix); self.userroomid_leftstate - .stream_raw_prefix(&prefix) - .ignore_err() - .map(|(key, val)| { - let room_id = key.rsplit(|&b| b == 0xFF).next().unwrap(); - let room_id = utils::string_from_bytes(room_id).unwrap(); - let room_id = RoomId::parse(room_id).unwrap(); - let state = serde_json::from_slice(val) - .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate.")) - .unwrap(); - - (room_id, state) - }) + .qry(&key) + .await + .deserialized() + .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) } } diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 3ab6b3c3..71a93666 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, mem, mem::size_of, sync::Arc}; use conduit::{ debug_warn, err, utils, - utils::{stream::TryIgnore, string::Unquoted, ReadyExt, TryReadyExt}, + utils::{stream::TryIgnore, string::Unquoted, ReadyExt}, warn, Err, Error, Result, Server, }; use database::{Deserialized, Ignore, Interfix, Map}; @@ -749,9 +749,9 @@ impl Service { let prefix = (user_id, device_id, Interfix); self.db .todeviceid_events - .stream_raw_prefix(&prefix) - .ready_and_then(|(_, val)| serde_json::from_slice(val).map_err(Into::into)) + .stream_prefix(&prefix) .ignore_err() + .map(|(_, val): (Ignore, Raw)| val) } pub async fn remove_to_device_events(&self, user_id: &UserId, device_id: &DeviceId, until: u64) { @@ -812,11 +812,12 @@ impl Service { } pub fn all_devices_metadata<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { + let key = (user_id, Interfix); self.db .userdeviceid_metadata - .stream_raw_prefix(&(user_id, Interfix)) - .ready_and_then(|(_, val)| serde_json::from_slice::(val).map_err(Into::into)) + .stream_prefix(&key) .ignore_err() + .map(|(_, val): (Ignore, Device)| val) } /// Creates a new sync filter. Returns the filter id.