From c569881b0853245dea0f8704342d6cfa6c465edb Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 29 Sep 2024 13:13:09 +0000 Subject: [PATCH] merge rooms/short Data w/ Service; optimize queries Signed-off-by: Jason Volk --- src/service/account_data/mod.rs | 2 +- src/service/appservice/data.rs | 2 +- src/service/globals/data.rs | 12 +- src/service/globals/migrations.rs | 6 +- src/service/media/data.rs | 2 +- src/service/presence/data.rs | 6 +- src/service/rooms/alias/mod.rs | 6 +- src/service/rooms/directory/mod.rs | 2 +- src/service/rooms/outlier/mod.rs | 4 +- src/service/rooms/short/data.rs | 167 --------------- src/service/rooms/short/mod.rs | 232 +++++++++++++++++---- src/service/rooms/state/data.rs | 2 +- src/service/rooms/state_accessor/data.rs | 4 +- src/service/rooms/state_cache/mod.rs | 8 +- src/service/rooms/state_compressor/data.rs | 4 +- src/service/rooms/timeline/data.rs | 22 +- src/service/sending/data.rs | 6 +- src/service/users/mod.rs | 32 +-- 18 files changed, 257 insertions(+), 262 deletions(-) delete mode 100644 src/service/rooms/short/data.rs diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 4f00cff1..482229e7 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -106,7 +106,7 @@ pub async fn get( self.db .roomusertype_roomuserdataid .qry(&key) - .and_then(|roomuserdataid| self.db.roomuserdataid_accountdata.qry(&roomuserdataid)) + .and_then(|roomuserdataid| self.db.roomuserdataid_accountdata.get(&roomuserdataid)) .await .deserialized() } diff --git a/src/service/appservice/data.rs b/src/service/appservice/data.rs index f31c5e63..4eb9d09e 100644 --- a/src/service/appservice/data.rs +++ b/src/service/appservice/data.rs @@ -38,7 +38,7 @@ impl Data { pub async fn get_registration(&self, id: &str) -> Result { self.id_appserviceregistrations - .qry(id) + .get(id) .await .deserialized() .map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}"))) diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 5332f07d..57a295d9 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -260,7 +260,7 @@ impl Data { &self, origin: &ServerName, new_keys: ServerSigningKeys, ) -> BTreeMap { // Not atomic, but this is not critical - let signingkeys = self.server_signingkeys.qry(origin).await; + let signingkeys = self.server_signingkeys.get(origin).await; let mut keys = signingkeys .and_then(|keys| serde_json::from_slice(&keys).map_err(Into::into)) @@ -311,10 +311,16 @@ impl Data { } pub async fn signing_keys_for(&self, origin: &ServerName) -> Result { - self.server_signingkeys.qry(origin).await.deserialized() + self.server_signingkeys.get(origin).await.deserialized() } - pub async fn database_version(&self) -> u64 { self.global.qry("version").await.deserialized().unwrap_or(0) } + pub async fn database_version(&self) -> u64 { + self.global + .get(b"version") + .await + .deserialized() + .unwrap_or(0) + } #[inline] pub fn bump_database_version(&self, new_version: u64) -> Result<()> { diff --git a/src/service/globals/migrations.rs b/src/service/globals/migrations.rs index c7a73230..469159fc 100644 --- a/src/service/globals/migrations.rs +++ b/src/service/globals/migrations.rs @@ -99,14 +99,14 @@ async fn migrate(services: &Services) -> Result<()> { db_lt_13(services).await?; } - if db["global"].qry("feat_sha256_media").await.is_not_found() { + if db["global"].get(b"feat_sha256_media").await.is_not_found() { media::migrations::migrate_sha256_media(services).await?; } else if config.media_startup_check { media::migrations::checkup_sha256_media(services).await?; } if db["global"] - .qry("fix_bad_double_separator_in_state_cache") + .get(b"fix_bad_double_separator_in_state_cache") .await .is_not_found() { @@ -114,7 +114,7 @@ async fn migrate(services: &Services) -> Result<()> { } if db["global"] - .qry("retroactively_fix_bad_data_from_roomuserid_joined") + .get(b"retroactively_fix_bad_data_from_roomuserid_joined") .await .is_not_found() { diff --git a/src/service/media/data.rs b/src/service/media/data.rs index 29d562cc..248e9e1d 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -253,7 +253,7 @@ impl Data { } pub(super) async fn get_url_preview(&self, url: &str) -> Result { - let values = self.url_previews.qry(url).await?; + let values = self.url_previews.get(url).await?; let mut values = values.split(|&b| b == 0xFF); diff --git a/src/service/presence/data.rs b/src/service/presence/data.rs index 0c3f3d31..9c9d0ae3 100644 --- a/src/service/presence/data.rs +++ b/src/service/presence/data.rs @@ -39,12 +39,12 @@ impl Data { pub async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> { let count = self .userid_presenceid - .qry(user_id) + .get(user_id) .await .deserialized::()?; let key = presenceid_key(count, user_id); - let bytes = self.presenceid_presence.qry(&key).await?; + let bytes = self.presenceid_presence.get(&key).await?; let event = Presence::from_json_bytes(&bytes)? .to_presence_event(user_id, &self.services.users) .await; @@ -127,7 +127,7 @@ impl Data { pub(super) async fn remove_presence(&self, user_id: &UserId) { let Ok(count) = self .userid_presenceid - .qry(user_id) + .get(user_id) .await .deserialized::() else { diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs index 6b81a221..1d44cd2d 100644 --- a/src/service/rooms/alias/mod.rs +++ b/src/service/rooms/alias/mod.rs @@ -94,7 +94,7 @@ impl Service { } let alias = alias.alias(); - let Ok(room_id) = self.db.alias_roomid.qry(&alias).await else { + let Ok(room_id) = self.db.alias_roomid.get(&alias).await else { return Err!(Request(NotFound("Alias does not exist or is invalid."))); }; @@ -151,7 +151,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn resolve_local_alias(&self, alias: &RoomAliasId) -> Result { - self.db.alias_roomid.qry(alias.alias()).await.deserialized() + self.db.alias_roomid.get(alias.alias()).await.deserialized() } #[tracing::instrument(skip(self), level = "debug")] @@ -219,7 +219,7 @@ impl Service { } async fn who_created_alias(&self, alias: &RoomAliasId) -> Result { - self.db.alias_userid.qry(alias.alias()).await.deserialized() + self.db.alias_userid.get(alias.alias()).await.deserialized() } async fn resolve_appservice_alias(&self, room_alias: &RoomAliasId) -> Result> { diff --git a/src/service/rooms/directory/mod.rs b/src/service/rooms/directory/mod.rs index 3585205d..5666a91a 100644 --- a/src/service/rooms/directory/mod.rs +++ b/src/service/rooms/directory/mod.rs @@ -32,7 +32,7 @@ pub fn set_public(&self, room_id: &RoomId) { self.db.publicroomids.insert(room_i pub fn set_not_public(&self, room_id: &RoomId) { self.db.publicroomids.remove(room_id.as_bytes()); } #[implement(Service)] -pub async fn is_public_room(&self, room_id: &RoomId) -> bool { self.db.publicroomids.qry(room_id).await.is_ok() } +pub async fn is_public_room(&self, room_id: &RoomId) -> bool { self.db.publicroomids.get(room_id).await.is_ok() } #[implement(Service)] pub fn public_rooms(&self) -> impl Stream + Send { diff --git a/src/service/rooms/outlier/mod.rs b/src/service/rooms/outlier/mod.rs index 4c9225ae..b9d04263 100644 --- a/src/service/rooms/outlier/mod.rs +++ b/src/service/rooms/outlier/mod.rs @@ -31,7 +31,7 @@ impl crate::Service for Service { pub async fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result { self.db .eventid_outlierpdu - .qry(event_id) + .get(event_id) .await .deserialized() } @@ -41,7 +41,7 @@ pub async fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result Result { self.db .eventid_outlierpdu - .qry(event_id) + .get(event_id) .await .deserialized() } diff --git a/src/service/rooms/short/data.rs b/src/service/rooms/short/data.rs deleted file mode 100644 index fff3f2d6..00000000 --- a/src/service/rooms/short/data.rs +++ /dev/null @@ -1,167 +0,0 @@ -use std::sync::Arc; - -use conduit::{err, utils, Error, Result}; -use database::{Deserialized, Map}; -use ruma::{events::StateEventType, EventId, RoomId}; - -use crate::{globals, Dep}; - -pub(super) struct Data { - eventid_shorteventid: Arc, - shorteventid_eventid: Arc, - statekey_shortstatekey: Arc, - shortstatekey_statekey: Arc, - roomid_shortroomid: Arc, - statehash_shortstatehash: Arc, - services: Services, -} - -struct Services { - globals: Dep, -} - -impl Data { - pub(super) fn new(args: &crate::Args<'_>) -> Self { - let db = &args.db; - Self { - eventid_shorteventid: db["eventid_shorteventid"].clone(), - shorteventid_eventid: db["shorteventid_eventid"].clone(), - statekey_shortstatekey: db["statekey_shortstatekey"].clone(), - shortstatekey_statekey: db["shortstatekey_statekey"].clone(), - roomid_shortroomid: db["roomid_shortroomid"].clone(), - statehash_shortstatehash: db["statehash_shortstatehash"].clone(), - services: Services { - globals: args.depend::("globals"), - }, - } - } - - pub(super) async fn get_or_create_shorteventid(&self, event_id: &EventId) -> u64 { - if let Ok(shorteventid) = self.eventid_shorteventid.qry(event_id).await.deserialized() { - return shorteventid; - } - - let shorteventid = self.services.globals.next_count().unwrap(); - self.eventid_shorteventid - .insert(event_id.as_bytes(), &shorteventid.to_be_bytes()); - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), event_id.as_bytes()); - - shorteventid - } - - pub(super) async fn multi_get_or_create_shorteventid(&self, event_ids: &[&EventId]) -> Vec { - let mut ret: Vec = Vec::with_capacity(event_ids.len()); - let keys = event_ids - .iter() - .map(|id| id.as_bytes()) - .collect::>(); - - for (i, short) in self - .eventid_shorteventid - .get_batch_blocking(keys.iter()) - .iter() - .enumerate() - { - #[allow(clippy::single_match_else)] - match short { - Some(short) => ret.push( - utils::u64_from_bytes(short) - .map_err(|_| Error::bad_database("Invalid shorteventid in db.")) - .unwrap(), - ), - None => { - let short = self.services.globals.next_count().unwrap(); - self.eventid_shorteventid - .insert(keys[i], &short.to_be_bytes()); - self.shorteventid_eventid - .insert(&short.to_be_bytes(), keys[i]); - - debug_assert!(ret.len() == i, "position of result must match input"); - ret.push(short); - }, - } - } - - ret - } - - pub(super) async fn get_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> Result { - let key = (event_type, state_key); - self.statekey_shortstatekey.qry(&key).await.deserialized() - } - - pub(super) async fn get_or_create_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> u64 { - let key = (event_type.to_string(), state_key); - if let Ok(shortstatekey) = self.statekey_shortstatekey.qry(&key).await.deserialized() { - return shortstatekey; - } - - let mut key = event_type.to_string().as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(state_key.as_bytes()); - - let shortstatekey = self.services.globals.next_count().unwrap(); - self.statekey_shortstatekey - .insert(&key, &shortstatekey.to_be_bytes()); - self.shortstatekey_statekey - .insert(&shortstatekey.to_be_bytes(), &key); - - shortstatekey - } - - pub(super) async fn get_eventid_from_short(&self, shorteventid: u64) -> Result> { - self.shorteventid_eventid - .qry(&shorteventid) - .await - .deserialized() - .map_err(|e| err!(Database("Failed to find EventId from short {shorteventid:?}: {e:?}"))) - } - - pub(super) async fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(StateEventType, String)> { - self.shortstatekey_statekey - .qry(&shortstatekey) - .await - .deserialized() - .map_err(|e| { - err!(Database( - "Failed to find (StateEventType, state_key) from short {shortstatekey:?}: {e:?}" - )) - }) - } - - /// Returns (shortstatehash, already_existed) - pub(super) async fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> (u64, bool) { - if let Ok(shortstatehash) = self - .statehash_shortstatehash - .qry(state_hash) - .await - .deserialized() - { - return (shortstatehash, true); - } - - let shortstatehash = self.services.globals.next_count().unwrap(); - self.statehash_shortstatehash - .insert(state_hash, &shortstatehash.to_be_bytes()); - - (shortstatehash, false) - } - - pub(super) async fn get_shortroomid(&self, room_id: &RoomId) -> Result { - self.roomid_shortroomid.qry(room_id).await.deserialized() - } - - pub(super) async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> u64 { - self.roomid_shortroomid - .qry(room_id) - .await - .deserialized() - .unwrap_or_else(|_| { - let short = self.services.globals.next_count().unwrap(); - self.roomid_shortroomid - .insert(room_id.as_bytes(), &short.to_be_bytes()); - short - }) - } -} diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index 00bb7cb1..66da3948 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -1,61 +1,215 @@ -mod data; - use std::sync::Arc; -use conduit::Result; +use conduit::{err, implement, utils, Error, Result}; +use database::{Deserialized, Map}; use ruma::{events::StateEventType, EventId, RoomId}; -use self::data::Data; +use crate::{globals, Dep}; pub struct Service { db: Data, + services: Services, +} + +struct Data { + eventid_shorteventid: Arc, + shorteventid_eventid: Arc, + statekey_shortstatekey: Arc, + shortstatekey_statekey: Arc, + roomid_shortroomid: Arc, + statehash_shortstatehash: Arc, +} + +struct Services { + globals: Dep, } impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { - db: Data::new(&args), + db: Data { + eventid_shorteventid: args.db["eventid_shorteventid"].clone(), + shorteventid_eventid: args.db["shorteventid_eventid"].clone(), + statekey_shortstatekey: args.db["statekey_shortstatekey"].clone(), + shortstatekey_statekey: args.db["shortstatekey_statekey"].clone(), + roomid_shortroomid: args.db["roomid_shortroomid"].clone(), + statehash_shortstatehash: args.db["statehash_shortstatehash"].clone(), + }, + services: Services { + globals: args.depend::("globals"), + }, })) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - pub async fn get_or_create_shorteventid(&self, event_id: &EventId) -> u64 { - self.db.get_or_create_shorteventid(event_id).await +#[implement(Service)] +pub async fn get_or_create_shorteventid(&self, event_id: &EventId) -> u64 { + if let Ok(shorteventid) = self + .db + .eventid_shorteventid + .get(event_id) + .await + .deserialized() + { + return shorteventid; } - pub async fn multi_get_or_create_shorteventid(&self, event_ids: &[&EventId]) -> Vec { - self.db.multi_get_or_create_shorteventid(event_ids).await - } + let shorteventid = self.services.globals.next_count().unwrap(); + self.db + .eventid_shorteventid + .insert(event_id.as_bytes(), &shorteventid.to_be_bytes()); + self.db + .shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), event_id.as_bytes()); - pub async fn get_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> Result { - self.db.get_shortstatekey(event_type, state_key).await - } - - pub async fn get_or_create_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> u64 { - self.db - .get_or_create_shortstatekey(event_type, state_key) - .await - } - - pub async fn get_eventid_from_short(&self, shorteventid: u64) -> Result> { - self.db.get_eventid_from_short(shorteventid).await - } - - pub async fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(StateEventType, String)> { - self.db.get_statekey_from_short(shortstatekey).await - } - - /// Returns (shortstatehash, already_existed) - pub async fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> (u64, bool) { - self.db.get_or_create_shortstatehash(state_hash).await - } - - pub async fn get_shortroomid(&self, room_id: &RoomId) -> Result { self.db.get_shortroomid(room_id).await } - - pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> u64 { - self.db.get_or_create_shortroomid(room_id).await - } + shorteventid +} + +#[implement(Service)] +pub async fn multi_get_or_create_shorteventid(&self, event_ids: &[&EventId]) -> Vec { + let mut ret: Vec = Vec::with_capacity(event_ids.len()); + let keys = event_ids + .iter() + .map(|id| id.as_bytes()) + .collect::>(); + + for (i, short) in self + .db + .eventid_shorteventid + .get_batch_blocking(keys.iter()) + .iter() + .enumerate() + { + match short { + Some(short) => ret.push( + utils::u64_from_bytes(short) + .map_err(|_| Error::bad_database("Invalid shorteventid in db.")) + .unwrap(), + ), + None => { + let short = self.services.globals.next_count().unwrap(); + self.db + .eventid_shorteventid + .insert(keys[i], &short.to_be_bytes()); + self.db + .shorteventid_eventid + .insert(&short.to_be_bytes(), keys[i]); + + debug_assert!(ret.len() == i, "position of result must match input"); + ret.push(short); + }, + } + } + + ret +} + +#[implement(Service)] +pub async fn get_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> Result { + let key = (event_type, state_key); + self.db + .statekey_shortstatekey + .qry(&key) + .await + .deserialized() +} + +#[implement(Service)] +pub async fn get_or_create_shortstatekey(&self, event_type: &StateEventType, state_key: &str) -> u64 { + let key = (event_type.to_string(), state_key); + if let Ok(shortstatekey) = self + .db + .statekey_shortstatekey + .qry(&key) + .await + .deserialized() + { + return shortstatekey; + } + + let mut key = event_type.to_string().as_bytes().to_vec(); + key.push(0xFF); + key.extend_from_slice(state_key.as_bytes()); + + let shortstatekey = self.services.globals.next_count().unwrap(); + self.db + .statekey_shortstatekey + .insert(&key, &shortstatekey.to_be_bytes()); + self.db + .shortstatekey_statekey + .insert(&shortstatekey.to_be_bytes(), &key); + + shortstatekey +} + +#[implement(Service)] +pub async fn get_eventid_from_short(&self, shorteventid: u64) -> Result> { + const BUFSIZE: usize = size_of::(); + + self.db + .shorteventid_eventid + .aqry::(&shorteventid) + .await + .deserialized() + .map_err(|e| err!(Database("Failed to find EventId from short {shorteventid:?}: {e:?}"))) +} + +#[implement(Service)] +pub async fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(StateEventType, String)> { + const BUFSIZE: usize = size_of::(); + + self.db + .shortstatekey_statekey + .aqry::(&shortstatekey) + .await + .deserialized() + .map_err(|e| { + err!(Database( + "Failed to find (StateEventType, state_key) from short {shortstatekey:?}: {e:?}" + )) + }) +} + +/// Returns (shortstatehash, already_existed) +#[implement(Service)] +pub async fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> (u64, bool) { + if let Ok(shortstatehash) = self + .db + .statehash_shortstatehash + .get(state_hash) + .await + .deserialized() + { + return (shortstatehash, true); + } + + let shortstatehash = self.services.globals.next_count().unwrap(); + self.db + .statehash_shortstatehash + .insert(state_hash, &shortstatehash.to_be_bytes()); + + (shortstatehash, false) +} + +#[implement(Service)] +pub async fn get_shortroomid(&self, room_id: &RoomId) -> Result { + self.db.roomid_shortroomid.qry(room_id).await.deserialized() +} + +#[implement(Service)] +pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> u64 { + self.db + .roomid_shortroomid + .get(room_id) + .await + .deserialized() + .unwrap_or_else(|_| { + let short = self.services.globals.next_count().unwrap(); + self.db + .roomid_shortroomid + .insert(room_id.as_bytes(), &short.to_be_bytes()); + short + }) } diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index ccf7509a..3072e3c6 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -25,7 +25,7 @@ impl Data { } pub(super) async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result { - self.roomid_shortstatehash.qry(room_id).await.deserialized() + self.roomid_shortstatehash.get(room_id).await.deserialized() } #[inline] diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 79a98325..adc26f00 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -157,8 +157,8 @@ impl Data { /// Returns the state hash for this pdu. pub(super) async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result { self.eventid_shorteventid - .qry(event_id) - .and_then(|shorteventid| self.shorteventid_shortstatehash.qry(&shorteventid)) + .get(event_id) + .and_then(|shorteventid| self.shorteventid_shortstatehash.get(&shorteventid)) .await .deserialized() } diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index ce5b024b..eedff861 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -435,10 +435,10 @@ impl Service { /// Returns an iterator over all rooms this user joined. #[tracing::instrument(skip(self), level = "debug")] - pub fn rooms_joined(&self, user_id: &UserId) -> impl Stream + Send { + pub fn rooms_joined<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { self.db .userroomid_joined - .keys_prefix(user_id) + .keys_prefix_raw(user_id) .ignore_err() .map(|(_, room_id): (Ignore, &RoomId)| room_id) } @@ -494,10 +494,10 @@ impl Service { } #[tracing::instrument(skip(self), level = "debug")] - pub fn servers_invite_via<'a>(&'a self, room_id: &RoomId) -> impl Stream + Send + 'a { + pub fn servers_invite_via<'a>(&'a self, room_id: &'a RoomId) -> impl Stream + Send + 'a { self.db .roomid_inviteviaservers - .stream_prefix(room_id) + .stream_prefix_raw(room_id) .ignore_err() .map(|(_, servers): (Ignore, Vec<&ServerName>)| &**(servers.last().expect("at least one servername"))) } diff --git a/src/service/rooms/state_compressor/data.rs b/src/service/rooms/state_compressor/data.rs index 9a9f70a2..cb020470 100644 --- a/src/service/rooms/state_compressor/data.rs +++ b/src/service/rooms/state_compressor/data.rs @@ -23,9 +23,11 @@ impl Data { } pub(super) async fn get_statediff(&self, shortstatehash: u64) -> Result { + const BUFSIZE: usize = size_of::(); + let value = self .shortstatehash_statediff - .qry(&shortstatehash) + .aqry::(&shortstatehash) .await .map_err(|e| err!(Database("Failed to find StateDiff from short {shortstatehash:?}: {e}")))?; diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 1f9dad1d..cb85cf19 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -79,7 +79,7 @@ impl Data { /// Returns the `count` of this pdu's id. pub(super) async fn get_pdu_count(&self, event_id: &EventId) -> Result { self.eventid_pduid - .qry(event_id) + .get(event_id) .await .map(|pdu_id| pdu_count(&pdu_id)) } @@ -90,27 +90,27 @@ impl Data { return Ok(pdu); } - self.eventid_outlierpdu.qry(event_id).await.deserialized() + self.eventid_outlierpdu.get(event_id).await.deserialized() } /// Returns the json of a pdu. pub(super) async fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result { let pduid = self.get_pdu_id(event_id).await?; - self.pduid_pdu.qry(&pduid).await.deserialized() + self.pduid_pdu.get(&pduid).await.deserialized() } /// Returns the pdu's id. #[inline] pub(super) async fn get_pdu_id(&self, event_id: &EventId) -> Result> { - self.eventid_pduid.qry(event_id).await + self.eventid_pduid.get(event_id).await } /// Returns the pdu directly from `eventid_pduid` only. pub(super) async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { let pduid = self.get_pdu_id(event_id).await?; - self.pduid_pdu.qry(&pduid).await.deserialized() + self.pduid_pdu.get(&pduid).await.deserialized() } /// Like get_non_outlier_pdu(), but without the expense of fetching and @@ -118,7 +118,7 @@ impl Data { pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result<()> { let pduid = self.get_pdu_id(event_id).await?; - self.pduid_pdu.qry(&pduid).await?; + self.pduid_pdu.get(&pduid).await?; Ok(()) } @@ -132,7 +132,7 @@ impl Data { } self.eventid_outlierpdu - .qry(event_id) + .get(event_id) .await .deserialized() .map(Arc::new) @@ -141,7 +141,7 @@ impl Data { /// Like get_non_outlier_pdu(), but without the expense of fetching and /// parsing the PduEvent pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result<()> { - self.eventid_outlierpdu.qry(event_id).await?; + self.eventid_outlierpdu.get(event_id).await?; Ok(()) } @@ -159,12 +159,12 @@ impl Data { /// /// This does __NOT__ check the outliers `Tree`. pub(super) async fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result { - self.pduid_pdu.qry(pdu_id).await.deserialized() + self.pduid_pdu.get(pdu_id).await.deserialized() } /// Returns the pdu as a `BTreeMap`. pub(super) async fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result { - self.pduid_pdu.qry(pdu_id).await.deserialized() + self.pduid_pdu.get(pdu_id).await.deserialized() } pub(super) async fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) { @@ -196,7 +196,7 @@ impl Data { pub(super) async fn replace_pdu( &self, pdu_id: &[u8], pdu_json: &CanonicalJsonObject, _pdu: &PduEvent, ) -> Result<()> { - if self.pduid_pdu.qry(pdu_id).await.is_not_found() { + if self.pduid_pdu.get(pdu_id).await.is_not_found() { return Err!(Request(NotFound("PDU does not exist."))); } diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index b96f9a03..6f4b5b97 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -98,7 +98,7 @@ impl Data { } #[inline] - pub fn active_requests_for<'a>(&'a self, destination: &Destination) -> impl Stream + Send + 'a { + pub fn active_requests_for(&self, destination: &Destination) -> impl Stream + Send + '_ { let prefix = destination.get_prefix(); self.servercurrentevent_data .stream_raw_prefix(&prefix) @@ -133,7 +133,7 @@ impl Data { keys } - pub fn queued_requests<'a>(&'a self, destination: &Destination) -> impl Stream + Send + 'a { + pub fn queued_requests(&self, destination: &Destination) -> impl Stream + Send + '_ { let prefix = destination.get_prefix(); self.servernameevent_data .stream_raw_prefix(&prefix) @@ -152,7 +152,7 @@ impl Data { pub async fn get_latest_educount(&self, server_name: &ServerName) -> u64 { self.servername_educount - .qry(server_name) + .get(server_name) .await .deserialized() .unwrap_or(0) diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index fa8c41b6..eb77ef35 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -120,13 +120,13 @@ impl Service { /// Check if a user has an account on this homeserver. #[inline] - pub async fn exists(&self, user_id: &UserId) -> bool { self.db.userid_password.qry(user_id).await.is_ok() } + pub async fn exists(&self, user_id: &UserId) -> bool { self.db.userid_password.get(user_id).await.is_ok() } /// Check if account is deactivated pub async fn is_deactivated(&self, user_id: &UserId) -> Result { self.db .userid_password - .qry(user_id) + .get(user_id) .map_ok(|val| val.is_empty()) .map_err(|_| err!(Request(NotFound("User does not exist.")))) .await @@ -146,7 +146,7 @@ impl Service { /// Find out which user an access token belongs to. pub async fn find_from_token(&self, token: &str) -> Result<(OwnedUserId, OwnedDeviceId)> { - self.db.token_userdeviceid.qry(token).await.deserialized() + self.db.token_userdeviceid.get(token).await.deserialized() } /// Returns an iterator over all users on this homeserver (offered for @@ -171,7 +171,7 @@ impl Service { /// Returns the password hash for the given user. pub async fn password_hash(&self, user_id: &UserId) -> Result { - self.db.userid_password.qry(user_id).await.deserialized() + self.db.userid_password.get(user_id).await.deserialized() } /// Hash and set the user's password to the Argon2 hash @@ -196,7 +196,7 @@ impl Service { /// Returns the displayname of a user on this homeserver. pub async fn displayname(&self, user_id: &UserId) -> Result { - self.db.userid_displayname.qry(user_id).await.deserialized() + self.db.userid_displayname.get(user_id).await.deserialized() } /// Sets a new displayname or removes it if displayname is None. You still @@ -213,7 +213,7 @@ impl Service { /// Get the `avatar_url` of a user. pub async fn avatar_url(&self, user_id: &UserId) -> Result { - self.db.userid_avatarurl.qry(user_id).await.deserialized() + self.db.userid_avatarurl.get(user_id).await.deserialized() } /// Sets a new avatar_url or removes it if avatar_url is None. @@ -229,7 +229,7 @@ impl Service { /// Get the blurhash of a user. pub async fn blurhash(&self, user_id: &UserId) -> Result { - self.db.userid_blurhash.qry(user_id).await.deserialized() + self.db.userid_blurhash.get(user_id).await.deserialized() } /// Sets a new avatar_url or removes it if avatar_url is None. @@ -284,7 +284,7 @@ impl Service { userdeviceid.extend_from_slice(device_id.as_bytes()); // Remove tokens - if let Ok(old_token) = self.db.userdeviceid_token.qry(&userdeviceid).await { + if let Ok(old_token) = self.db.userdeviceid_token.get(&userdeviceid).await { self.db.userdeviceid_token.remove(&userdeviceid); self.db.token_userdeviceid.remove(&old_token); } @@ -390,7 +390,7 @@ impl Service { pub async fn last_one_time_keys_update(&self, user_id: &UserId) -> u64 { self.db .userid_lastonetimekeyupdate - .qry(user_id) + .get(user_id) .await .deserialized() .unwrap_or(0) @@ -664,7 +664,7 @@ impl Service { let key = self .db .keyid_key - .qry(key_id) + .get(key_id) .await .deserialized::()?; @@ -679,7 +679,7 @@ impl Service { where F: Fn(&UserId) -> bool + Send + Sync, { - let key_id = self.db.userid_masterkeyid.qry(user_id).await?; + let key_id = self.db.userid_masterkeyid.get(user_id).await?; self.get_key(&key_id, sender_user, user_id, allowed_signatures) .await @@ -691,16 +691,16 @@ impl Service { where F: Fn(&UserId) -> bool + Send + Sync, { - let key_id = self.db.userid_selfsigningkeyid.qry(user_id).await?; + let key_id = self.db.userid_selfsigningkeyid.get(user_id).await?; self.get_key(&key_id, sender_user, user_id, allowed_signatures) .await } pub async fn get_user_signing_key(&self, user_id: &UserId) -> Result> { - let key_id = self.db.userid_usersigningkeyid.qry(user_id).await?; + let key_id = self.db.userid_usersigningkeyid.get(user_id).await?; - self.db.keyid_key.qry(&*key_id).await.deserialized() + self.db.keyid_key.get(&*key_id).await.deserialized() } pub async fn add_to_device_event( @@ -797,7 +797,7 @@ impl Service { pub async fn get_devicelist_version(&self, user_id: &UserId) -> Result { self.db .userid_devicelistversion - .qry(user_id) + .get(user_id) .await .deserialized() } @@ -853,7 +853,7 @@ impl Service { /// Find out which user an OpenID access token belongs to. pub async fn find_from_openid_token(&self, token: &str) -> Result { - let Ok(value) = self.db.openidtoken_expiresatuserid.qry(token).await else { + let Ok(value) = self.db.openidtoken_expiresatuserid.get(token).await else { return Err!(Request(Unauthorized("OpenID token is unrecognised"))); };