From 460cf27a031c9470030d4487b8b95ae573351e73 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Mon, 7 Apr 2025 03:28:51 +0000
Subject: [PATCH] improve appservice service async interfaces

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/admin/query/appservice.rs |  3 +-
 src/service/appservice/mod.rs | 78 +++++++++++++++--------------------
 2 files changed, 36 insertions(+), 45 deletions(-)

diff --git a/src/admin/query/appservice.rs b/src/admin/query/appservice.rs
index 93c76a7e..0359261a 100644
--- a/src/admin/query/appservice.rs
+++ b/src/admin/query/appservice.rs
@@ -1,5 +1,6 @@
 use clap::Subcommand;
 use conduwuit::Result;
+use futures::TryStreamExt;
 
 use crate::Command;
 
@@ -31,7 +32,7 @@ pub(super) async fn process(subcommand: AppserviceCommand, context: &Command<'_>
 		},
 		| AppserviceCommand::All => {
 			let timer = tokio::time::Instant::now();
-			let results = services.appservice.all().await;
+			let results: Vec<_> = services.appservice.iter_db_ids().try_collect().await?;
 			let query_time = timer.elapsed();
 
 			write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```")
diff --git a/src/service/appservice/mod.rs b/src/service/appservice/mod.rs
index 50a60033..7be8a471 100644
--- a/src/service/appservice/mod.rs
+++ b/src/service/appservice/mod.rs
@@ -1,20 +1,20 @@
 mod namespace_regex;
 mod registration_info;
 
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, iter::IntoIterator, sync::Arc};
 
 use async_trait::async_trait;
-use conduwuit::{Result, err, utils::stream::TryIgnore};
+use conduwuit::{Result, err, utils::stream::IterStream};
 use database::Map;
-use futures::{Future, StreamExt, TryStreamExt};
+use futures::{Future, FutureExt, Stream, TryStreamExt};
 use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
-use tokio::sync::RwLock;
+use tokio::sync::{RwLock, RwLockReadGuard};
 
 pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
 use crate::{Dep, sending};
 
 pub struct Service {
-	registration_info: RwLock<BTreeMap<String, RegistrationInfo>>,
+	registration_info: RwLock<Registrations>,
 	services: Services,
 	db: Data,
 }
@@ -27,6 +27,8 @@ struct Data {
 	id_appserviceregistrations: Arc<Map>,
 }
 
+type Registrations = BTreeMap<String, RegistrationInfo>;
+
 #[async_trait]
 impl crate::Service for Service {
 	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@@ -41,19 +43,18 @@ impl crate::Service for Service {
 		}))
 	}
 
-	async fn worker(self: Arc<Self>) -> Result<()> {
+	async fn worker(self: Arc<Self>) -> Result {
 		// Inserting registrations into cache
-		for appservice in self.iter_db_ids().await? {
-			self.registration_info.write().await.insert(
-				appservice.0,
-				appservice
-					.1
-					.try_into()
-					.expect("Should be validated on registration"),
-			);
-		}
+		self.iter_db_ids()
+			.try_for_each(async |appservice| {
+				self.registration_info
+					.write()
+					.await
+					.insert(appservice.0, appservice.1.try_into()?);
 
-		Ok(())
+				Ok(())
+			})
+			.await
 	}
 
 	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
@@ -84,7 +85,7 @@ impl Service {
 	/// # Arguments
 	///
 	/// * `service_name` - the registration ID of the appservice
-	pub async fn unregister_appservice(&self, appservice_id: &str) -> Result<()> {
+	pub async fn unregister_appservice(&self, appservice_id: &str) -> Result {
 		// removes the appservice registration info
 		self.registration_info
 			.write()
@@ -112,15 +113,6 @@ impl Service {
 			.map(|info| info.registration)
 	}
 
-	pub async fn iter_ids(&self) -> Vec<String> {
-		self.registration_info
-			.read()
-			.await
-			.keys()
-			.cloned()
-			.collect()
-	}
-
 	pub async fn find_from_token(&self, token: &str) -> Option<RegistrationInfo> {
 		self.read()
 			.await
@@ -156,15 +148,22 @@ impl Service {
 			.any(|info| info.rooms.is_exclusive_match(room_id.as_str()))
 	}
 
-	pub fn read(
-		&self,
-	) -> impl Future<Output = tokio::sync::RwLockReadGuard<'_, BTreeMap<String, RegistrationInfo>>>
-	{
-		self.registration_info.read()
+	pub fn iter_ids(&self) -> impl Stream<Item = String> + Send {
+		self.read()
+			.map(|info| info.keys().cloned().collect::<Vec<_>>())
+			.map(IntoIterator::into_iter)
+			.map(IterStream::stream)
+			.flatten_stream()
 	}
 
-	#[inline]
-	pub async fn all(&self) -> Result<Vec<(String, Registration)>> { self.iter_db_ids().await }
+	pub fn iter_db_ids(&self) -> impl Stream<Item = Result<(String, Registration)>> + Send {
+		self.db
+			.id_appserviceregistrations
+			.keys()
+			.and_then(move |id: &str| async move {
+				Ok((id.to_owned(), self.get_db_registration(id).await?))
+			})
+	}
 
 	pub async fn get_db_registration(&self, id: &str) -> Result<Registration> {
 		self.db
@@ -175,16 +174,7 @@ impl Service {
 			.map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
 	}
 
-	async fn iter_db_ids(&self) -> Result<Vec<(String, Registration)>> {
-		self.db
-			.id_appserviceregistrations
-			.keys()
-			.ignore_err()
-			.then(|id: String| async move {
-				let reg = self.get_db_registration(&id).await?;
-				Ok((id, reg))
-			})
-			.try_collect()
-			.await
+	pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, Registrations>> + Send {
+		self.registration_info.read()
 	}
 }