From 37a2ba59d0af70eaacf61afe004a3fb765110b93 Mon Sep 17 00:00:00 2001 From: strawberry Date: Tue, 1 Oct 2024 01:27:45 -0400 Subject: [PATCH] improve UX of admin media deletion commands, ignore errors by default, support deleting local media too Signed-off-by: strawberry --- src/admin/media/commands.rs | 133 ++++++++++++++++++++++-------------- src/admin/media/mod.rs | 47 +++++++------ src/service/media/mod.rs | 128 +++++++++++++++++----------------- 3 files changed, 171 insertions(+), 137 deletions(-) diff --git a/src/admin/media/commands.rs b/src/admin/media/commands.rs index 535ad31a..3c4bf2ef 100644 --- a/src/admin/media/commands.rs +++ b/src/admin/media/commands.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use conduit::{debug, info, trace, utils::time::parse_timepoint_ago, warn, Result}; +use conduit::{debug, debug_info, debug_warn, error, info, trace, utils::time::parse_timepoint_ago, Result}; use conduit_service::media::Dim; use ruma::{ events::room::message::RoomMessageEventContent, EventId, Mxc, MxcUri, OwnedMxcUri, OwnedServerName, ServerName, @@ -19,7 +19,7 @@ pub(super) async fn delete( } if let Some(mxc) = mxc { - debug!("Got MXC URL: {mxc}"); + trace!("Got MXC URL: {mxc}"); self.services .media .delete(&mxc.as_str().try_into()?) @@ -28,11 +28,12 @@ pub(super) async fn delete( return Ok(RoomMessageEventContent::text_plain( "Deleted the MXC from our database and on our filesystem.", )); - } else if let Some(event_id) = event_id { - debug!("Got event ID to delete media from: {event_id}"); + } - let mut mxc_urls = vec![]; - let mut mxc_deletion_count: usize = 0; + if let Some(event_id) = event_id { + trace!("Got event ID to delete media from: {event_id}"); + + let mut mxc_urls = Vec::with_capacity(4); // parsing the PDU for any MXC URLs begins here if let Some(event_json) = self.services.rooms.timeline.get_pdu_json(&event_id)? { @@ -124,18 +125,28 @@ pub(super) async fn delete( } if mxc_urls.is_empty() { - // we shouldn't get here (should have errored earlier) but just in case for - // whatever reason we do... info!("Parsed event ID {event_id} but did not contain any MXC URLs."); return Ok(RoomMessageEventContent::text_plain("Parsed event ID but found no MXC URLs.")); } + let mut mxc_deletion_count: usize = 0; + for mxc_url in mxc_urls { - self.services + match self + .services .media .delete(&mxc_url.as_str().try_into()?) - .await?; - mxc_deletion_count = mxc_deletion_count.saturating_add(1); + .await + { + Ok(()) => { + debug_info!("Successfully deleted {mxc_url} from filesystem and database"); + mxc_deletion_count = mxc_deletion_count.saturating_add(1); + }, + Err(e) => { + debug_warn!("Failed to delete {mxc_url}, ignoring error and skipping: {e}"); + continue; + }, + } } return Ok(RoomMessageEventContent::text_plain(format!( @@ -158,34 +169,62 @@ pub(super) async fn delete_list(&self) -> Result { )); } + let mut failed_parsed_mxcs: usize = 0; + let mxc_list = self .body .to_vec() .drain(1..self.body.len().checked_sub(1).unwrap()) - .collect::>(); + .filter_map(|mxc_s| { + mxc_s + .try_into() + .inspect_err(|e| { + debug_warn!("Failed to parse user-provided MXC URI: {e}"); + + failed_parsed_mxcs = failed_parsed_mxcs.saturating_add(1); + }) + .ok() + }) + .collect::>>(); let mut mxc_deletion_count: usize = 0; - for mxc in mxc_list { - debug!("Deleting MXC {mxc} in bulk"); - self.services.media.delete(&mxc.try_into()?).await?; - mxc_deletion_count = mxc_deletion_count - .checked_add(1) - .expect("mxc_deletion_count should not get this high"); + for mxc in &mxc_list { + trace!(%failed_parsed_mxcs, %mxc_deletion_count, "Deleting MXC {mxc} in bulk"); + match self.services.media.delete(mxc).await { + Ok(()) => { + debug_info!("Successfully deleted {mxc} from filesystem and database"); + mxc_deletion_count = mxc_deletion_count.saturating_add(1); + }, + Err(e) => { + debug_warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); + continue; + }, + } } Ok(RoomMessageEventContent::text_plain(format!( - "Finished bulk MXC deletion, deleted {mxc_deletion_count} total MXCs from our database and the filesystem.", + "Finished bulk MXC deletion, deleted {mxc_deletion_count} total MXCs from our database and the filesystem. \ + {failed_parsed_mxcs} MXCs failed to be parsed from the database.", ))) } #[admin_command] -pub(super) async fn delete_past_remote_media(&self, duration: String, force: bool) -> Result { +pub(super) async fn delete_past_remote_media( + &self, duration: String, before: bool, after: bool, yes_i_want_to_delete_local_media: bool, +) -> Result { + if before && after { + return Ok(RoomMessageEventContent::text_plain( + "Please only pick one argument, --before or --after.", + )); + } + assert!(!(before && after), "--before and --after should not be specified together"); + let duration = parse_timepoint_ago(&duration)?; let deleted_count = self .services .media - .delete_all_remote_media_at_after_time(duration, force) + .delete_all_remote_media_at_after_time(duration, before, after, yes_i_want_to_delete_local_media) .await?; Ok(RoomMessageEventContent::text_plain(format!( @@ -194,14 +233,10 @@ pub(super) async fn delete_past_remote_media(&self, duration: String, force: boo } #[admin_command] -pub(super) async fn delete_all_from_user(&self, username: String, force: bool) -> Result { +pub(super) async fn delete_all_from_user(&self, username: String) -> Result { let user_id = parse_local_user_id(self.services, &username)?; - let deleted_count = self - .services - .media - .delete_from_user(&user_id, force) - .await?; + let deleted_count = self.services.media.delete_from_user(&user_id).await?; Ok(RoomMessageEventContent::text_plain(format!( "Deleted {deleted_count} total files.", @@ -210,34 +245,36 @@ pub(super) async fn delete_all_from_user(&self, username: String, force: bool) - #[admin_command] pub(super) async fn delete_all_from_server( - &self, server_name: Box, force: bool, + &self, server_name: Box, yes_i_want_to_delete_local_media: bool, ) -> Result { - if server_name == self.services.globals.server_name() { - return Ok(RoomMessageEventContent::text_plain("This command only works for remote media.")); + if server_name == self.services.globals.server_name() && !yes_i_want_to_delete_local_media { + return Ok(RoomMessageEventContent::text_plain( + "This command only works for remote media by default.", + )); } - let Ok(all_mxcs) = self.services.media.get_all_mxcs().await else { + let Ok(all_mxcs) = self + .services + .media + .get_all_mxcs() + .await + .inspect_err(|e| error!("Failed to get MXC URIs from our database: {e}")) + else { return Ok(RoomMessageEventContent::text_plain("Failed to get MXC URIs from our database")); }; let mut deleted_count: usize = 0; for mxc in all_mxcs { - let mxc_server_name = match mxc.server_name() { - Ok(server_name) => server_name, - Err(e) => { - if force { - warn!("Failed to parse MXC {mxc} server name from database, ignoring error and skipping: {e}"); - continue; - } - - return Ok(RoomMessageEventContent::text_plain(format!( - "Failed to parse MXC {mxc} server name from database: {e}", - ))); - }, + let Ok(mxc_server_name) = mxc.server_name().inspect_err(|e| { + debug_warn!("Failed to parse MXC {mxc} server name from database, ignoring error and skipping: {e}"); + }) else { + continue; }; - if mxc_server_name != server_name || self.services.globals.server_is_ours(mxc_server_name) { + if mxc_server_name != server_name + || (self.services.globals.server_is_ours(mxc_server_name) && !yes_i_want_to_delete_local_media) + { trace!("skipping MXC URI {mxc}"); continue; } @@ -249,12 +286,8 @@ pub(super) async fn delete_all_from_server( deleted_count = deleted_count.saturating_add(1); }, Err(e) => { - if force { - warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); - continue; - } - - return Ok(RoomMessageEventContent::text_plain(format!("Failed to delete MXC {mxc}: {e}"))); + debug_warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); + continue; }, } } diff --git a/src/admin/media/mod.rs b/src/admin/media/mod.rs index 5977c0fa..fbf6532b 100644 --- a/src/admin/media/mod.rs +++ b/src/admin/media/mod.rs @@ -10,7 +10,7 @@ use crate::admin_command_dispatch; #[derive(Debug, Subcommand)] pub(super) enum MediaCommand { /// - Deletes a single media file from our database and on the filesystem - /// via a single MXC URL + /// via a single MXC URL or event ID (not redacted) Delete { /// The MXC URL to delete #[arg(long)] @@ -23,37 +23,44 @@ pub(super) enum MediaCommand { }, /// - Deletes a codeblock list of MXC URLs from our database and on the - /// filesystem + /// filesystem. This will always ignore errors. DeleteList, - /// - Deletes all remote media in the last X amount of time using filesystem - /// metadata first created at date. + /// - Deletes all remote media in the last/after "X" time using filesystem + /// metadata first created at date, or fallback to last modified date. + /// This will always ignore errors by default. + /// + /// Synapse DeletePastRemoteMedia { - /// - The duration (at or after), e.g. "5m" to delete all media in the - /// past 5 minutes + /// - The duration (at or after/before), e.g. "5m" to delete all media + /// in the past or up to 5 minutes duration: String, - /// Continues deleting remote media if an undeletable object is found - #[arg(short, long)] - force: bool, + #[arg(long, short)] + before: bool, + + #[arg(long, short)] + after: bool, + + /// Long argument to delete local media + #[arg(long)] + yes_i_want_to_delete_local_media: bool, }, - /// - Deletes all the local media from a local user on our server + /// - Deletes all the local media from a local user on our server. This will + /// always ignore errors by default. DeleteAllFromUser { username: String, - - /// Continues deleting media if an undeletable object is found - #[arg(short, long)] - force: bool, }, - /// - Deletes all remote media from the specified remote server + /// - Deletes all remote media from the specified remote server. This will + /// always ignore errors by default. DeleteAllFromServer { server_name: Box, - /// Continues deleting media if an undeletable object is found - #[arg(short, long)] - force: bool, + /// Long argument to delete local media + #[arg(long)] + yes_i_want_to_delete_local_media: bool, }, GetFileInfo { @@ -82,10 +89,10 @@ pub(super) enum MediaCommand { #[arg(short, long, default_value("10000"))] timeout: u32, - #[arg(short, long)] + #[arg(short, long, default_value("800"))] width: u32, - #[arg(short, long)] + #[arg(short, long, default_value("800"))] height: u32, }, } diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 2faa13d8..d3765a17 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -10,7 +10,7 @@ use std::{path::PathBuf, sync::Arc, time::SystemTime}; use async_trait::async_trait; use base64::{engine::general_purpose, Engine as _}; use conduit::{ - debug, debug_error, debug_info, err, error, trace, + debug, debug_error, debug_info, debug_warn, err, error, trace, utils::{self, MutexMap}, warn, Err, Result, Server, }; @@ -99,45 +99,46 @@ impl Service { pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> { if let Ok(keys) = self.db.search_mxc_metadata_prefix(mxc) { for key in keys { - trace!(?mxc, ?key, "Deleting from filesystem"); + trace!(?mxc, "MXC Key: {key:?}"); + debug_info!(?mxc, "Deleting from filesystem"); + if let Err(e) = self.remove_media_file(&key).await { - error!(?mxc, ?key, "Failed to remove media file: {e}"); + debug_error!(?mxc, "Failed to remove media file: {e}"); } - trace!(?mxc, ?key, "Deleting from database"); - if let Err(e) = self.db.delete_file_mxc(mxc) { - error!(?mxc, ?key, "Failed to remove media from database: {e}"); - } + debug_info!(?mxc, "Deleting from database"); + _ = self.db.delete_file_mxc(mxc); } Ok(()) } else { - Err!(Database(error!( - "Failed to find any media keys for MXC {mxc:?} in our database." - ))) + Err!(Database(error!("Failed to find any media keys for MXC {mxc} in our database."))) } } /// Deletes all media by the specified user /// /// currently, this is only practical for local users - pub async fn delete_from_user(&self, user: &UserId, force: bool) -> Result { + pub async fn delete_from_user(&self, user: &UserId) -> Result { let mxcs = self.db.get_all_user_mxcs(user); let mut deletion_count: usize = 0; for mxc in mxcs { - let mxc: Mxc<'_> = mxc.as_str().try_into()?; - debug_info!("Deleting MXC {mxc} by user {user} from database and filesystem"); - if force { - _ = self - .delete(&mxc) - .await - .inspect_err(|e| warn!("Failed to delete {mxc} from user {user}, ignoring error: {e}")); - } else { - self.delete(&mxc).await?; - } + let Ok(mxc) = mxc.as_str().try_into().inspect_err(|e| { + debug_error!(?mxc, "Failed to parse MXC URI from database: {e}"); + }) else { + continue; + }; - deletion_count = deletion_count.saturating_add(1); + debug_info!(%deletion_count, "Deleting MXC {mxc} by user {user} from database and filesystem"); + match self.delete(&mxc).await { + Ok(()) => { + deletion_count = deletion_count.saturating_add(1); + }, + Err(e) => { + debug_error!(%deletion_count, "Failed to delete {mxc} from user {user}, ignoring error: {e}"); + }, + } } Ok(deletion_count) @@ -176,9 +177,6 @@ impl Service { for key in all_keys { trace!("Full MXC key from database: {key:?}"); - // we need to get the MXC URL from the first part of the key (the first 0xff / - // 255 push). this is all necessary because of conduit using magic keys for - // media let mut parts = key.split(|&b| b == 0xFF); let mxc = parts .next() @@ -189,31 +187,33 @@ impl Service { .transpose()?; let Some(mxc_s) = mxc else { - return Err!(Database("Parsed MXC URL unicode bytes from database but still is None")); + debug_warn!(?mxc, "Parsed MXC URL unicode bytes from database but is still invalid"); + continue; }; trace!("Parsed MXC key to URL: {mxc_s}"); let mxc = OwnedMxcUri::from(mxc_s); - mxcs.push(mxc); + if mxc.is_valid() { + mxcs.push(mxc); + } else { + debug_warn!("{mxc:?} from database was found to not be valid"); + } } Ok(mxcs) } /// Deletes all remote only media files in the given at or after - /// time/duration. Returns a u32 with the amount of media files deleted. - pub async fn delete_all_remote_media_at_after_time(&self, time: SystemTime, force: bool) -> Result { + /// time/duration. Returns a usize with the amount of media files deleted. + pub async fn delete_all_remote_media_at_after_time( + &self, time: SystemTime, before: bool, after: bool, yes_i_want_to_delete_local_media: bool, + ) -> Result { let all_keys = self.db.get_all_media_keys(); - let mut remote_mxcs = Vec::with_capacity(all_keys.len()); for key in all_keys { trace!("Full MXC key from database: {key:?}"); - - // we need to get the MXC URL from the first part of the key (the first 0xff / - // 255 push). this is all necessary because of conduit using magic keys for - // media let mut parts = key.split(|&b| b == 0xFF); let mxc = parts .next() @@ -224,35 +224,30 @@ impl Service { .transpose()?; let Some(mxc_s) = mxc else { - return Err!(Database("Parsed MXC URL unicode bytes from database but still is None")); + debug_warn!(?mxc, "Parsed MXC URL unicode bytes from database but is still invalid"); + continue; }; trace!("Parsed MXC key to URL: {mxc_s}"); let mxc = OwnedMxcUri::from(mxc_s); - if mxc.server_name() == Ok(self.services.globals.server_name()) { - debug!("Ignoring local media MXC: {mxc}"); - // ignore our own MXC URLs as this would be local media. + if (mxc.server_name() == Ok(self.services.globals.server_name()) && !yes_i_want_to_delete_local_media) + || !mxc.is_valid() + { + debug!("Ignoring local or broken media MXC: {mxc}"); continue; } let path = self.get_media_file(&key); - debug!("MXC path: {path:?}"); let file_metadata = match fs::metadata(path.clone()).await { Ok(file_metadata) => file_metadata, Err(e) => { - if force { - error!("Failed to obtain file metadata for MXC {mxc} at file path \"{path:?}\", skipping: {e}"); - continue; - } - - return Err!(Database( - "Failed to obtain file metadata for MXC {mxc} at file path \"{path:?}\": {e}" - )); + error!("Failed to obtain file metadata for MXC {mxc} at file path \"{path:?}\", skipping: {e}"); + continue; }, }; - debug!("File metadata: {file_metadata:?}"); + trace!(%mxc, ?path, "File metadata: {file_metadata:?}"); let file_created_at = match file_metadata.created() { Ok(value) => value, @@ -261,33 +256,36 @@ impl Service { file_metadata.modified()? }, Err(err) => { - if force { - error!("Could not delete MXC {mxc} at path {path:?}: {err:?}. Skipping..."); - continue; - } - - return Err(err.into()); + error!("Could not delete MXC {mxc} at path {path:?}: {err:?}. Skipping..."); + continue; }, }; debug!("File created at: {file_created_at:?}"); - if file_created_at <= time { - debug!("File is within user duration, pushing to list of file paths and keys to delete."); + + if file_created_at >= time && before { + debug!("File is within (before) user duration, pushing to list of file paths and keys to delete."); + remote_mxcs.push(mxc.to_string()); + } else if file_created_at <= time && after { + debug!("File is not within (after) user duration, pushing to list of file paths and keys to delete."); remote_mxcs.push(mxc.to_string()); } } - debug!( - "Finished going through all our media in database for eligible keys to delete, checking if these are empty" - ); if remote_mxcs.is_empty() { return Err!(Database("Did not found any eligible MXCs to delete.")); } - debug_info!("Deleting media now in the past {time:?}."); + debug_info!("Deleting media now in the past {time:?}"); + let mut deletion_count: usize = 0; + for mxc in remote_mxcs { - let mxc: Mxc<'_> = mxc.as_str().try_into()?; + let Ok(mxc) = mxc.as_str().try_into() else { + debug_warn!("Invalid MXC in database, skipping"); + continue; + }; + debug_info!("Deleting MXC {mxc} from database and filesystem"); match self.delete(&mxc).await { @@ -295,12 +293,8 @@ impl Service { deletion_count = deletion_count.saturating_add(1); }, Err(e) => { - if force { - warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); - continue; - } - - return Err!(Database(warn!("Failed to delete MXC {mxc}: {e}"))); + warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); + continue; }, } }