From 2ed0c267eb698c33befc4daa482811f0ae45707a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 7 Oct 2024 17:54:27 +0000 Subject: [PATCH] Refactor for structured insertions Signed-off-by: Jason Volk --- Cargo.lock | 26 +-- Cargo.toml | 2 +- src/api/client/sync.rs | 5 +- src/api/server/invite.rs | 6 - src/service/account_data/mod.rs | 40 +--- src/service/globals/data.rs | 32 ++- src/service/globals/migrations.rs | 17 +- src/service/key_backups/mod.rs | 62 ++---- src/service/media/data.rs | 103 +++------ src/service/media/migrations.rs | 2 +- src/service/presence/data.rs | 18 +- src/service/presence/presence.rs | 4 - src/service/pusher/mod.rs | 16 +- src/service/rooms/directory/mod.rs | 4 +- src/service/rooms/lazy_loading/mod.rs | 12 +- src/service/rooms/metadata/mod.rs | 8 +- src/service/rooms/outlier/mod.rs | 7 +- src/service/rooms/pdu_metadata/data.rs | 16 +- src/service/rooms/read_receipt/data.rs | 40 +--- src/service/rooms/short/mod.rs | 47 ++-- src/service/rooms/state/data.rs | 13 +- src/service/rooms/state_accessor/mod.rs | 6 + src/service/rooms/state_cache/data.rs | 67 +++--- src/service/rooms/state_cache/mod.rs | 89 ++++---- src/service/rooms/timeline/data.rs | 18 +- src/service/rooms/user/data.rs | 26 +-- src/service/sending/data.rs | 3 +- src/service/sending/dest.rs | 2 +- src/service/uiaa/mod.rs | 17 +- src/service/updates/mod.rs | 5 +- src/service/users/mod.rs | 272 +++++++++--------------- 31 files changed, 364 insertions(+), 621 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cae6994c..db1394ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2976,7 +2976,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.10.1" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "assign", "js_int", @@ -2998,7 +2998,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.10.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "js_int", "ruma-common", @@ -3010,7 +3010,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.18.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "as_variant", "assign", @@ -3033,7 +3033,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.13.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "as_variant", "base64 0.22.1", @@ -3063,7 +3063,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.28.1" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "as_variant", "indexmap 2.6.0", @@ -3087,7 +3087,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "bytes", "http", @@ -3105,7 +3105,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.5" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "js_int", "thiserror", @@ -3114,7 +3114,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "js_int", "ruma-common", @@ -3124,7 +3124,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.13.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "cfg-if", "once_cell", @@ -3140,7 +3140,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "js_int", "ruma-common", @@ -3152,7 +3152,7 @@ dependencies = [ [[package]] name = "ruma-server-util" version = "0.3.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "headers", "http", @@ -3165,7 +3165,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.15.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -3181,7 +3181,7 @@ dependencies = [ [[package]] name = "ruma-state-res" version = "0.11.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=f485a0265c67a59df75fc6686787538172fa4cac#f485a0265c67a59df75fc6686787538172fa4cac" +source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" dependencies = [ "futures-util", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 25d1001d..0a98befd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -315,7 +315,7 @@ version = "0.1.2" [workspace.dependencies.ruma] git = "https://github.com/girlbossceo/ruwuma" #branch = "conduwuit-changes" -rev = "f485a0265c67a59df75fc6686787538172fa4cac" +rev = "3109496a1f91357c89cbb57cf86f179e2cb013e7" features = [ "compat", "rand", diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index 8c4c6a44..65af775d 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -7,7 +7,7 @@ use std::{ use axum::extract::State; use conduit::{ debug, err, error, is_equal_to, - result::{FlatOk, IntoIsOk}, + result::FlatOk, utils::{ math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated}, BoolExt, IterStream, ReadyExt, TryFutureExtExt, @@ -1136,8 +1136,7 @@ async fn share_encrypted_room( services .rooms .state_accessor - .room_state_get(other_room_id, &StateEventType::RoomEncryption, "") - .map(Result::into_is_ok) + .is_encrypted_room(other_room_id) }) .await } diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index 447e54be..f02655e6 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -65,12 +65,6 @@ pub(crate) async fn create_invite_route( return Err!(Request(Forbidden("Server is banned on this homeserver."))); } - if let Some(via) = &body.via { - if via.is_empty() { - return Err!(Request(InvalidParam("via field must not be empty."))); - } - } - let mut signed_event = utils::to_canonical_object(&body.event) .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invite event is invalid."))?; diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 8065ac55..ac3f5f83 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -5,7 +5,7 @@ use conduit::{ utils::{stream::TryIgnore, ReadyExt}, Err, Error, Result, }; -use database::{Deserialized, Handle, Map}; +use database::{Deserialized, Handle, Json, Map}; use futures::{StreamExt, TryFutureExt}; use ruma::{ events::{ @@ -56,41 +56,19 @@ impl crate::Service for Service { pub async fn update( &self, room_id: Option<&RoomId>, user_id: &UserId, event_type: RoomAccountDataEventType, data: &serde_json::Value, ) -> Result<()> { - let event_type = event_type.to_string(); - let count = self.services.globals.next_count()?; - - let mut prefix = room_id - .map(ToString::to_string) - .unwrap_or_default() - .as_bytes() - .to_vec(); - prefix.push(0xFF); - prefix.extend_from_slice(user_id.as_bytes()); - prefix.push(0xFF); - - let mut roomuserdataid = prefix.clone(); - roomuserdataid.extend_from_slice(&count.to_be_bytes()); - roomuserdataid.push(0xFF); - roomuserdataid.extend_from_slice(event_type.as_bytes()); - - let mut key = prefix; - key.extend_from_slice(event_type.as_bytes()); - if data.get("type").is_none() || data.get("content").is_none() { return Err!(Request(InvalidParam("Account data doesn't have all required fields."))); } - self.db.roomuserdataid_accountdata.insert( - &roomuserdataid, - &serde_json::to_vec(&data).expect("to_vec always works on json values"), - ); - - let prev_key = (room_id, user_id, &event_type); - let prev = self.db.roomusertype_roomuserdataid.qry(&prev_key).await; - + let count = self.services.globals.next_count().unwrap(); + let roomuserdataid = (room_id, user_id, count, &event_type); self.db - .roomusertype_roomuserdataid - .insert(&key, &roomuserdataid); + .roomuserdataid_accountdata + .put(roomuserdataid, Json(data)); + + let key = (room_id, user_id, &event_type); + let prev = self.db.roomusertype_roomuserdataid.qry(&key).await; + self.db.roomusertype_roomuserdataid.put(key, roomuserdataid); // Remove old entry if let Ok(prev) = prev { diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 57a295d9..3638cb56 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -4,7 +4,7 @@ use std::{ }; use conduit::{trace, utils, utils::rand, Error, Result, Server}; -use database::{Database, Deserialized, Map}; +use database::{Database, Deserialized, Json, Map}; use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; use ruma::{ api::federation::discovery::{ServerSigningKeys, VerifyKey}, @@ -83,7 +83,7 @@ impl Data { .checked_add(1) .expect("counter must not overflow u64"); - self.global.insert(COUNTER, &counter.to_be_bytes()); + self.global.insert(COUNTER, counter.to_be_bytes()); Ok(*counter) } @@ -259,29 +259,21 @@ impl Data { pub async fn add_signing_key( &self, origin: &ServerName, new_keys: ServerSigningKeys, ) -> BTreeMap { - // Not atomic, but this is not critical - let signingkeys = self.server_signingkeys.get(origin).await; - - let mut keys = signingkeys - .and_then(|keys| serde_json::from_slice(&keys).map_err(Into::into)) + // (timo) Not atomic, but this is not critical + let mut keys: ServerSigningKeys = self + .server_signingkeys + .get(origin) + .await + .deserialized() .unwrap_or_else(|_| { // Just insert "now", it doesn't matter ServerSigningKeys::new(origin.to_owned(), MilliSecondsSinceUnixEpoch::now()) }); - let ServerSigningKeys { - verify_keys, - old_verify_keys, - .. - } = new_keys; + keys.verify_keys.extend(new_keys.verify_keys); + keys.old_verify_keys.extend(new_keys.old_verify_keys); - keys.verify_keys.extend(verify_keys); - keys.old_verify_keys.extend(old_verify_keys); - - self.server_signingkeys.insert( - origin.as_bytes(), - &serde_json::to_vec(&keys).expect("serversigningkeys can be serialized"), - ); + self.server_signingkeys.raw_put(origin, Json(&keys)); let mut tree = keys.verify_keys; tree.extend( @@ -324,7 +316,7 @@ impl Data { #[inline] pub fn bump_database_version(&self, new_version: u64) -> Result<()> { - self.global.insert(b"version", &new_version.to_be_bytes()); + self.global.raw_put(b"version", new_version); Ok(()) } diff --git a/src/service/globals/migrations.rs b/src/service/globals/migrations.rs index 334e71c6..c953e7b1 100644 --- a/src/service/globals/migrations.rs +++ b/src/service/globals/migrations.rs @@ -2,7 +2,7 @@ use conduit::{ debug_info, debug_warn, error, info, result::NotFound, utils::{stream::TryIgnore, IterStream, ReadyExt}, - warn, Err, Error, Result, + warn, Err, Result, }; use futures::{FutureExt, StreamExt}; use itertools::Itertools; @@ -37,10 +37,9 @@ pub(crate) async fn migrations(services: &Services) -> Result<()> { // requires recreating the database from scratch. if users_count > 0 { let conduit_user = &services.globals.server_user; - if !services.users.exists(conduit_user).await { - error!("The {} server user does not exist, and the database is not new.", conduit_user); - return Err(Error::bad_database( + error!("The {conduit_user} server user does not exist, and the database is not new."); + return Err!(Database( "Cannot reuse an existing database after changing the server name, please delete the old one first.", )); } @@ -62,9 +61,9 @@ async fn fresh(services: &Services) -> Result<()> { .db .bump_database_version(DATABASE_VERSION)?; - db["global"].insert(b"feat_sha256_media", &[]); - db["global"].insert(b"fix_bad_double_separator_in_state_cache", &[]); - db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", &[]); + db["global"].insert(b"feat_sha256_media", []); + db["global"].insert(b"fix_bad_double_separator_in_state_cache", []); + db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []); // Create the admin room and server user on first run crate::admin::create_admin_room(services).await?; @@ -359,7 +358,7 @@ async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result< .await; db.db.cleanup()?; - db["global"].insert(b"fix_bad_double_separator_in_state_cache", &[]); + db["global"].insert(b"fix_bad_double_separator_in_state_cache", []); info!("Finished fixing"); Ok(()) @@ -440,7 +439,7 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) } db.db.cleanup()?; - db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", &[]); + db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []); info!("Finished fixing"); Ok(()) diff --git a/src/service/key_backups/mod.rs b/src/service/key_backups/mod.rs index 4c303757..bae6f214 100644 --- a/src/service/key_backups/mod.rs +++ b/src/service/key_backups/mod.rs @@ -5,7 +5,7 @@ use conduit::{ utils::stream::{ReadyExt, TryIgnore}, Err, Result, }; -use database::{Deserialized, Ignore, Interfix, Map}; +use database::{Deserialized, Ignore, Interfix, Json, Map}; use futures::StreamExt; use ruma::{ api::client::backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, @@ -50,31 +50,21 @@ impl crate::Service for Service { #[implement(Service)] pub fn create_backup(&self, user_id: &UserId, backup_metadata: &Raw) -> Result { let version = self.services.globals.next_count()?.to_string(); + let count = self.services.globals.next_count()?; - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(version.as_bytes()); + let key = (user_id, &version); + self.db.backupid_algorithm.put(key, Json(backup_metadata)); - self.db.backupid_algorithm.insert( - &key, - &serde_json::to_vec(backup_metadata).expect("BackupAlgorithm::to_vec always works"), - ); - - self.db - .backupid_etag - .insert(&key, &self.services.globals.next_count()?.to_be_bytes()); + self.db.backupid_etag.put(key, count); Ok(version) } #[implement(Service)] pub async fn delete_backup(&self, user_id: &UserId, version: &str) { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(version.as_bytes()); - - self.db.backupid_algorithm.remove(&key); - self.db.backupid_etag.remove(&key); + let key = (user_id, version); + self.db.backupid_algorithm.del(key); + self.db.backupid_etag.del(key); let key = (user_id, version, Interfix); self.db @@ -86,26 +76,21 @@ pub async fn delete_backup(&self, user_id: &UserId, version: &str) { } #[implement(Service)] -pub async fn update_backup( - &self, user_id: &UserId, version: &str, backup_metadata: &Raw, -) -> Result { +pub async fn update_backup<'a>( + &self, user_id: &UserId, version: &'a str, backup_metadata: &Raw, +) -> Result<&'a str> { let key = (user_id, version); if self.db.backupid_algorithm.qry(&key).await.is_err() { return Err!(Request(NotFound("Tried to update nonexistent backup."))); } - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(version.as_bytes()); - + let count = self.services.globals.next_count().unwrap(); + self.db.backupid_etag.put(key, count); self.db .backupid_algorithm - .insert(&key, backup_metadata.json().get().as_bytes()); - self.db - .backupid_etag - .insert(&key, &self.services.globals.next_count()?.to_be_bytes()); + .put_raw(key, backup_metadata.json().get()); - Ok(version.to_owned()) + Ok(version) } #[implement(Service)] @@ -156,22 +141,13 @@ pub async fn add_key( return Err!(Request(NotFound("Tried to update nonexistent backup."))); } - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(version.as_bytes()); - - self.db - .backupid_etag - .insert(&key, &self.services.globals.next_count()?.to_be_bytes()); - - key.push(0xFF); - key.extend_from_slice(room_id.as_bytes()); - key.push(0xFF); - key.extend_from_slice(session_id.as_bytes()); + let count = self.services.globals.next_count().unwrap(); + self.db.backupid_etag.put(key, count); + let key = (user_id, version, room_id, session_id); self.db .backupkeyid_backup - .insert(&key, key_data.json().get().as_bytes()); + .put_raw(key, key_data.json().get()); Ok(()) } diff --git a/src/service/media/data.rs b/src/service/media/data.rs index b2271883..9afbd708 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -1,13 +1,13 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use conduit::{ - debug, debug_info, trace, + debug, debug_info, err, utils::{str_from_bytes, stream::TryIgnore, string_from_bytes, ReadyExt}, Err, Error, Result, }; -use database::{Database, Map}; +use database::{Database, Interfix, Map}; use futures::StreamExt; -use ruma::{api::client::error::ErrorKind, http_headers::ContentDisposition, Mxc, OwnedMxcUri, UserId}; +use ruma::{http_headers::ContentDisposition, Mxc, OwnedMxcUri, UserId}; use super::{preview::UrlPreviewData, thumbnail::Dim}; @@ -37,39 +37,13 @@ impl Data { &self, mxc: &Mxc<'_>, user: Option<&UserId>, dim: &Dim, content_disposition: Option<&ContentDisposition>, content_type: Option<&str>, ) -> Result> { - let mut key: Vec = Vec::new(); - key.extend_from_slice(b"mxc://"); - key.extend_from_slice(mxc.server_name.as_bytes()); - key.extend_from_slice(b"/"); - key.extend_from_slice(mxc.media_id.as_bytes()); - key.push(0xFF); - key.extend_from_slice(&dim.width.to_be_bytes()); - key.extend_from_slice(&dim.height.to_be_bytes()); - key.push(0xFF); - key.extend_from_slice( - content_disposition - .map(ToString::to_string) - .unwrap_or_default() - .as_bytes(), - ); - key.push(0xFF); - key.extend_from_slice( - content_type - .as_ref() - .map(|c| c.as_bytes()) - .unwrap_or_default(), - ); - - self.mediaid_file.insert(&key, &[]); - + let dim: &[u32] = &[dim.width, dim.height]; + let key = (mxc, dim, content_disposition, content_type); + let key = database::serialize_to_vec(key)?; + self.mediaid_file.insert(&key, []); if let Some(user) = user { - let mut key: Vec = Vec::new(); - key.extend_from_slice(b"mxc://"); - key.extend_from_slice(mxc.server_name.as_bytes()); - key.extend_from_slice(b"/"); - key.extend_from_slice(mxc.media_id.as_bytes()); - let user = user.as_bytes().to_vec(); - self.mediaid_user.insert(&key, &user); + let key = (mxc, user); + self.mediaid_user.put_raw(key, user); } Ok(key) @@ -78,33 +52,23 @@ impl Data { pub(super) async fn delete_file_mxc(&self, mxc: &Mxc<'_>) { debug!("MXC URI: {mxc}"); - let mut prefix: Vec = Vec::new(); - prefix.extend_from_slice(b"mxc://"); - prefix.extend_from_slice(mxc.server_name.as_bytes()); - prefix.extend_from_slice(b"/"); - prefix.extend_from_slice(mxc.media_id.as_bytes()); - prefix.push(0xFF); - - trace!("MXC db prefix: {prefix:?}"); + let prefix = (mxc, Interfix); self.mediaid_file - .raw_keys_prefix(&prefix) + .keys_prefix_raw(&prefix) .ignore_err() - .ready_for_each(|key| { - debug!("Deleting key: {:?}", key); - self.mediaid_file.remove(key); - }) + .ready_for_each(|key| self.mediaid_file.remove(key)) .await; self.mediaid_user - .raw_stream_prefix(&prefix) + .stream_prefix_raw(&prefix) .ignore_err() .ready_for_each(|(key, val)| { - if key.starts_with(&prefix) { - let user = str_from_bytes(val).unwrap_or_default(); - debug_info!("Deleting key {key:?} which was uploaded by user {user}"); + debug_assert!(key.starts_with(mxc.to_string().as_bytes()), "key should start with the mxc"); - self.mediaid_user.remove(key); - } + let user = str_from_bytes(val).unwrap_or_default(); + debug_info!("Deleting key {key:?} which was uploaded by user {user}"); + + self.mediaid_user.remove(key); }) .await; } @@ -113,16 +77,10 @@ impl Data { pub(super) async fn search_mxc_metadata_prefix(&self, mxc: &Mxc<'_>) -> Result>> { debug!("MXC URI: {mxc}"); - let mut prefix: Vec = Vec::new(); - prefix.extend_from_slice(b"mxc://"); - prefix.extend_from_slice(mxc.server_name.as_bytes()); - prefix.extend_from_slice(b"/"); - prefix.extend_from_slice(mxc.media_id.as_bytes()); - prefix.push(0xFF); - + let prefix = (mxc, Interfix); let keys: Vec> = self .mediaid_file - .raw_keys_prefix(&prefix) + .keys_prefix_raw(&prefix) .ignore_err() .map(<[u8]>::to_vec) .collect() @@ -138,24 +96,17 @@ impl Data { } pub(super) async fn search_file_metadata(&self, mxc: &Mxc<'_>, dim: &Dim) -> Result { - let mut prefix: Vec = Vec::new(); - prefix.extend_from_slice(b"mxc://"); - prefix.extend_from_slice(mxc.server_name.as_bytes()); - prefix.extend_from_slice(b"/"); - prefix.extend_from_slice(mxc.media_id.as_bytes()); - prefix.push(0xFF); - prefix.extend_from_slice(&dim.width.to_be_bytes()); - prefix.extend_from_slice(&dim.height.to_be_bytes()); - prefix.push(0xFF); + let dim: &[u32] = &[dim.width, dim.height]; + let prefix = (mxc, dim, Interfix); let key = self .mediaid_file - .raw_keys_prefix(&prefix) + .keys_prefix_raw(&prefix) .ignore_err() .map(ToOwned::to_owned) .next() .await - .ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Media not found"))?; + .ok_or_else(|| err!(Request(NotFound("Media not found"))))?; let mut parts = key.rsplit(|&b| b == 0xFF); @@ -215,9 +166,7 @@ impl Data { Ok(()) } - pub(super) fn set_url_preview( - &self, url: &str, data: &UrlPreviewData, timestamp: std::time::Duration, - ) -> Result<()> { + pub(super) fn set_url_preview(&self, url: &str, data: &UrlPreviewData, timestamp: Duration) -> Result<()> { let mut value = Vec::::new(); value.extend_from_slice(×tamp.as_secs().to_be_bytes()); value.push(0xFF); diff --git a/src/service/media/migrations.rs b/src/service/media/migrations.rs index 2d1b39f9..0e358d44 100644 --- a/src/service/media/migrations.rs +++ b/src/service/media/migrations.rs @@ -54,7 +54,7 @@ pub(crate) async fn migrate_sha256_media(services: &Services) -> Result<()> { services.globals.db.bump_database_version(13)?; } - db["global"].insert(b"feat_sha256_media", &[]); + db["global"].insert(b"feat_sha256_media", []); info!("Finished applying sha256_media"); Ok(()) } diff --git a/src/service/presence/data.rs b/src/service/presence/data.rs index 9c9d0ae3..8522746f 100644 --- a/src/service/presence/data.rs +++ b/src/service/presence/data.rs @@ -5,7 +5,7 @@ use conduit::{ utils::{stream::TryIgnore, ReadyExt}, Result, }; -use database::{Deserialized, Map}; +use database::{Deserialized, Json, Map}; use futures::Stream; use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; @@ -107,14 +107,12 @@ impl Data { last_active_ts, status_msg, ); + let count = self.services.globals.next_count()?; let key = presenceid_key(count, user_id); - self.presenceid_presence - .insert(&key, &presence.to_json_bytes()?); - - self.userid_presenceid - .insert(user_id.as_bytes(), &count.to_be_bytes()); + self.presenceid_presence.raw_put(key, Json(presence)); + self.userid_presenceid.raw_put(user_id, count); if let Ok((last_count, _)) = last_presence { let key = presenceid_key(last_count, user_id); @@ -136,7 +134,7 @@ impl Data { let key = presenceid_key(count, user_id); self.presenceid_presence.remove(&key); - self.userid_presenceid.remove(user_id.as_bytes()); + self.userid_presenceid.remove(user_id); } pub fn presence_since(&self, since: u64) -> impl Stream)> + Send + '_ { @@ -152,7 +150,11 @@ impl Data { #[inline] fn presenceid_key(count: u64, user_id: &UserId) -> Vec { - [count.to_be_bytes().to_vec(), user_id.as_bytes().to_vec()].concat() + let cap = size_of::().saturating_add(user_id.as_bytes().len()); + let mut key = Vec::with_capacity(cap); + key.extend_from_slice(&count.to_be_bytes()); + key.extend_from_slice(user_id.as_bytes()); + key } #[inline] diff --git a/src/service/presence/presence.rs b/src/service/presence/presence.rs index 0d5c226b..c4372003 100644 --- a/src/service/presence/presence.rs +++ b/src/service/presence/presence.rs @@ -35,10 +35,6 @@ impl Presence { serde_json::from_slice(bytes).map_err(|_| Error::bad_database("Invalid presence data in database")) } - pub(super) fn to_json_bytes(&self) -> Result> { - serde_json::to_vec(self).map_err(|_| Error::bad_database("Could not serialize Presence to JSON")) - } - /// Creates a PresenceEvent from available data. pub(super) async fn to_presence_event(&self, user_id: &UserId, users: &users::Service) -> PresenceEvent { let now = utils::millis_since_unix_epoch(); diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index e7b1824a..af15e332 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -6,7 +6,7 @@ use conduit::{ utils::{stream::TryIgnore, string_from_bytes}, Err, PduEvent, Result, }; -use database::{Deserialized, Ignore, Interfix, Map}; +use database::{Deserialized, Ignore, Interfix, Json, Map}; use futures::{Stream, StreamExt}; use ipaddress::IPAddress; use ruma::{ @@ -68,18 +68,12 @@ impl Service { pub fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) { match pusher { set_pusher::v3::PusherAction::Post(data) => { - let mut key = sender.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(data.pusher.ids.pushkey.as_bytes()); - self.db - .senderkey_pusher - .insert(&key, &serde_json::to_vec(pusher).expect("Pusher is valid JSON value")); + let key = (sender, &data.pusher.ids.pushkey); + self.db.senderkey_pusher.put(key, Json(pusher)); }, set_pusher::v3::PusherAction::Delete(ids) => { - let mut key = sender.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(ids.pushkey.as_bytes()); - self.db.senderkey_pusher.remove(&key); + let key = (sender, &ids.pushkey); + self.db.senderkey_pusher.del(key); }, } } diff --git a/src/service/rooms/directory/mod.rs b/src/service/rooms/directory/mod.rs index 2112ecef..f366ffe2 100644 --- a/src/service/rooms/directory/mod.rs +++ b/src/service/rooms/directory/mod.rs @@ -26,10 +26,10 @@ impl crate::Service for Service { } #[implement(Service)] -pub fn set_public(&self, room_id: &RoomId) { self.db.publicroomids.insert(room_id.as_bytes(), &[]); } +pub fn set_public(&self, room_id: &RoomId) { self.db.publicroomids.insert(room_id, []); } #[implement(Service)] -pub fn set_not_public(&self, room_id: &RoomId) { self.db.publicroomids.remove(room_id.as_bytes()); } +pub fn set_not_public(&self, room_id: &RoomId) { self.db.publicroomids.remove(room_id); } #[implement(Service)] pub async fn is_public_room(&self, room_id: &RoomId) -> bool { self.db.publicroomids.get(room_id).await.is_ok() } diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs index 9493dcc4..7a4da2a6 100644 --- a/src/service/rooms/lazy_loading/mod.rs +++ b/src/service/rooms/lazy_loading/mod.rs @@ -79,17 +79,9 @@ pub fn lazy_load_confirm_delivery(&self, user_id: &UserId, device_id: &DeviceId, return; }; - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xFF); - prefix.extend_from_slice(device_id.as_bytes()); - prefix.push(0xFF); - prefix.extend_from_slice(room_id.as_bytes()); - prefix.push(0xFF); - for ll_id in &user_ids { - let mut key = prefix.clone(); - key.extend_from_slice(ll_id.as_bytes()); - self.db.lazyloadedids.insert(&key, &[]); + let key = (user_id, device_id, room_id, ll_id); + self.db.lazyloadedids.put_raw(key, []); } } diff --git a/src/service/rooms/metadata/mod.rs b/src/service/rooms/metadata/mod.rs index 8367eb72..4ee390a5 100644 --- a/src/service/rooms/metadata/mod.rs +++ b/src/service/rooms/metadata/mod.rs @@ -64,9 +64,9 @@ pub fn iter_ids(&self) -> impl Stream + Send + '_ { self.db.room #[inline] pub fn disable_room(&self, room_id: &RoomId, disabled: bool) { if disabled { - self.db.disabledroomids.insert(room_id.as_bytes(), &[]); + self.db.disabledroomids.insert(room_id, []); } else { - self.db.disabledroomids.remove(room_id.as_bytes()); + self.db.disabledroomids.remove(room_id); } } @@ -74,9 +74,9 @@ pub fn disable_room(&self, room_id: &RoomId, disabled: bool) { #[inline] pub fn ban_room(&self, room_id: &RoomId, banned: bool) { if banned { - self.db.bannedroomids.insert(room_id.as_bytes(), &[]); + self.db.bannedroomids.insert(room_id, []); } else { - self.db.bannedroomids.remove(room_id.as_bytes()); + self.db.bannedroomids.remove(room_id); } } diff --git a/src/service/rooms/outlier/mod.rs b/src/service/rooms/outlier/mod.rs index b9d04263..03e77838 100644 --- a/src/service/rooms/outlier/mod.rs +++ b/src/service/rooms/outlier/mod.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use conduit::{implement, Result}; -use database::{Deserialized, Map}; +use database::{Deserialized, Json, Map}; use ruma::{CanonicalJsonObject, EventId}; use crate::PduEvent; @@ -50,8 +50,5 @@ pub async fn get_pdu_outlier(&self, event_id: &EventId) -> Result { #[implement(Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) { - self.db.eventid_outlierpdu.insert( - event_id.as_bytes(), - &serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"), - ); + self.db.eventid_outlierpdu.raw_put(event_id, Json(pdu)); } diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 8e045658..4d570e6d 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -39,9 +39,10 @@ impl Data { } pub(super) fn add_relation(&self, from: u64, to: u64) { - let mut key = to.to_be_bytes().to_vec(); - key.extend_from_slice(&from.to_be_bytes()); - self.tofrom_relation.insert(&key, &[]); + const BUFSIZE: usize = size_of::() * 2; + + let key: &[u64] = &[to, from]; + self.tofrom_relation.aput_raw::(key, []); } pub(super) fn relations_until<'a>( @@ -78,9 +79,8 @@ impl Data { pub(super) fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc]) { for prev in event_ids { - let mut key = room_id.as_bytes().to_vec(); - key.extend_from_slice(prev.as_bytes()); - self.referencedevents.insert(&key, &[]); + let key = (room_id, prev); + self.referencedevents.put_raw(key, []); } } @@ -89,9 +89,7 @@ impl Data { self.referencedevents.qry(&key).await.is_ok() } - pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) { - self.softfailedeventids.insert(event_id.as_bytes(), &[]); - } + pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) { self.softfailedeventids.insert(event_id, []); } pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { self.softfailedeventids.get(event_id).await.is_ok() diff --git a/src/service/rooms/read_receipt/data.rs b/src/service/rooms/read_receipt/data.rs index 74b649ef..80a35e88 100644 --- a/src/service/rooms/read_receipt/data.rs +++ b/src/service/rooms/read_receipt/data.rs @@ -5,7 +5,7 @@ use conduit::{ utils::{stream::TryIgnore, ReadyExt}, Error, Result, }; -use database::{Deserialized, Map}; +use database::{Deserialized, Json, Map}; use futures::{Stream, StreamExt}; use ruma::{ events::{receipt::ReceiptEvent, AnySyncEphemeralRoomEvent}, @@ -44,33 +44,19 @@ impl Data { pub(super) async fn readreceipt_update(&self, user_id: &UserId, room_id: &RoomId, event: &ReceiptEvent) { type KeyVal<'a> = (&'a RoomId, u64, &'a UserId); - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - // Remove old entry + let last_possible_key = (room_id, u64::MAX); self.readreceiptid_readreceipt - .rev_keys_from_raw(&last_possible_key) + .rev_keys_from(&last_possible_key) .ignore_err() .ready_take_while(|(r, ..): &KeyVal<'_>| *r == room_id) .ready_filter_map(|(r, c, u): KeyVal<'_>| (u == user_id).then_some((r, c, u))) - .ready_for_each(|old: KeyVal<'_>| { - // This is the old room_latest - self.readreceiptid_readreceipt.del(&old); - }) + .ready_for_each(|old: KeyVal<'_>| self.readreceiptid_readreceipt.del(old)) .await; - let mut room_latest_id = prefix; - room_latest_id.extend_from_slice(&self.services.globals.next_count().unwrap().to_be_bytes()); - room_latest_id.push(0xFF); - room_latest_id.extend_from_slice(user_id.as_bytes()); - - self.readreceiptid_readreceipt.insert( - &room_latest_id, - &serde_json::to_vec(event).expect("EduEvent::to_string always works"), - ); + let count = self.services.globals.next_count().unwrap(); + let latest_id = (room_id, count, user_id); + self.readreceiptid_readreceipt.put(latest_id, Json(event)); } pub(super) fn readreceipts_since<'a>( @@ -113,15 +99,11 @@ impl Data { } pub(super) fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) { - let mut key = room_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(user_id.as_bytes()); + let key = (room_id, user_id); + let next_count = self.services.globals.next_count().unwrap(); - self.roomuserid_privateread - .insert(&key, &count.to_be_bytes()); - - self.roomuserid_lastprivatereadupdate - .insert(&key, &self.services.globals.next_count().unwrap().to_be_bytes()); + self.roomuserid_privateread.put(key, count); + self.roomuserid_lastprivatereadupdate.put(key, next_count); } pub(super) async fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result { diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index bd8fdcc9..609c0e07 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{mem::size_of_val, sync::Arc}; use conduit::{err, implement, utils, Result}; use database::{Deserialized, Map}; @@ -46,6 +46,8 @@ impl crate::Service for Service { #[implement(Service)] pub async fn get_or_create_shorteventid(&self, event_id: &EventId) -> u64 { + const BUFSIZE: usize = size_of::(); + if let Ok(shorteventid) = self .db .eventid_shorteventid @@ -57,12 +59,15 @@ pub async fn get_or_create_shorteventid(&self, event_id: &EventId) -> u64 { } let shorteventid = self.services.globals.next_count().unwrap(); + debug_assert!(size_of_val(&shorteventid) == BUFSIZE, "buffer requirement changed"); + self.db .eventid_shorteventid - .insert(event_id.as_bytes(), &shorteventid.to_be_bytes()); + .raw_aput::(event_id, shorteventid); + self.db .shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), event_id.as_bytes()); + .aput_raw::(shorteventid, event_id); shorteventid } @@ -77,13 +82,17 @@ pub async fn multi_get_or_create_shorteventid(&self, event_ids: &[&EventId]) -> .map(|(i, result)| match result { Ok(ref short) => utils::u64_from_u8(short), Err(_) => { + const BUFSIZE: usize = size_of::(); + let short = self.services.globals.next_count().unwrap(); + debug_assert!(size_of_val(&short) == BUFSIZE, "buffer requirement changed"); + self.db .eventid_shorteventid - .insert(event_ids[i], &short.to_be_bytes()); + .raw_aput::(event_ids[i], short); self.db .shorteventid_eventid - .insert(&short.to_be_bytes(), event_ids[i]); + .aput_raw::(short, event_ids[i]); short }, @@ -103,7 +112,9 @@ pub async fn get_shortstatekey(&self, event_type: &StateEventType, state_key: &s #[implement(Service)] pub async fn get_or_create_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> u64 { - let key = (event_type.to_string(), state_key); + const BUFSIZE: usize = size_of::(); + + let key = (event_type, state_key); if let Ok(shortstatekey) = self .db .statekey_shortstatekey @@ -114,17 +125,16 @@ pub async fn get_or_create_shortstatekey(&self, event_type: &StateEventType, sta return shortstatekey; } - let mut key = event_type.to_string().as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(state_key.as_bytes()); - let shortstatekey = self.services.globals.next_count().unwrap(); + debug_assert!(size_of_val(&shortstatekey) == BUFSIZE, "buffer requirement changed"); + self.db .statekey_shortstatekey - .insert(&key, &shortstatekey.to_be_bytes()); + .put_aput::(key, shortstatekey); + self.db .shortstatekey_statekey - .insert(&shortstatekey.to_be_bytes(), &key); + .aput_put::(shortstatekey, key); shortstatekey } @@ -177,6 +187,8 @@ pub async fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(State /// Returns (shortstatehash, already_existed) #[implement(Service)] pub async fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> (u64, bool) { + const BUFSIZE: usize = size_of::(); + if let Ok(shortstatehash) = self .db .statehash_shortstatehash @@ -188,9 +200,11 @@ pub async fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> (u64, boo } let shortstatehash = self.services.globals.next_count().unwrap(); + debug_assert!(size_of_val(&shortstatehash) == BUFSIZE, "buffer requirement changed"); + self.db .statehash_shortstatehash - .insert(state_hash, &shortstatehash.to_be_bytes()); + .raw_aput::(state_hash, shortstatehash); (shortstatehash, false) } @@ -208,10 +222,15 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> u64 { .await .deserialized() .unwrap_or_else(|_| { + const BUFSIZE: usize = size_of::(); + let short = self.services.globals.next_count().unwrap(); + debug_assert!(size_of_val(&short) == BUFSIZE, "buffer requirement changed"); + self.db .roomid_shortroomid - .insert(room_id.as_bytes(), &short.to_be_bytes()); + .raw_aput::(room_id, short); + short }) } diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index 7265038f..813f48ae 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -36,12 +36,12 @@ impl Data { _mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex ) { self.roomid_shortstatehash - .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes()); + .raw_put(room_id, new_shortstatehash); } pub(super) fn set_event_state(&self, shorteventid: u64, shortstatehash: u64) { self.shorteventid_shortstatehash - .insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes()); + .put(shorteventid, shortstatehash); } pub(super) async fn set_forward_extremities( @@ -57,12 +57,9 @@ impl Data { .ready_for_each(|key| self.roomid_pduleaves.remove(key)) .await; - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - for event_id in event_ids { - let mut key = prefix.clone(); - key.extend_from_slice(event_id.as_bytes()); - self.roomid_pduleaves.insert(&key, event_id.as_bytes()); + for event_id in &event_ids { + let key = (room_id, event_id); + self.roomid_pduleaves.put_raw(key, event_id); } } } diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 19f1f141..561db18a 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -481,4 +481,10 @@ impl Service { .await .map(|content: RoomEncryptionEventContent| content.algorithm) } + + pub async fn is_encrypted_room(&self, room_id: &RoomId) -> bool { + self.room_state_get(room_id, &StateEventType::RoomEncryption, "") + .await + .is_ok() + } } diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs index 6e01e49d..c06c8107 100644 --- a/src/service/rooms/state_cache/data.rs +++ b/src/service/rooms/state_cache/data.rs @@ -4,7 +4,7 @@ use std::{ }; use conduit::{utils::stream::TryIgnore, Result}; -use database::{Deserialized, Interfix, Map}; +use database::{serialize_to_vec, Deserialized, Interfix, Json, Map}; use futures::{Stream, StreamExt}; use ruma::{ events::{AnyStrippedStateEvent, AnySyncStateEvent}, @@ -63,71 +63,62 @@ impl Data { } pub(super) fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - self.roomuseroncejoinedids.insert(&userroom_id, &[]); + let key = (user_id, room_id); + + self.roomuseroncejoinedids.put_raw(key, []); } pub(super) fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { - let roomid = room_id.as_bytes().to_vec(); + let userroom_id = (user_id, room_id); + let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); - let mut roomuser_id = roomid.clone(); - roomuser_id.push(0xFF); - roomuser_id.extend_from_slice(user_id.as_bytes()); + let roomuser_id = (room_id, user_id); + let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); + self.userroomid_joined.insert(&userroom_id, []); + self.roomuserid_joined.insert(&roomuser_id, []); - self.userroomid_joined.insert(&userroom_id, &[]); - self.roomuserid_joined.insert(&roomuser_id, &[]); self.userroomid_invitestate.remove(&userroom_id); self.roomuserid_invitecount.remove(&roomuser_id); + self.userroomid_leftstate.remove(&userroom_id); self.roomuserid_leftcount.remove(&roomuser_id); - self.roomid_inviteviaservers.remove(&roomid); + self.roomid_inviteviaservers.remove(room_id); } pub(super) fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { - let roomid = room_id.as_bytes().to_vec(); + let userroom_id = (user_id, room_id); + let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); - let mut roomuser_id = roomid.clone(); - roomuser_id.push(0xFF); - roomuser_id.extend_from_slice(user_id.as_bytes()); + let roomuser_id = (room_id, user_id); + let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); + // (timo) TODO + let leftstate = Vec::>::new(); + let count = self.services.globals.next_count().unwrap(); + + self.userroomid_leftstate + .raw_put(&userroom_id, Json(leftstate)); + self.roomuserid_leftcount.raw_put(&roomuser_id, count); - self.userroomid_leftstate.insert( - &userroom_id, - &serde_json::to_vec(&Vec::>::new()).unwrap(), - ); // TODO - self.roomuserid_leftcount - .insert(&roomuser_id, &self.services.globals.next_count().unwrap().to_be_bytes()); self.userroomid_joined.remove(&userroom_id); self.roomuserid_joined.remove(&roomuser_id); + self.userroomid_invitestate.remove(&userroom_id); self.roomuserid_invitecount.remove(&roomuser_id); - self.roomid_inviteviaservers.remove(&roomid); + self.roomid_inviteviaservers.remove(room_id); } /// Makes a user forget a room. #[tracing::instrument(skip(self), level = "debug")] pub(super) fn forget(&self, room_id: &RoomId, user_id: &UserId) { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); + let userroom_id = (user_id, room_id); + let roomuser_id = (room_id, user_id); - let mut roomuser_id = room_id.as_bytes().to_vec(); - roomuser_id.push(0xFF); - roomuser_id.extend_from_slice(user_id.as_bytes()); - - self.userroomid_leftstate.remove(&userroom_id); - self.roomuserid_leftcount.remove(&roomuser_id); + self.userroomid_leftstate.del(userroom_id); + self.roomuserid_leftcount.del(roomuser_id); } /// Returns an iterator over all rooms a user was invited to. diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index edfae529..077eee10 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -3,13 +3,13 @@ mod data; use std::{collections::HashSet, sync::Arc}; use conduit::{ - err, + err, is_not_empty, utils::{stream::TryIgnore, ReadyExt, StreamTools}, warn, Result, }; use data::Data; -use database::{Deserialized, Ignore, Interfix}; -use futures::{Stream, StreamExt}; +use database::{serialize_to_vec, Deserialized, Ignore, Interfix, Json}; +use futures::{stream::iter, Stream, StreamExt}; use itertools::Itertools; use ruma::{ events::{ @@ -547,50 +547,37 @@ impl Service { .unwrap_or(0), ); - self.db - .roomid_joinedcount - .insert(room_id.as_bytes(), &joinedcount.to_be_bytes()); - - self.db - .roomid_invitedcount - .insert(room_id.as_bytes(), &invitedcount.to_be_bytes()); + self.db.roomid_joinedcount.raw_put(room_id, joinedcount); + self.db.roomid_invitedcount.raw_put(room_id, invitedcount); self.room_servers(room_id) .ready_for_each(|old_joined_server| { - if !joined_servers.remove(old_joined_server) { - // Server not in room anymore - let mut roomserver_id = room_id.as_bytes().to_vec(); - roomserver_id.push(0xFF); - roomserver_id.extend_from_slice(old_joined_server.as_bytes()); - - let mut serverroom_id = old_joined_server.as_bytes().to_vec(); - serverroom_id.push(0xFF); - serverroom_id.extend_from_slice(room_id.as_bytes()); - - self.db.roomserverids.remove(&roomserver_id); - self.db.serverroomids.remove(&serverroom_id); + if joined_servers.remove(old_joined_server) { + return; } + + // Server not in room anymore + let roomserver_id = (room_id, old_joined_server); + let serverroom_id = (old_joined_server, room_id); + + self.db.roomserverids.del(roomserver_id); + self.db.serverroomids.del(serverroom_id); }) .await; // Now only new servers are in joined_servers anymore - for server in joined_servers { - let mut roomserver_id = room_id.as_bytes().to_vec(); - roomserver_id.push(0xFF); - roomserver_id.extend_from_slice(server.as_bytes()); + for server in &joined_servers { + let roomserver_id = (room_id, server); + let serverroom_id = (server, room_id); - let mut serverroom_id = server.as_bytes().to_vec(); - serverroom_id.push(0xFF); - serverroom_id.extend_from_slice(room_id.as_bytes()); - - self.db.roomserverids.insert(&roomserver_id, &[]); - self.db.serverroomids.insert(&serverroom_id, &[]); + self.db.roomserverids.put_raw(roomserver_id, []); + self.db.serverroomids.put_raw(serverroom_id, []); } self.db .appservice_in_room_cache .write() - .unwrap() + .expect("locked") .remove(room_id); } @@ -598,44 +585,44 @@ impl Service { &self, user_id: &UserId, room_id: &RoomId, last_state: Option>>, invite_via: Option>, ) { - let mut roomuser_id = room_id.as_bytes().to_vec(); - roomuser_id.push(0xFF); - roomuser_id.extend_from_slice(user_id.as_bytes()); + let roomuser_id = (room_id, user_id); + let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); + let userroom_id = (user_id, room_id); + let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); + + self.db + .userroomid_invitestate + .raw_put(&userroom_id, Json(last_state.unwrap_or_default())); - self.db.userroomid_invitestate.insert( - &userroom_id, - &serde_json::to_vec(&last_state.unwrap_or_default()).expect("state to bytes always works"), - ); self.db .roomuserid_invitecount - .insert(&roomuser_id, &self.services.globals.next_count().unwrap().to_be_bytes()); + .raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap()); + self.db.userroomid_joined.remove(&userroom_id); self.db.roomuserid_joined.remove(&roomuser_id); + self.db.userroomid_leftstate.remove(&userroom_id); self.db.roomuserid_leftcount.remove(&roomuser_id); - if let Some(servers) = invite_via.as_deref() { + if let Some(servers) = invite_via.filter(is_not_empty!()) { self.add_servers_invite_via(room_id, servers).await; } } #[tracing::instrument(skip(self, servers), level = "debug")] - pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: &[OwnedServerName]) { - let mut prev_servers: Vec<_> = self + pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec) { + let mut servers: Vec<_> = self .servers_invite_via(room_id) .map(ToOwned::to_owned) + .chain(iter(servers.into_iter())) .collect() .await; - prev_servers.extend(servers.to_owned()); - prev_servers.sort_unstable(); - prev_servers.dedup(); + servers.sort_unstable(); + servers.dedup(); - let servers = prev_servers + let servers = servers .iter() .map(|server| server.as_bytes()) .collect_vec() diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index cb85cf19..c51b7856 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -11,7 +11,7 @@ use conduit::{ utils::{stream::TryIgnore, u64_from_u8, ReadyExt}, Err, PduCount, PduEvent, Result, }; -use database::{Database, Deserialized, KeyVal, Map}; +use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, StreamExt}; use ruma::{CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId}; use tokio::sync::Mutex; @@ -168,10 +168,7 @@ impl Data { } pub(super) async fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) { - self.pduid_pdu.insert( - pdu_id, - &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), - ); + self.pduid_pdu.raw_put(pdu_id, Json(json)); self.lasttimelinecount_cache .lock() @@ -183,13 +180,10 @@ impl Data { } pub(super) fn prepend_backfill_pdu(&self, pdu_id: &[u8], event_id: &EventId, json: &CanonicalJsonObject) { - self.pduid_pdu.insert( - pdu_id, - &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), - ); + self.pduid_pdu.raw_put(pdu_id, Json(json)); - self.eventid_pduid.insert(event_id.as_bytes(), pdu_id); - self.eventid_outlierpdu.remove(event_id.as_bytes()); + self.eventid_pduid.insert(event_id, pdu_id); + self.eventid_outlierpdu.remove(event_id); } /// Removes a pdu and creates a new one with the same id. @@ -328,5 +322,5 @@ pub(super) fn pdu_count(pdu_id: &[u8]) -> PduCount { fn increment(db: &Arc, key: &[u8]) { let old = db.get_blocking(key); let new = utils::increment(old.ok().as_deref()); - db.insert(key, &new); + db.insert(key, new); } diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs index d4d9874c..96b009f8 100644 --- a/src/service/rooms/user/data.rs +++ b/src/service/rooms/user/data.rs @@ -38,20 +38,13 @@ impl Data { } pub(super) fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - let mut roomuser_id = room_id.as_bytes().to_vec(); - roomuser_id.push(0xFF); - roomuser_id.extend_from_slice(user_id.as_bytes()); + let userroom_id = (user_id, room_id); + self.userroomid_highlightcount.put(userroom_id, 0_u64); + self.userroomid_notificationcount.put(userroom_id, 0_u64); - self.userroomid_notificationcount - .insert(&userroom_id, &0_u64.to_be_bytes()); - self.userroomid_highlightcount - .insert(&userroom_id, &0_u64.to_be_bytes()); - - self.roomuserid_lastnotificationread - .insert(&roomuser_id, &self.services.globals.next_count().unwrap().to_be_bytes()); + let roomuser_id = (room_id, user_id); + let count = self.services.globals.next_count().unwrap(); + self.roomuserid_lastnotificationread.put(roomuser_id, count); } pub(super) async fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { @@ -89,11 +82,8 @@ impl Data { .await .expect("room exists"); - let mut key = shortroomid.to_be_bytes().to_vec(); - key.extend_from_slice(&token.to_be_bytes()); - - self.roomsynctoken_shortstatehash - .insert(&key, &shortstatehash.to_be_bytes()); + let key: &[u64] = &[shortroomid, token]; + self.roomsynctoken_shortstatehash.put(key, shortstatehash); } pub(super) async fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result { diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 96d4a6a9..f75a212c 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -146,8 +146,7 @@ impl Data { } pub(super) fn set_latest_educount(&self, server_name: &ServerName, last_count: u64) { - self.servername_educount - .insert(server_name.as_bytes(), &last_count.to_be_bytes()); + self.servername_educount.raw_put(server_name, last_count); } pub async fn get_latest_educount(&self, server_name: &ServerName) -> u64 { diff --git a/src/service/sending/dest.rs b/src/service/sending/dest.rs index 9968acd7..234a0b90 100644 --- a/src/service/sending/dest.rs +++ b/src/service/sending/dest.rs @@ -12,7 +12,7 @@ pub enum Destination { #[implement(Destination)] #[must_use] -pub fn get_prefix(&self) -> Vec { +pub(super) fn get_prefix(&self) -> Vec { match self { Self::Normal(server) => { let len = server.as_bytes().len().saturating_add(1); diff --git a/src/service/uiaa/mod.rs b/src/service/uiaa/mod.rs index f75f1bcd..d2865d88 100644 --- a/src/service/uiaa/mod.rs +++ b/src/service/uiaa/mod.rs @@ -8,7 +8,7 @@ use conduit::{ utils::{hash, string::EMPTY}, Error, Result, }; -use database::{Deserialized, Map}; +use database::{Deserialized, Json, Map}; use ruma::{ api::client::{ error::ErrorKind, @@ -217,21 +217,14 @@ pub fn get_uiaa_request( #[implement(Service)] fn update_uiaa_session(&self, user_id: &UserId, device_id: &DeviceId, session: &str, uiaainfo: Option<&UiaaInfo>) { - let mut userdevicesessionid = user_id.as_bytes().to_vec(); - userdevicesessionid.push(0xFF); - userdevicesessionid.extend_from_slice(device_id.as_bytes()); - userdevicesessionid.push(0xFF); - userdevicesessionid.extend_from_slice(session.as_bytes()); + let key = (user_id, device_id, session); if let Some(uiaainfo) = uiaainfo { - self.db.userdevicesessionid_uiaainfo.insert( - &userdevicesessionid, - &serde_json::to_vec(&uiaainfo).expect("UiaaInfo::to_vec always works"), - ); - } else { self.db .userdevicesessionid_uiaainfo - .remove(&userdevicesessionid); + .put(key, Json(uiaainfo)); + } else { + self.db.userdevicesessionid_uiaainfo.del(key); } } diff --git a/src/service/updates/mod.rs b/src/service/updates/mod.rs index fca63725..adc85fe6 100644 --- a/src/service/updates/mod.rs +++ b/src/service/updates/mod.rs @@ -121,10 +121,7 @@ impl Service { } #[inline] - pub fn update_check_for_updates_id(&self, id: u64) { - self.db - .insert(LAST_CHECK_FOR_UPDATES_COUNT, &id.to_be_bytes()); - } + pub fn update_check_for_updates_id(&self, id: u64) { self.db.raw_put(LAST_CHECK_FOR_UPDATES_COUNT, id); } pub async fn last_check_for_updates_id(&self) -> u64 { self.db diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index a99a7df4..589aee8a 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -3,18 +3,19 @@ use std::{collections::BTreeMap, mem, mem::size_of, sync::Arc}; use conduit::{ debug_warn, err, utils, utils::{stream::TryIgnore, string::Unquoted, ReadyExt}, - warn, Err, Error, Result, Server, + Err, Error, Result, Server, }; -use database::{Deserialized, Ignore, Interfix, Map}; -use futures::{pin_mut, FutureExt, Stream, StreamExt, TryFutureExt}; +use database::{Deserialized, Ignore, Interfix, Json, Map}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; use ruma::{ api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, - events::{ignored_user_list::IgnoredUserListEvent, AnyToDeviceEvent, GlobalAccountDataEventType, StateEventType}, + events::{ignored_user_list::IgnoredUserListEvent, AnyToDeviceEvent, GlobalAccountDataEventType}, serde::Raw, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri, OwnedUserId, UInt, UserId, }; +use serde_json::json; use crate::{account_data, admin, globals, rooms, Dep}; @@ -194,22 +195,16 @@ impl Service { /// Hash and set the user's password to the Argon2 hash pub fn set_password(&self, user_id: &UserId, password: Option<&str>) -> Result<()> { - if let Some(password) = password { - if let Ok(hash) = utils::hash::password(password) { - self.db - .userid_password - .insert(user_id.as_bytes(), hash.as_bytes()); - Ok(()) - } else { - Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Password does not meet the requirements.", - )) - } - } else { - self.db.userid_password.insert(user_id.as_bytes(), b""); - Ok(()) - } + password + .map(utils::hash::password) + .transpose() + .map_err(|e| err!(Request(InvalidParam("Password does not meet the requirements: {e}"))))? + .map_or_else( + || self.db.userid_password.insert(user_id, b""), + |hash| self.db.userid_password.insert(user_id, hash), + ); + + Ok(()) } /// Returns the displayname of a user on this homeserver. @@ -221,11 +216,9 @@ impl Service { /// need to nofify all rooms of this change. pub fn set_displayname(&self, user_id: &UserId, displayname: Option) { if let Some(displayname) = displayname { - self.db - .userid_displayname - .insert(user_id.as_bytes(), displayname.as_bytes()); + self.db.userid_displayname.insert(user_id, displayname); } else { - self.db.userid_displayname.remove(user_id.as_bytes()); + self.db.userid_displayname.remove(user_id); } } @@ -237,11 +230,9 @@ impl Service { /// Sets a new avatar_url or removes it if avatar_url is None. pub fn set_avatar_url(&self, user_id: &UserId, avatar_url: Option) { if let Some(avatar_url) = avatar_url { - self.db - .userid_avatarurl - .insert(user_id.as_bytes(), avatar_url.to_string().as_bytes()); + self.db.userid_avatarurl.insert(user_id, &avatar_url); } else { - self.db.userid_avatarurl.remove(user_id.as_bytes()); + self.db.userid_avatarurl.remove(user_id); } } @@ -253,11 +244,9 @@ impl Service { /// Sets a new avatar_url or removes it if avatar_url is None. pub fn set_blurhash(&self, user_id: &UserId, blurhash: Option) { if let Some(blurhash) = blurhash { - self.db - .userid_blurhash - .insert(user_id.as_bytes(), blurhash.as_bytes()); + self.db.userid_blurhash.insert(user_id, blurhash); } else { - self.db.userid_blurhash.remove(user_id.as_bytes()); + self.db.userid_blurhash.remove(user_id); } } @@ -269,41 +258,29 @@ impl Service { // This method should never be called for nonexistent users. We shouldn't assert // though... if !self.exists(user_id).await { - warn!("Called create_device for non-existent user {} in database", user_id); - return Err(Error::BadRequest(ErrorKind::InvalidParam, "User does not exist.")); + return Err!(Request(InvalidParam(error!("Called create_device for non-existent {user_id}")))); } - let mut userdeviceid = user_id.as_bytes().to_vec(); - userdeviceid.push(0xFF); - userdeviceid.extend_from_slice(device_id.as_bytes()); + let key = (user_id, device_id); + let val = Device { + device_id: device_id.into(), + display_name: initial_device_display_name, + last_seen_ip: client_ip, + last_seen_ts: Some(MilliSecondsSinceUnixEpoch::now()), + }; increment(&self.db.userid_devicelistversion, user_id.as_bytes()); - - self.db.userdeviceid_metadata.insert( - &userdeviceid, - &serde_json::to_vec(&Device { - device_id: device_id.into(), - display_name: initial_device_display_name, - last_seen_ip: client_ip, - last_seen_ts: Some(MilliSecondsSinceUnixEpoch::now()), - }) - .expect("Device::to_string never fails."), - ); - - self.set_token(user_id, device_id, token).await?; - - Ok(()) + self.db.userdeviceid_metadata.put(key, Json(val)); + self.set_token(user_id, device_id, token).await } /// Removes a device from a user. pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) { - let mut userdeviceid = user_id.as_bytes().to_vec(); - userdeviceid.push(0xFF); - userdeviceid.extend_from_slice(device_id.as_bytes()); + let userdeviceid = (user_id, device_id); // Remove tokens - if let Ok(old_token) = self.db.userdeviceid_token.get(&userdeviceid).await { - self.db.userdeviceid_token.remove(&userdeviceid); + if let Ok(old_token) = self.db.userdeviceid_token.qry(&userdeviceid).await { + self.db.userdeviceid_token.del(userdeviceid); self.db.token_userdeviceid.remove(&old_token); } @@ -320,7 +297,7 @@ impl Service { increment(&self.db.userid_devicelistversion, user_id.as_bytes()); - self.db.userdeviceid_metadata.remove(&userdeviceid); + self.db.userdeviceid_metadata.del(userdeviceid); } /// Returns an iterator over all device ids of this user. @@ -333,6 +310,11 @@ impl Service { .map(|(_, device_id): (Ignore, &DeviceId)| device_id) } + pub async fn get_token(&self, user_id: &UserId, device_id: &DeviceId) -> Result { + let key = (user_id, device_id); + self.db.userdeviceid_token.qry(&key).await.deserialized() + } + /// Replaces the access token of one device. pub async fn set_token(&self, user_id: &UserId, device_id: &DeviceId, token: &str) -> Result<()> { let key = (user_id, device_id); @@ -352,15 +334,8 @@ impl Service { } // Assign token to user device combination - let mut userdeviceid = user_id.as_bytes().to_vec(); - userdeviceid.push(0xFF); - userdeviceid.extend_from_slice(device_id.as_bytes()); - self.db - .userdeviceid_token - .insert(&userdeviceid, token.as_bytes()); - self.db - .token_userdeviceid - .insert(token.as_bytes(), &userdeviceid); + self.db.userdeviceid_token.put_raw(key, token); + self.db.token_userdeviceid.raw_put(token, key); Ok(()) } @@ -393,14 +368,12 @@ impl Service { .as_bytes(), ); - self.db.onetimekeyid_onetimekeys.insert( - &key, - &serde_json::to_vec(&one_time_key_value).expect("OneTimeKey::to_vec always works"), - ); - self.db - .userid_lastonetimekeyupdate - .insert(user_id.as_bytes(), &self.services.globals.next_count()?.to_be_bytes()); + .onetimekeyid_onetimekeys + .raw_put(key, Json(one_time_key_value)); + + let count = self.services.globals.next_count().unwrap(); + self.db.userid_lastonetimekeyupdate.raw_put(user_id, count); Ok(()) } @@ -417,9 +390,8 @@ impl Service { pub async fn take_one_time_key( &self, user_id: &UserId, device_id: &DeviceId, key_algorithm: &DeviceKeyAlgorithm, ) -> Result<(OwnedDeviceKeyId, Raw)> { - self.db - .userid_lastonetimekeyupdate - .insert(user_id.as_bytes(), &self.services.globals.next_count()?.to_be_bytes()); + let count = self.services.globals.next_count()?.to_be_bytes(); + self.db.userid_lastonetimekeyupdate.insert(user_id, count); let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xFF); @@ -488,15 +460,9 @@ impl Service { } pub async fn add_device_keys(&self, user_id: &UserId, device_id: &DeviceId, device_keys: &Raw) { - let mut userdeviceid = user_id.as_bytes().to_vec(); - userdeviceid.push(0xFF); - userdeviceid.extend_from_slice(device_id.as_bytes()); - - self.db.keyid_key.insert( - &userdeviceid, - &serde_json::to_vec(&device_keys).expect("DeviceKeys::to_vec always works"), - ); + let key = (user_id, device_id); + self.db.keyid_key.put(key, Json(device_keys)); self.mark_device_key_update(user_id).await; } @@ -611,13 +577,8 @@ impl Service { .ok_or_else(|| err!(Database("signatures in keyid_key for a user is invalid.")))? .insert(signature.0, signature.1.into()); - let mut key = target_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(key_id.as_bytes()); - self.db.keyid_key.insert( - &key, - &serde_json::to_vec(&cross_signing_key).expect("CrossSigningKey::to_vec always works"), - ); + let key = (target_id, key_id); + self.db.keyid_key.put(key, Json(cross_signing_key)); self.mark_device_key_update(target_id).await; @@ -640,34 +601,21 @@ impl Service { } pub async fn mark_device_key_update(&self, user_id: &UserId) { - let count = self.services.globals.next_count().unwrap().to_be_bytes(); + let count = self.services.globals.next_count().unwrap(); - let rooms_joined = self.services.state_cache.rooms_joined(user_id); - - pin_mut!(rooms_joined); - while let Some(room_id) = rooms_joined.next().await { + self.services + .state_cache + .rooms_joined(user_id) // Don't send key updates to unencrypted rooms - if self - .services - .state_accessor - .room_state_get(room_id, &StateEventType::RoomEncryption, "") - .await - .is_err() - { - continue; - } + .filter(|room_id| self.services.state_accessor.is_encrypted_room(room_id)) + .ready_for_each(|room_id| { + let key = (room_id, count); + self.db.keychangeid_userid.put_raw(key, user_id); + }) + .await; - let mut key = room_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(&count); - - self.db.keychangeid_userid.insert(&key, user_id.as_bytes()); - } - - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(&count); - self.db.keychangeid_userid.insert(&key, user_id.as_bytes()); + let key = (user_id, count); + self.db.keychangeid_userid.put_raw(key, user_id); } pub async fn get_device_keys<'a>(&'a self, user_id: &'a UserId, device_id: &DeviceId) -> Result> { @@ -681,12 +629,7 @@ impl Service { where F: Fn(&UserId) -> bool + Send + Sync, { - let key = self - .db - .keyid_key - .get(key_id) - .await - .deserialized::()?; + let key: serde_json::Value = self.db.keyid_key.get(key_id).await.deserialized()?; let cleaned = clean_signatures(key, sender_user, user_id, allowed_signatures)?; let raw_value = serde_json::value::to_raw_value(&cleaned)?; @@ -718,29 +661,29 @@ impl Service { } pub async fn get_user_signing_key(&self, user_id: &UserId) -> Result> { - let key_id = self.db.userid_usersigningkeyid.get(user_id).await?; - - self.db.keyid_key.get(&*key_id).await.deserialized() + self.db + .userid_usersigningkeyid + .get(user_id) + .and_then(|key_id| self.db.keyid_key.get(&*key_id)) + .await + .deserialized() } pub async fn add_to_device_event( &self, sender: &UserId, target_user_id: &UserId, target_device_id: &DeviceId, event_type: &str, content: serde_json::Value, ) { - let mut key = target_user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(target_device_id.as_bytes()); - key.push(0xFF); - key.extend_from_slice(&self.services.globals.next_count().unwrap().to_be_bytes()); + let count = self.services.globals.next_count().unwrap(); - let mut json = serde_json::Map::new(); - json.insert("type".to_owned(), event_type.to_owned().into()); - json.insert("sender".to_owned(), sender.to_string().into()); - json.insert("content".to_owned(), content); - - let value = serde_json::to_vec(&json).expect("Map::to_vec always works"); - - self.db.todeviceid_events.insert(&key, &value); + let key = (target_user_id, target_device_id, count); + self.db.todeviceid_events.put( + key, + Json(json!({ + "type": event_type, + "sender": sender, + "content": content, + })), + ); } pub fn get_to_device_events<'a>( @@ -783,13 +726,8 @@ impl Service { pub async fn update_device_metadata(&self, user_id: &UserId, device_id: &DeviceId, device: &Device) -> Result<()> { increment(&self.db.userid_devicelistversion, user_id.as_bytes()); - let mut userdeviceid = user_id.as_bytes().to_vec(); - userdeviceid.push(0xFF); - userdeviceid.extend_from_slice(device_id.as_bytes()); - self.db.userdeviceid_metadata.insert( - &userdeviceid, - &serde_json::to_vec(device).expect("Device::to_string always works"), - ); + let key = (user_id, device_id); + self.db.userdeviceid_metadata.put(key, Json(device)); Ok(()) } @@ -824,23 +762,15 @@ impl Service { pub fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> String { let filter_id = utils::random_string(4); - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(filter_id.as_bytes()); - - self.db - .userfilterid_filter - .insert(&key, &serde_json::to_vec(&filter).expect("filter is valid json")); + let key = (user_id, &filter_id); + self.db.userfilterid_filter.put(key, Json(filter)); filter_id } pub async fn get_filter(&self, user_id: &UserId, filter_id: &str) -> Result { - self.db - .userfilterid_filter - .qry(&(user_id, filter_id)) - .await - .deserialized() + let key = (user_id, filter_id); + self.db.userfilterid_filter.qry(&key).await.deserialized() } /// Creates an OpenID token, which can be used to prove that a user has @@ -913,17 +843,13 @@ impl Service { /// Sets a new profile key value, removes the key if value is None pub fn set_profile_key(&self, user_id: &UserId, profile_key: &str, profile_key_value: Option) { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(profile_key.as_bytes()); - // TODO: insert to the stable MSC4175 key when it's stable - if let Some(value) = profile_key_value { - let value = serde_json::to_vec(&value).unwrap(); + let key = (user_id, profile_key); - self.db.useridprofilekey_value.insert(&key, &value); + if let Some(value) = profile_key_value { + self.db.useridprofilekey_value.put(key, value); } else { - self.db.useridprofilekey_value.remove(&key); + self.db.useridprofilekey_value.del(key); } } @@ -945,17 +871,13 @@ impl Service { /// Sets a new timezone or removes it if timezone is None. pub fn set_timezone(&self, user_id: &UserId, timezone: Option) { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(b"us.cloke.msc4175.tz"); - // TODO: insert to the stable MSC4175 key when it's stable + let key = (user_id, "us.cloke.msc4175.tz"); + if let Some(timezone) = timezone { - self.db - .useridprofilekey_value - .insert(&key, timezone.as_bytes()); + self.db.useridprofilekey_value.put_raw(key, &timezone); } else { - self.db.useridprofilekey_value.remove(&key); + self.db.useridprofilekey_value.del(key); } } } @@ -1012,5 +934,5 @@ where fn increment(db: &Arc, key: &[u8]) { let old = db.get_blocking(key); let new = utils::increment(old.ok().as_deref()); - db.insert(key, &new); + db.insert(key, new); }