move db files command w/ filter args; misc related cleanup

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-29 21:10:33 +00:00
parent 3c8376d897
commit 31c2968bb2
6 changed files with 75 additions and 55 deletions

View file

@ -7,7 +7,10 @@ use std::{
use conduwuit::{
debug_error, err, info, trace, utils,
utils::{stream::ReadyExt, string::EMPTY},
utils::{
stream::{IterStream, ReadyExt},
string::EMPTY,
},
warn, Error, PduEvent, PduId, RawPduId, Result,
};
use futures::{FutureExt, StreamExt, TryStreamExt};
@ -640,6 +643,7 @@ pub(super) async fn force_set_room_state_from_server(
room_id: room_id.clone().into(),
event_id: first_pdu.event_id.clone(),
})
.boxed()
.await?;
for pdu in remote_state_response.pdus.clone() {
@ -648,6 +652,7 @@ pub(super) async fn force_set_room_state_from_server(
.rooms
.event_handler
.parse_incoming_pdu(&pdu)
.boxed()
.await
{
| Ok(t) => t,
@ -711,6 +716,7 @@ pub(super) async fn force_set_room_state_from_server(
.rooms
.event_handler
.resolve_state(&room_id, &room_version, state)
.boxed()
.await?;
info!("Forcing new room state");
@ -946,21 +952,57 @@ pub(super) async fn database_stats(
property: Option<String>,
map: Option<String>,
) -> Result<RoomMessageEventContent> {
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
let map_name = map.as_ref().map_or(EMPTY, String::as_str);
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
self.services
.db
.iter()
.filter(|(&name, _)| map_name.is_empty() || map_name == name)
.try_stream()
.try_for_each(|(&name, map)| {
let res = map.property(&property).expect("invalid property");
writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
})
.await?;
let mut out = String::new();
for (&name, map) in self.services.db.iter() {
if !map_name.is_empty() && map_name != name {
continue;
Ok(RoomMessageEventContent::notice_plain(""))
}
let res = map.property(&property)?;
let res = res.trim();
writeln!(out, "##### {name}:\n```\n{res}\n```")?;
}
#[admin_command]
pub(super) async fn database_files(
&self,
map: Option<String>,
level: Option<i32>,
) -> Result<RoomMessageEventContent> {
let mut files: Vec<_> = self.services.db.db.file_list().collect::<Result<_>>()?;
Ok(RoomMessageEventContent::notice_markdown(out))
files.sort_by_key(|f| f.name.clone());
writeln!(self, "| lev | sst | keys | dels | size | column |").await?;
writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :--- |").await?;
files
.into_iter()
.filter(|file| {
map.as_deref()
.is_none_or(|map| map == file.column_family_name)
})
.filter(|file| level.as_ref().is_none_or(|&level| level == file.level))
.try_stream()
.try_for_each(|file| {
writeln!(
self,
"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
file.level,
file.name,
file.num_entries,
file.num_deletions,
file.size,
file.column_family_name,
)
})
.await?;
Ok(RoomMessageEventContent::notice_plain(""))
}
#[admin_command]

View file

@ -226,6 +226,14 @@ pub(super) enum DebugCommand {
/// - Trim memory usage
TrimMemory,
/// - List database files
DatabaseFiles {
map: Option<String>,
#[arg(long)]
level: Option<i32>,
},
/// - Developer test stubs
#[command(subcommand)]
#[allow(non_snake_case)]

View file

@ -92,7 +92,7 @@ pub(super) async fn clear_caches(&self) -> Result<RoomMessageEventContent> {
#[admin_command]
pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
let result = self.services.globals.db.backup_list()?;
let result = self.services.db.db.backup_list()?;
if result.is_empty() {
Ok(RoomMessageEventContent::text_plain("No backups found."))
@ -103,31 +103,24 @@ pub(super) async fn list_backups(&self) -> Result<RoomMessageEventContent> {
#[admin_command]
pub(super) async fn backup_database(&self) -> Result<RoomMessageEventContent> {
let globals = Arc::clone(&self.services.globals);
let db = Arc::clone(&self.services.db);
let mut result = self
.services
.server
.runtime()
.spawn_blocking(move || match globals.db.backup() {
.spawn_blocking(move || match db.db.backup() {
| Ok(()) => String::new(),
| Err(e) => e.to_string(),
})
.await?;
if result.is_empty() {
result = self.services.globals.db.backup_list()?;
result = self.services.db.db.backup_list()?;
}
Ok(RoomMessageEventContent::notice_markdown(result))
}
#[admin_command]
pub(super) async fn list_database_files(&self) -> Result<RoomMessageEventContent> {
let result = self.services.globals.db.file_list()?;
Ok(RoomMessageEventContent::notice_markdown(result))
}
#[admin_command]
pub(super) async fn admin_notice(&self, message: Vec<String>) -> Result<RoomMessageEventContent> {
let message = message.join(" ");

View file

@ -46,9 +46,6 @@ pub(super) enum ServerCommand {
/// - List database backups
ListBackups,
/// - List database files
ListDatabaseFiles,
/// - Send a message to the admin room.
AdminNotice {
message: Vec<String>,

View file

@ -1,32 +1,15 @@
use std::fmt::Write;
use conduwuit::{implement, Result};
use rocksdb::LiveFile as SstFile;
use super::Engine;
use crate::util::map_err;
#[implement(Engine)]
pub fn file_list(&self) -> Result<String> {
match self.db.live_files() {
| Err(e) => Ok(String::from(e)),
| Ok(mut files) => {
files.sort_by_key(|f| f.name.clone());
let mut res = String::new();
writeln!(res, "| lev | sst | keys | dels | size | column |")?;
writeln!(res, "| ---: | :--- | ---: | ---: | ---: | :--- |")?;
for file in files {
writeln!(
res,
"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
file.level,
file.name,
file.num_entries,
file.num_deletions,
file.size,
file.column_family_name,
)?;
}
Ok(res)
},
}
pub fn file_list(&self) -> impl Iterator<Item = Result<SstFile>> + Send {
self.db
.live_files()
.map_err(map_err)
.into_iter()
.flat_map(Vec::into_iter)
.map(Ok)
}

View file

@ -79,7 +79,4 @@ impl Data {
#[inline]
pub fn backup_list(&self) -> Result<String> { self.db.db.backup_list() }
#[inline]
pub fn file_list(&self) -> Result<String> { self.db.db.file_list() }
}