improve UX of admin media deletion commands, ignore errors by default, support deleting local media too

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-10-01 01:27:45 -04:00
parent 724711218a
commit 37a2ba59d0
3 changed files with 171 additions and 137 deletions

View file

@ -1,6 +1,6 @@
use std::time::Duration; use std::time::Duration;
use conduit::{debug, info, trace, utils::time::parse_timepoint_ago, warn, Result}; use conduit::{debug, debug_info, debug_warn, error, info, trace, utils::time::parse_timepoint_ago, Result};
use conduit_service::media::Dim; use conduit_service::media::Dim;
use ruma::{ use ruma::{
events::room::message::RoomMessageEventContent, EventId, Mxc, MxcUri, OwnedMxcUri, OwnedServerName, ServerName, events::room::message::RoomMessageEventContent, EventId, Mxc, MxcUri, OwnedMxcUri, OwnedServerName, ServerName,
@ -19,7 +19,7 @@ pub(super) async fn delete(
} }
if let Some(mxc) = mxc { if let Some(mxc) = mxc {
debug!("Got MXC URL: {mxc}"); trace!("Got MXC URL: {mxc}");
self.services self.services
.media .media
.delete(&mxc.as_str().try_into()?) .delete(&mxc.as_str().try_into()?)
@ -28,11 +28,12 @@ pub(super) async fn delete(
return Ok(RoomMessageEventContent::text_plain( return Ok(RoomMessageEventContent::text_plain(
"Deleted the MXC from our database and on our filesystem.", "Deleted the MXC from our database and on our filesystem.",
)); ));
} else if let Some(event_id) = event_id { }
debug!("Got event ID to delete media from: {event_id}");
let mut mxc_urls = vec![]; if let Some(event_id) = event_id {
let mut mxc_deletion_count: usize = 0; trace!("Got event ID to delete media from: {event_id}");
let mut mxc_urls = Vec::with_capacity(4);
// parsing the PDU for any MXC URLs begins here // parsing the PDU for any MXC URLs begins here
if let Some(event_json) = self.services.rooms.timeline.get_pdu_json(&event_id)? { if let Some(event_json) = self.services.rooms.timeline.get_pdu_json(&event_id)? {
@ -124,18 +125,28 @@ pub(super) async fn delete(
} }
if mxc_urls.is_empty() { if mxc_urls.is_empty() {
// we shouldn't get here (should have errored earlier) but just in case for
// whatever reason we do...
info!("Parsed event ID {event_id} but did not contain any MXC URLs."); info!("Parsed event ID {event_id} but did not contain any MXC URLs.");
return Ok(RoomMessageEventContent::text_plain("Parsed event ID but found no MXC URLs.")); return Ok(RoomMessageEventContent::text_plain("Parsed event ID but found no MXC URLs."));
} }
let mut mxc_deletion_count: usize = 0;
for mxc_url in mxc_urls { for mxc_url in mxc_urls {
self.services match self
.services
.media .media
.delete(&mxc_url.as_str().try_into()?) .delete(&mxc_url.as_str().try_into()?)
.await?; .await
mxc_deletion_count = mxc_deletion_count.saturating_add(1); {
Ok(()) => {
debug_info!("Successfully deleted {mxc_url} from filesystem and database");
mxc_deletion_count = mxc_deletion_count.saturating_add(1);
},
Err(e) => {
debug_warn!("Failed to delete {mxc_url}, ignoring error and skipping: {e}");
continue;
},
}
} }
return Ok(RoomMessageEventContent::text_plain(format!( return Ok(RoomMessageEventContent::text_plain(format!(
@ -158,34 +169,62 @@ pub(super) async fn delete_list(&self) -> Result<RoomMessageEventContent> {
)); ));
} }
let mut failed_parsed_mxcs: usize = 0;
let mxc_list = self let mxc_list = self
.body .body
.to_vec() .to_vec()
.drain(1..self.body.len().checked_sub(1).unwrap()) .drain(1..self.body.len().checked_sub(1).unwrap())
.collect::<Vec<_>>(); .filter_map(|mxc_s| {
mxc_s
.try_into()
.inspect_err(|e| {
debug_warn!("Failed to parse user-provided MXC URI: {e}");
failed_parsed_mxcs = failed_parsed_mxcs.saturating_add(1);
})
.ok()
})
.collect::<Vec<Mxc<'_>>>();
let mut mxc_deletion_count: usize = 0; let mut mxc_deletion_count: usize = 0;
for mxc in mxc_list { for mxc in &mxc_list {
debug!("Deleting MXC {mxc} in bulk"); trace!(%failed_parsed_mxcs, %mxc_deletion_count, "Deleting MXC {mxc} in bulk");
self.services.media.delete(&mxc.try_into()?).await?; match self.services.media.delete(mxc).await {
mxc_deletion_count = mxc_deletion_count Ok(()) => {
.checked_add(1) debug_info!("Successfully deleted {mxc} from filesystem and database");
.expect("mxc_deletion_count should not get this high"); mxc_deletion_count = mxc_deletion_count.saturating_add(1);
},
Err(e) => {
debug_warn!("Failed to delete {mxc}, ignoring error and skipping: {e}");
continue;
},
}
} }
Ok(RoomMessageEventContent::text_plain(format!( Ok(RoomMessageEventContent::text_plain(format!(
"Finished bulk MXC deletion, deleted {mxc_deletion_count} total MXCs from our database and the filesystem.", "Finished bulk MXC deletion, deleted {mxc_deletion_count} total MXCs from our database and the filesystem. \
{failed_parsed_mxcs} MXCs failed to be parsed from the database.",
))) )))
} }
#[admin_command] #[admin_command]
pub(super) async fn delete_past_remote_media(&self, duration: String, force: bool) -> Result<RoomMessageEventContent> { pub(super) async fn delete_past_remote_media(
&self, duration: String, before: bool, after: bool, yes_i_want_to_delete_local_media: bool,
) -> Result<RoomMessageEventContent> {
if before && after {
return Ok(RoomMessageEventContent::text_plain(
"Please only pick one argument, --before or --after.",
));
}
assert!(!(before && after), "--before and --after should not be specified together");
let duration = parse_timepoint_ago(&duration)?; let duration = parse_timepoint_ago(&duration)?;
let deleted_count = self let deleted_count = self
.services .services
.media .media
.delete_all_remote_media_at_after_time(duration, force) .delete_all_remote_media_at_after_time(duration, before, after, yes_i_want_to_delete_local_media)
.await?; .await?;
Ok(RoomMessageEventContent::text_plain(format!( Ok(RoomMessageEventContent::text_plain(format!(
@ -194,14 +233,10 @@ pub(super) async fn delete_past_remote_media(&self, duration: String, force: boo
} }
#[admin_command] #[admin_command]
pub(super) async fn delete_all_from_user(&self, username: String, force: bool) -> Result<RoomMessageEventContent> { pub(super) async fn delete_all_from_user(&self, username: String) -> Result<RoomMessageEventContent> {
let user_id = parse_local_user_id(self.services, &username)?; let user_id = parse_local_user_id(self.services, &username)?;
let deleted_count = self let deleted_count = self.services.media.delete_from_user(&user_id).await?;
.services
.media
.delete_from_user(&user_id, force)
.await?;
Ok(RoomMessageEventContent::text_plain(format!( Ok(RoomMessageEventContent::text_plain(format!(
"Deleted {deleted_count} total files.", "Deleted {deleted_count} total files.",
@ -210,34 +245,36 @@ pub(super) async fn delete_all_from_user(&self, username: String, force: bool) -
#[admin_command] #[admin_command]
pub(super) async fn delete_all_from_server( pub(super) async fn delete_all_from_server(
&self, server_name: Box<ServerName>, force: bool, &self, server_name: Box<ServerName>, yes_i_want_to_delete_local_media: bool,
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
if server_name == self.services.globals.server_name() { if server_name == self.services.globals.server_name() && !yes_i_want_to_delete_local_media {
return Ok(RoomMessageEventContent::text_plain("This command only works for remote media.")); return Ok(RoomMessageEventContent::text_plain(
"This command only works for remote media by default.",
));
} }
let Ok(all_mxcs) = self.services.media.get_all_mxcs().await else { let Ok(all_mxcs) = self
.services
.media
.get_all_mxcs()
.await
.inspect_err(|e| error!("Failed to get MXC URIs from our database: {e}"))
else {
return Ok(RoomMessageEventContent::text_plain("Failed to get MXC URIs from our database")); return Ok(RoomMessageEventContent::text_plain("Failed to get MXC URIs from our database"));
}; };
let mut deleted_count: usize = 0; let mut deleted_count: usize = 0;
for mxc in all_mxcs { for mxc in all_mxcs {
let mxc_server_name = match mxc.server_name() { let Ok(mxc_server_name) = mxc.server_name().inspect_err(|e| {
Ok(server_name) => server_name, debug_warn!("Failed to parse MXC {mxc} server name from database, ignoring error and skipping: {e}");
Err(e) => { }) else {
if force { continue;
warn!("Failed to parse MXC {mxc} server name from database, ignoring error and skipping: {e}");
continue;
}
return Ok(RoomMessageEventContent::text_plain(format!(
"Failed to parse MXC {mxc} server name from database: {e}",
)));
},
}; };
if mxc_server_name != server_name || self.services.globals.server_is_ours(mxc_server_name) { if mxc_server_name != server_name
|| (self.services.globals.server_is_ours(mxc_server_name) && !yes_i_want_to_delete_local_media)
{
trace!("skipping MXC URI {mxc}"); trace!("skipping MXC URI {mxc}");
continue; continue;
} }
@ -249,12 +286,8 @@ pub(super) async fn delete_all_from_server(
deleted_count = deleted_count.saturating_add(1); deleted_count = deleted_count.saturating_add(1);
}, },
Err(e) => { Err(e) => {
if force { debug_warn!("Failed to delete {mxc}, ignoring error and skipping: {e}");
warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); continue;
continue;
}
return Ok(RoomMessageEventContent::text_plain(format!("Failed to delete MXC {mxc}: {e}")));
}, },
} }
} }

View file

@ -10,7 +10,7 @@ use crate::admin_command_dispatch;
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
pub(super) enum MediaCommand { pub(super) enum MediaCommand {
/// - Deletes a single media file from our database and on the filesystem /// - Deletes a single media file from our database and on the filesystem
/// via a single MXC URL /// via a single MXC URL or event ID (not redacted)
Delete { Delete {
/// The MXC URL to delete /// The MXC URL to delete
#[arg(long)] #[arg(long)]
@ -23,37 +23,44 @@ pub(super) enum MediaCommand {
}, },
/// - Deletes a codeblock list of MXC URLs from our database and on the /// - Deletes a codeblock list of MXC URLs from our database and on the
/// filesystem /// filesystem. This will always ignore errors.
DeleteList, DeleteList,
/// - Deletes all remote media in the last X amount of time using filesystem /// - Deletes all remote media in the last/after "X" time using filesystem
/// metadata first created at date. /// metadata first created at date, or fallback to last modified date.
/// This will always ignore errors by default.
///
/// Synapse
DeletePastRemoteMedia { DeletePastRemoteMedia {
/// - The duration (at or after), e.g. "5m" to delete all media in the /// - The duration (at or after/before), e.g. "5m" to delete all media
/// past 5 minutes /// in the past or up to 5 minutes
duration: String, duration: String,
/// Continues deleting remote media if an undeletable object is found #[arg(long, short)]
#[arg(short, long)] before: bool,
force: bool,
#[arg(long, short)]
after: bool,
/// Long argument to delete local media
#[arg(long)]
yes_i_want_to_delete_local_media: bool,
}, },
/// - Deletes all the local media from a local user on our server /// - Deletes all the local media from a local user on our server. This will
/// always ignore errors by default.
DeleteAllFromUser { DeleteAllFromUser {
username: String, username: String,
/// Continues deleting media if an undeletable object is found
#[arg(short, long)]
force: bool,
}, },
/// - Deletes all remote media from the specified remote server /// - Deletes all remote media from the specified remote server. This will
/// always ignore errors by default.
DeleteAllFromServer { DeleteAllFromServer {
server_name: Box<ServerName>, server_name: Box<ServerName>,
/// Continues deleting media if an undeletable object is found /// Long argument to delete local media
#[arg(short, long)] #[arg(long)]
force: bool, yes_i_want_to_delete_local_media: bool,
}, },
GetFileInfo { GetFileInfo {
@ -82,10 +89,10 @@ pub(super) enum MediaCommand {
#[arg(short, long, default_value("10000"))] #[arg(short, long, default_value("10000"))]
timeout: u32, timeout: u32,
#[arg(short, long)] #[arg(short, long, default_value("800"))]
width: u32, width: u32,
#[arg(short, long)] #[arg(short, long, default_value("800"))]
height: u32, height: u32,
}, },
} }

View file

@ -10,7 +10,7 @@ use std::{path::PathBuf, sync::Arc, time::SystemTime};
use async_trait::async_trait; use async_trait::async_trait;
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use conduit::{ use conduit::{
debug, debug_error, debug_info, err, error, trace, debug, debug_error, debug_info, debug_warn, err, error, trace,
utils::{self, MutexMap}, utils::{self, MutexMap},
warn, Err, Result, Server, warn, Err, Result, Server,
}; };
@ -99,45 +99,46 @@ impl Service {
pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> { pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> {
if let Ok(keys) = self.db.search_mxc_metadata_prefix(mxc) { if let Ok(keys) = self.db.search_mxc_metadata_prefix(mxc) {
for key in keys { for key in keys {
trace!(?mxc, ?key, "Deleting from filesystem"); trace!(?mxc, "MXC Key: {key:?}");
debug_info!(?mxc, "Deleting from filesystem");
if let Err(e) = self.remove_media_file(&key).await { if let Err(e) = self.remove_media_file(&key).await {
error!(?mxc, ?key, "Failed to remove media file: {e}"); debug_error!(?mxc, "Failed to remove media file: {e}");
} }
trace!(?mxc, ?key, "Deleting from database"); debug_info!(?mxc, "Deleting from database");
if let Err(e) = self.db.delete_file_mxc(mxc) { _ = self.db.delete_file_mxc(mxc);
error!(?mxc, ?key, "Failed to remove media from database: {e}");
}
} }
Ok(()) Ok(())
} else { } else {
Err!(Database(error!( Err!(Database(error!("Failed to find any media keys for MXC {mxc} in our database.")))
"Failed to find any media keys for MXC {mxc:?} in our database."
)))
} }
} }
/// Deletes all media by the specified user /// Deletes all media by the specified user
/// ///
/// currently, this is only practical for local users /// currently, this is only practical for local users
pub async fn delete_from_user(&self, user: &UserId, force: bool) -> Result<usize> { pub async fn delete_from_user(&self, user: &UserId) -> Result<usize> {
let mxcs = self.db.get_all_user_mxcs(user); let mxcs = self.db.get_all_user_mxcs(user);
let mut deletion_count: usize = 0; let mut deletion_count: usize = 0;
for mxc in mxcs { for mxc in mxcs {
let mxc: Mxc<'_> = mxc.as_str().try_into()?; let Ok(mxc) = mxc.as_str().try_into().inspect_err(|e| {
debug_info!("Deleting MXC {mxc} by user {user} from database and filesystem"); debug_error!(?mxc, "Failed to parse MXC URI from database: {e}");
if force { }) else {
_ = self continue;
.delete(&mxc) };
.await
.inspect_err(|e| warn!("Failed to delete {mxc} from user {user}, ignoring error: {e}"));
} else {
self.delete(&mxc).await?;
}
deletion_count = deletion_count.saturating_add(1); debug_info!(%deletion_count, "Deleting MXC {mxc} by user {user} from database and filesystem");
match self.delete(&mxc).await {
Ok(()) => {
deletion_count = deletion_count.saturating_add(1);
},
Err(e) => {
debug_error!(%deletion_count, "Failed to delete {mxc} from user {user}, ignoring error: {e}");
},
}
} }
Ok(deletion_count) Ok(deletion_count)
@ -176,9 +177,6 @@ impl Service {
for key in all_keys { for key in all_keys {
trace!("Full MXC key from database: {key:?}"); trace!("Full MXC key from database: {key:?}");
// we need to get the MXC URL from the first part of the key (the first 0xff /
// 255 push). this is all necessary because of conduit using magic keys for
// media
let mut parts = key.split(|&b| b == 0xFF); let mut parts = key.split(|&b| b == 0xFF);
let mxc = parts let mxc = parts
.next() .next()
@ -189,31 +187,33 @@ impl Service {
.transpose()?; .transpose()?;
let Some(mxc_s) = mxc else { let Some(mxc_s) = mxc else {
return Err!(Database("Parsed MXC URL unicode bytes from database but still is None")); debug_warn!(?mxc, "Parsed MXC URL unicode bytes from database but is still invalid");
continue;
}; };
trace!("Parsed MXC key to URL: {mxc_s}"); trace!("Parsed MXC key to URL: {mxc_s}");
let mxc = OwnedMxcUri::from(mxc_s); let mxc = OwnedMxcUri::from(mxc_s);
mxcs.push(mxc); if mxc.is_valid() {
mxcs.push(mxc);
} else {
debug_warn!("{mxc:?} from database was found to not be valid");
}
} }
Ok(mxcs) Ok(mxcs)
} }
/// Deletes all remote only media files in the given at or after /// Deletes all remote only media files in the given at or after
/// time/duration. Returns a u32 with the amount of media files deleted. /// time/duration. Returns a usize with the amount of media files deleted.
pub async fn delete_all_remote_media_at_after_time(&self, time: SystemTime, force: bool) -> Result<usize> { pub async fn delete_all_remote_media_at_after_time(
&self, time: SystemTime, before: bool, after: bool, yes_i_want_to_delete_local_media: bool,
) -> Result<usize> {
let all_keys = self.db.get_all_media_keys(); let all_keys = self.db.get_all_media_keys();
let mut remote_mxcs = Vec::with_capacity(all_keys.len()); let mut remote_mxcs = Vec::with_capacity(all_keys.len());
for key in all_keys { for key in all_keys {
trace!("Full MXC key from database: {key:?}"); trace!("Full MXC key from database: {key:?}");
// we need to get the MXC URL from the first part of the key (the first 0xff /
// 255 push). this is all necessary because of conduit using magic keys for
// media
let mut parts = key.split(|&b| b == 0xFF); let mut parts = key.split(|&b| b == 0xFF);
let mxc = parts let mxc = parts
.next() .next()
@ -224,35 +224,30 @@ impl Service {
.transpose()?; .transpose()?;
let Some(mxc_s) = mxc else { let Some(mxc_s) = mxc else {
return Err!(Database("Parsed MXC URL unicode bytes from database but still is None")); debug_warn!(?mxc, "Parsed MXC URL unicode bytes from database but is still invalid");
continue;
}; };
trace!("Parsed MXC key to URL: {mxc_s}"); trace!("Parsed MXC key to URL: {mxc_s}");
let mxc = OwnedMxcUri::from(mxc_s); let mxc = OwnedMxcUri::from(mxc_s);
if mxc.server_name() == Ok(self.services.globals.server_name()) { if (mxc.server_name() == Ok(self.services.globals.server_name()) && !yes_i_want_to_delete_local_media)
debug!("Ignoring local media MXC: {mxc}"); || !mxc.is_valid()
// ignore our own MXC URLs as this would be local media. {
debug!("Ignoring local or broken media MXC: {mxc}");
continue; continue;
} }
let path = self.get_media_file(&key); let path = self.get_media_file(&key);
debug!("MXC path: {path:?}");
let file_metadata = match fs::metadata(path.clone()).await { let file_metadata = match fs::metadata(path.clone()).await {
Ok(file_metadata) => file_metadata, Ok(file_metadata) => file_metadata,
Err(e) => { Err(e) => {
if force { error!("Failed to obtain file metadata for MXC {mxc} at file path \"{path:?}\", skipping: {e}");
error!("Failed to obtain file metadata for MXC {mxc} at file path \"{path:?}\", skipping: {e}"); continue;
continue;
}
return Err!(Database(
"Failed to obtain file metadata for MXC {mxc} at file path \"{path:?}\": {e}"
));
}, },
}; };
debug!("File metadata: {file_metadata:?}"); trace!(%mxc, ?path, "File metadata: {file_metadata:?}");
let file_created_at = match file_metadata.created() { let file_created_at = match file_metadata.created() {
Ok(value) => value, Ok(value) => value,
@ -261,33 +256,36 @@ impl Service {
file_metadata.modified()? file_metadata.modified()?
}, },
Err(err) => { Err(err) => {
if force { error!("Could not delete MXC {mxc} at path {path:?}: {err:?}. Skipping...");
error!("Could not delete MXC {mxc} at path {path:?}: {err:?}. Skipping..."); continue;
continue;
}
return Err(err.into());
}, },
}; };
debug!("File created at: {file_created_at:?}"); debug!("File created at: {file_created_at:?}");
if file_created_at <= time {
debug!("File is within user duration, pushing to list of file paths and keys to delete."); if file_created_at >= time && before {
debug!("File is within (before) user duration, pushing to list of file paths and keys to delete.");
remote_mxcs.push(mxc.to_string());
} else if file_created_at <= time && after {
debug!("File is not within (after) user duration, pushing to list of file paths and keys to delete.");
remote_mxcs.push(mxc.to_string()); remote_mxcs.push(mxc.to_string());
} }
} }
debug!(
"Finished going through all our media in database for eligible keys to delete, checking if these are empty"
);
if remote_mxcs.is_empty() { if remote_mxcs.is_empty() {
return Err!(Database("Did not found any eligible MXCs to delete.")); return Err!(Database("Did not found any eligible MXCs to delete."));
} }
debug_info!("Deleting media now in the past {time:?}."); debug_info!("Deleting media now in the past {time:?}");
let mut deletion_count: usize = 0; let mut deletion_count: usize = 0;
for mxc in remote_mxcs { for mxc in remote_mxcs {
let mxc: Mxc<'_> = mxc.as_str().try_into()?; let Ok(mxc) = mxc.as_str().try_into() else {
debug_warn!("Invalid MXC in database, skipping");
continue;
};
debug_info!("Deleting MXC {mxc} from database and filesystem"); debug_info!("Deleting MXC {mxc} from database and filesystem");
match self.delete(&mxc).await { match self.delete(&mxc).await {
@ -295,12 +293,8 @@ impl Service {
deletion_count = deletion_count.saturating_add(1); deletion_count = deletion_count.saturating_add(1);
}, },
Err(e) => { Err(e) => {
if force { warn!("Failed to delete {mxc}, ignoring error and skipping: {e}");
warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); continue;
continue;
}
return Err!(Database(warn!("Failed to delete MXC {mxc}: {e}")));
}, },
} }
} }