From 31c2968bb29e7447e56531333fb330da4ac08ede Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Wed, 29 Jan 2025 21:10:33 +0000
Subject: [PATCH] move db files command w/ filter args; misc related cleanup

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/admin/debug/commands.rs  | 66 +++++++++++++++++++++++++++++-------
 src/admin/debug/mod.rs       |  8 +++++
 src/admin/server/commands.rs | 15 +++-----
 src/admin/server/mod.rs      |  3 --
 src/database/engine/files.rs | 35 +++++--------------
 src/service/globals/data.rs  |  3 --
 6 files changed, 75 insertions(+), 55 deletions(-)

diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs
index 4e0ce2e3..dcf9879c 100644
--- a/src/admin/debug/commands.rs
+++ b/src/admin/debug/commands.rs
@@ -7,7 +7,10 @@ use std::{
 
 use conduwuit::{
 	debug_error, err, info, trace, utils,
-	utils::{stream::ReadyExt, string::EMPTY},
+	utils::{
+		stream::{IterStream, ReadyExt},
+		string::EMPTY,
+	},
 	warn, Error, PduEvent, PduId, RawPduId, Result,
 };
 use futures::{FutureExt, StreamExt, TryStreamExt};
@@ -640,6 +643,7 @@ pub(super) async fn force_set_room_state_from_server(
 			room_id: room_id.clone().into(),
 			event_id: first_pdu.event_id.clone(),
 		})
+		.boxed()
 		.await?;
 
 	for pdu in remote_state_response.pdus.clone() {
@@ -648,6 +652,7 @@ pub(super) async fn force_set_room_state_from_server(
 			.rooms
 			.event_handler
 			.parse_incoming_pdu(&pdu)
+			.boxed()
 			.await
 		{
 			| Ok(t) => t,
@@ -711,6 +716,7 @@ pub(super) async fn force_set_room_state_from_server(
 		.rooms
 		.event_handler
 		.resolve_state(&room_id, &room_version, state)
+		.boxed()
 		.await?;
 
 	info!("Forcing new room state");
@@ -946,21 +952,57 @@ pub(super) async fn database_stats(
 	property: Option<String>,
 	map: Option<String>,
 ) -> Result<RoomMessageEventContent> {
-	let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
 	let map_name = map.as_ref().map_or(EMPTY, String::as_str);
+	let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
+	self.services
+		.db
+		.iter()
+		.filter(|(&name, _)| map_name.is_empty() || map_name == name)
+		.try_stream()
+		.try_for_each(|(&name, map)| {
+			let res = map.property(&property).expect("invalid property");
+			writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
+		})
+		.await?;
 
-	let mut out = String::new();
-	for (&name, map) in self.services.db.iter() {
-		if !map_name.is_empty() && map_name != name {
-			continue;
-		}
+	Ok(RoomMessageEventContent::notice_plain(""))
+}
 
-		let res = map.property(&property)?;
-		let res = res.trim();
-		writeln!(out, "##### {name}:\n```\n{res}\n```")?;
-	}
+#[admin_command]
+pub(super) async fn database_files(
+	&self,
+	map: Option<String>,
+	level: Option<i32>,
+) -> Result<RoomMessageEventContent> {
+	let mut files: Vec<_> = self.services.db.db.file_list().collect::<Result<_>>()?;
 
-	Ok(RoomMessageEventContent::notice_markdown(out))
+	files.sort_by_key(|f| f.name.clone());
+
+	writeln!(self, "| lev  | sst  | keys | dels | size | column |").await?;
+	writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :---   |").await?;
+	files
+		.into_iter()
+		.filter(|file| {
+			map.as_deref()
+				.is_none_or(|map| map == file.column_family_name)
+		})
+		.filter(|file| level.as_ref().is_none_or(|&level| level == file.level))
+		.try_stream()
+		.try_for_each(|file| {
+			writeln!(
+				self,
+				"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
+				file.level,
+				file.name,
+				file.num_entries,
+				file.num_deletions,
+				file.size,
+				file.column_family_name,
+			)
+		})
+		.await?;
+
+	Ok(RoomMessageEventContent::notice_plain(""))
 }
 
 #[admin_command]
diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs
index 07f7296b..db04ccf4 100644
--- a/src/admin/debug/mod.rs
+++ b/src/admin/debug/mod.rs
@@ -226,6 +226,14 @@ pub(super) enum DebugCommand {
 	/// - Trim memory usage
 	TrimMemory,
 
+	/// - List database files
+	DatabaseFiles {
+		map: Option<String>,
+
+		#[arg(long)]
+		level: Option<i32>,
+	},
+
 	/// - Developer test stubs
 	#[command(subcommand)]
 	#[allow(non_snake_case)]
diff --git a/src/admin/server/commands.rs b/src/admin/server/commands.rs
index 910dce6e..d4cfa7d5 100644
--- a/src/admin/server/commands.rs
+++ b/src/admin/server/commands.rs
@@ -92,7 +92,7 @@ pub(super) async fn clear_caches(&self) -> Result<RoomMessageEventContent> {
 
 #[admin_command]
 pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
-	let result = self.services.globals.db.backup_list()?;
+	let result = self.services.db.db.backup_list()?;
 
 	if result.is_empty() {
 		Ok(RoomMessageEventContent::text_plain("No backups found."))
@@ -103,31 +103,24 @@ pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
 
 #[admin_command]
 pub(super) async fn backup_database(&self) -> Result<RoomMessageEventContent> {
-	let globals = Arc::clone(&self.services.globals);
+	let db = Arc::clone(&self.services.db);
 	let mut result = self
 		.services
 		.server
 		.runtime()
-		.spawn_blocking(move || match globals.db.backup() {
+		.spawn_blocking(move || match db.db.backup() {
 			| Ok(()) => String::new(),
 			| Err(e) => e.to_string(),
 		})
 		.await?;
 
 	if result.is_empty() {
-		result = self.services.globals.db.backup_list()?;
+		result = self.services.db.db.backup_list()?;
 	}
 
 	Ok(RoomMessageEventContent::notice_markdown(result))
 }
 
-#[admin_command]
-pub(super) async fn list_database_files(&self) -> Result<RoomMessageEventContent> {
-	let result = self.services.globals.db.file_list()?;
-
-	Ok(RoomMessageEventContent::notice_markdown(result))
-}
-
 #[admin_command]
 pub(super) async fn admin_notice(&self, message: Vec<String>) -> Result<RoomMessageEventContent> {
 	let message = message.join(" ");
diff --git a/src/admin/server/mod.rs b/src/admin/server/mod.rs
index 3f3d6c5e..60615365 100644
--- a/src/admin/server/mod.rs
+++ b/src/admin/server/mod.rs
@@ -46,9 +46,6 @@ pub(super) enum ServerCommand {
 	/// - List database backups
 	ListBackups,
 
-	/// - List database files
-	ListDatabaseFiles,
-
 	/// - Send a message to the admin room.
 	AdminNotice {
 		message: Vec<String>,
diff --git a/src/database/engine/files.rs b/src/database/engine/files.rs
index f603c57b..33d6fdc4 100644
--- a/src/database/engine/files.rs
+++ b/src/database/engine/files.rs
@@ -1,32 +1,15 @@
-use std::fmt::Write;
-
 use conduwuit::{implement, Result};
+use rocksdb::LiveFile as SstFile;
 
 use super::Engine;
+use crate::util::map_err;
 
 #[implement(Engine)]
-pub fn file_list(&self) -> Result<String> {
-	match self.db.live_files() {
-		| Err(e) => Ok(String::from(e)),
-		| Ok(mut files) => {
-			files.sort_by_key(|f| f.name.clone());
-			let mut res = String::new();
-			writeln!(res, "| lev  | sst  | keys | dels | size | column |")?;
-			writeln!(res, "| ---: | :--- | ---: | ---: | ---: | :---   |")?;
-			for file in files {
-				writeln!(
-					res,
-					"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
-					file.level,
-					file.name,
-					file.num_entries,
-					file.num_deletions,
-					file.size,
-					file.column_family_name,
-				)?;
-			}
-
-			Ok(res)
-		},
-	}
+pub fn file_list(&self) -> impl Iterator<Item = Result<SstFile>> + Send {
+	self.db
+		.live_files()
+		.map_err(map_err)
+		.into_iter()
+		.flat_map(Vec::into_iter)
+		.map(Ok)
 }
diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs
index 07b4ac2c..39cb9be1 100644
--- a/src/service/globals/data.rs
+++ b/src/service/globals/data.rs
@@ -79,7 +79,4 @@ impl Data {
 
 	#[inline]
 	pub fn backup_list(&self) -> Result<String> { self.db.db.backup_list() }
-
-	#[inline]
-	pub fn file_list(&self) -> Result<String> { self.db.db.file_list() }
 }