improve appservice service async interfaces

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-07 03:28:51 +00:00 committed by Jade Ellis
parent b3e5d2f683
commit ecf20f7ebb
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2
2 changed files with 36 additions and 45 deletions

View file

@ -1,5 +1,6 @@
use clap::Subcommand; use clap::Subcommand;
use conduwuit::Result; use conduwuit::Result;
use futures::TryStreamExt;
use crate::Command; use crate::Command;
@ -31,7 +32,7 @@ pub(super) async fn process(subcommand: AppserviceCommand, context: &Command<'_>
}, },
| AppserviceCommand::All => { | AppserviceCommand::All => {
let timer = tokio::time::Instant::now(); let timer = tokio::time::Instant::now();
let results = services.appservice.all().await; let results: Vec<_> = services.appservice.iter_db_ids().try_collect().await?;
let query_time = timer.elapsed(); let query_time = timer.elapsed();
write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```")

View file

@ -1,20 +1,20 @@
mod namespace_regex; mod namespace_regex;
mod registration_info; mod registration_info;
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, iter::IntoIterator, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use conduwuit::{Result, err, utils::stream::TryIgnore}; use conduwuit::{Result, err, utils::stream::IterStream};
use database::Map; use database::Map;
use futures::{Future, StreamExt, TryStreamExt}; use futures::{Future, FutureExt, Stream, TryStreamExt};
use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration}; use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
use tokio::sync::RwLock; use tokio::sync::{RwLock, RwLockReadGuard};
pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo}; pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
use crate::{Dep, sending}; use crate::{Dep, sending};
pub struct Service { pub struct Service {
registration_info: RwLock<BTreeMap<String, RegistrationInfo>>, registration_info: RwLock<Registrations>,
services: Services, services: Services,
db: Data, db: Data,
} }
@ -27,6 +27,8 @@ struct Data {
id_appserviceregistrations: Arc<Map>, id_appserviceregistrations: Arc<Map>,
} }
type Registrations = BTreeMap<String, RegistrationInfo>;
#[async_trait] #[async_trait]
impl crate::Service for Service { impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> { fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@ -41,19 +43,18 @@ impl crate::Service for Service {
})) }))
} }
async fn worker(self: Arc<Self>) -> Result<()> { async fn worker(self: Arc<Self>) -> Result {
// Inserting registrations into cache // Inserting registrations into cache
for appservice in self.iter_db_ids().await? { self.iter_db_ids()
self.registration_info.write().await.insert( .try_for_each(async |appservice| {
appservice.0, self.registration_info
appservice .write()
.1 .await
.try_into() .insert(appservice.0, appservice.1.try_into()?);
.expect("Should be validated on registration"),
);
}
Ok(()) Ok(())
})
.await
} }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
@ -84,7 +85,7 @@ impl Service {
/// # Arguments /// # Arguments
/// ///
/// * `service_name` - the registration ID of the appservice /// * `service_name` - the registration ID of the appservice
pub async fn unregister_appservice(&self, appservice_id: &str) -> Result<()> { pub async fn unregister_appservice(&self, appservice_id: &str) -> Result {
// removes the appservice registration info // removes the appservice registration info
self.registration_info self.registration_info
.write() .write()
@ -112,15 +113,6 @@ impl Service {
.map(|info| info.registration) .map(|info| info.registration)
} }
pub async fn iter_ids(&self) -> Vec<String> {
self.registration_info
.read()
.await
.keys()
.cloned()
.collect()
}
pub async fn find_from_token(&self, token: &str) -> Option<RegistrationInfo> { pub async fn find_from_token(&self, token: &str) -> Option<RegistrationInfo> {
self.read() self.read()
.await .await
@ -156,15 +148,22 @@ impl Service {
.any(|info| info.rooms.is_exclusive_match(room_id.as_str())) .any(|info| info.rooms.is_exclusive_match(room_id.as_str()))
} }
pub fn read( pub fn iter_ids(&self) -> impl Stream<Item = String> + Send {
&self, self.read()
) -> impl Future<Output = tokio::sync::RwLockReadGuard<'_, BTreeMap<String, RegistrationInfo>>> .map(|info| info.keys().cloned().collect::<Vec<_>>())
{ .map(IntoIterator::into_iter)
self.registration_info.read() .map(IterStream::stream)
.flatten_stream()
} }
#[inline] pub fn iter_db_ids(&self) -> impl Stream<Item = Result<(String, Registration)>> + Send {
pub async fn all(&self) -> Result<Vec<(String, Registration)>> { self.iter_db_ids().await } self.db
.id_appserviceregistrations
.keys()
.and_then(move |id: &str| async move {
Ok((id.to_owned(), self.get_db_registration(id).await?))
})
}
pub async fn get_db_registration(&self, id: &str) -> Result<Registration> { pub async fn get_db_registration(&self, id: &str) -> Result<Registration> {
self.db self.db
@ -175,16 +174,7 @@ impl Service {
.map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}"))) .map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
} }
async fn iter_db_ids(&self) -> Result<Vec<(String, Registration)>> { pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, Registrations>> + Send {
self.db self.registration_info.read()
.id_appserviceregistrations
.keys()
.ignore_err()
.then(|id: String| async move {
let reg = self.get_db_registration(&id).await?;
Ok((id, reg))
})
.try_collect()
.await
} }
} }