diff --git a/src/admin/admin.rs b/src/admin/admin.rs index 9097a613..b6de1ec6 100644 --- a/src/admin/admin.rs +++ b/src/admin/admin.rs @@ -1,6 +1,5 @@ use clap::Parser; use conduwuit::Result; -use ruma::events::room::message::RoomMessageEventContent; use crate::{ appservice, appservice::AppserviceCommand, check, check::CheckCommand, command::Command, @@ -50,13 +49,10 @@ pub(super) enum AdminCommand { } #[tracing::instrument(skip_all, name = "command")] -pub(super) async fn process( - command: AdminCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(command: AdminCommand, context: &Command<'_>) -> Result { use AdminCommand::*; - Ok(match command { + match command { | Appservices(command) => appservice::process(command, context).await?, | Media(command) => media::process(command, context).await?, | Users(command) => user::process(command, context).await?, @@ -66,5 +62,7 @@ pub(super) async fn process( | Debug(command) => debug::process(command, context).await?, | Query(command) => query::process(command, context).await?, | Check(command) => check::process(command, context).await?, - }) + }; + + Ok(()) } diff --git a/src/admin/check/mod.rs b/src/admin/check/mod.rs index 4790a6de..30b335c4 100644 --- a/src/admin/check/mod.rs +++ b/src/admin/check/mod.rs @@ -2,20 +2,11 @@ mod commands; use clap::Subcommand; use conduwuit::Result; -use ruma::events::room::message::RoomMessageEventContent; -use crate::Command; +use crate::admin_command_dispatch; +#[admin_command_dispatch] #[derive(Debug, Subcommand)] pub(super) enum CheckCommand { - AllUsers, -} - -pub(super) async fn process( - command: CheckCommand, - context: &Command<'_>, -) -> Result { - Ok(match command { - | CheckCommand::AllUsers => context.check_all_users().await?, - }) + CheckAllUsers, } diff --git a/src/admin/command.rs b/src/admin/command.rs index 5277b976..5ad9e581 100644 --- a/src/admin/command.rs +++ b/src/admin/command.rs @@ -1,6 +1,12 @@ -use std::time::SystemTime; +use std::{fmt, time::SystemTime}; +use conduwuit::Result; use conduwuit_service::Services; +use futures::{ + io::{AsyncWriteExt, BufWriter}, + lock::Mutex, + Future, FutureExt, +}; use ruma::EventId; pub(crate) struct Command<'a> { @@ -8,4 +14,26 @@ pub(crate) struct Command<'a> { pub(crate) body: &'a [&'a str], pub(crate) timer: SystemTime, pub(crate) reply_id: Option<&'a EventId>, + pub(crate) output: Mutex>>, +} + +impl Command<'_> { + pub(crate) fn write_fmt( + &self, + arguments: fmt::Arguments<'_>, + ) -> impl Future + Send + '_ { + let buf = format!("{arguments}"); + self.output.lock().then(|mut output| async move { + output.write_all(buf.as_bytes()).await.map_err(Into::into) + }) + } + + pub(crate) fn write_str<'a>( + &'a self, + s: &'a str, + ) -> impl Future + Send + 'a { + self.output.lock().then(move |mut output| async move { + output.write_all(s.as_bytes()).await.map_err(Into::into) + }) + } } diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index d027fa73..b6189f6a 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -6,7 +6,8 @@ use std::{ }; use conduwuit::{ - debug_error, err, info, trace, utils, utils::string::EMPTY, warn, Error, PduEvent, Result, + debug_error, err, info, trace, utils, utils::string::EMPTY, warn, Error, PduEvent, PduId, + RawPduId, Result, }; use futures::{FutureExt, StreamExt}; use ruma::{ @@ -15,7 +16,10 @@ use ruma::{ CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomOrAliasId, RoomId, RoomVersionId, ServerName, }; -use service::rooms::state_compressor::HashSetCompressStateEvent; +use service::rooms::{ + short::{ShortEventId, ShortRoomId}, + state_compressor::HashSetCompressStateEvent, +}; use tracing_subscriber::EnvFilter; use crate::admin_command; @@ -131,6 +135,35 @@ pub(super) async fn get_pdu(&self, event_id: Box) -> Result Result { + let pdu_id: RawPduId = PduId { + shortroomid, + shorteventid: shorteventid.into(), + } + .into(); + + let pdu_json = self + .services + .rooms + .timeline + .get_pdu_json_from_id(&pdu_id) + .await; + + match pdu_json { + | Ok(json) => { + let json_text = + serde_json::to_string_pretty(&json).expect("canonical json is valid json"); + Ok(RoomMessageEventContent::notice_markdown(format!("```json\n{json_text}\n```",))) + }, + | Err(_) => Ok(RoomMessageEventContent::text_plain("PDU not found locally.")), + } +} + #[admin_command] pub(super) async fn get_remote_pdu_list( &self, @@ -895,7 +928,7 @@ pub(super) async fn list_dependencies(&self, names: bool) -> Result, }, + /// - Retrieve and print a PDU by PduId from the conduwuit database + GetShortPdu { + /// Shortroomid integer + shortroomid: ShortRoomId, + + /// Shorteventid integer + shorteventid: ShortEventId, + }, + /// - Attempts to retrieve a PDU from a remote server. Inserts it into our /// database/timeline if found and we do not have this PDU already /// (following normal event auth rules, handles it as an incoming PDU). diff --git a/src/admin/debug/tester.rs b/src/admin/debug/tester.rs index 5f922ece..5200fa0d 100644 --- a/src/admin/debug/tester.rs +++ b/src/admin/debug/tester.rs @@ -31,7 +31,7 @@ async fn failure(&self) -> Result { #[admin_command] async fn tester(&self) -> Result { - Ok(RoomMessageEventContent::notice_plain("completed")) + Ok(RoomMessageEventContent::notice_plain("legacy")) } #[inline(never)] diff --git a/src/admin/processor.rs b/src/admin/processor.rs index ed7d5ed1..eefcdcd6 100644 --- a/src/admin/processor.rs +++ b/src/admin/processor.rs @@ -1,5 +1,6 @@ use std::{ fmt::Write, + mem::take, panic::AssertUnwindSafe, sync::{Arc, Mutex}, time::SystemTime, @@ -17,7 +18,7 @@ use conduwuit::{ utils::string::{collect_stream, common_prefix}, warn, Error, Result, }; -use futures::future::FutureExt; +use futures::{future::FutureExt, io::BufWriter, AsyncWriteExt}; use ruma::{ events::{ relation::InReplyTo, @@ -62,9 +63,32 @@ async fn process_command(services: Arc, input: &CommandInput) -> Proce body: &body, timer: SystemTime::now(), reply_id: input.reply_id.as_deref(), + output: BufWriter::new(Vec::new()).into(), }; - process(&context, command, &args).await + let (result, mut logs) = process(&context, command, &args).await; + + let output = &mut context.output.lock().await; + output.flush().await.expect("final flush of output stream"); + + let output = + String::from_utf8(take(output.get_mut())).expect("invalid utf8 in command output stream"); + + match result { + | Ok(()) if logs.is_empty() => + Ok(Some(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id))), + + | Ok(()) => { + logs.write_str(output.as_str()).expect("output buffer"); + Ok(Some(reply(RoomMessageEventContent::notice_markdown(logs), context.reply_id))) + }, + | Err(error) => { + write!(&mut logs, "Command failed with error:\n```\n{error:#?}\n```") + .expect("output buffer"); + + Err(reply(RoomMessageEventContent::notice_markdown(logs), context.reply_id)) + }, + } } fn handle_panic(error: &Error, command: &CommandInput) -> ProcessorResult { @@ -81,7 +105,7 @@ async fn process( context: &Command<'_>, command: AdminCommand, args: &[String], -) -> ProcessorResult { +) -> (Result, String) { let (capture, logs) = capture_create(context); let capture_scope = capture.start(); @@ -104,18 +128,7 @@ async fn process( } drop(logs); - match result { - | Ok(content) => { - write!(&mut output, "{0}", content.body()) - .expect("failed to format command result to output buffer"); - Ok(Some(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id))) - }, - | Err(error) => { - write!(&mut output, "Command failed with error:\n```\n{error:#?}\n```") - .expect("failed to format command result to output"); - Err(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id)) - }, - } + (result, output) } fn capture_create(context: &Command<'_>) -> (Arc, Arc>) { diff --git a/src/admin/query/account_data.rs b/src/admin/query/account_data.rs index 43762789..b75d8234 100644 --- a/src/admin/query/account_data.rs +++ b/src/admin/query/account_data.rs @@ -3,8 +3,9 @@ use conduwuit::Result; use futures::StreamExt; use ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId}; -use crate::Command; +use crate::{admin_command, admin_command_dispatch}; +#[admin_command_dispatch] #[derive(Debug, Subcommand)] /// All the getters and iterators from src/database/key_value/account_data.rs pub(crate) enum AccountDataCommand { @@ -19,7 +20,7 @@ pub(crate) enum AccountDataCommand { }, /// - Searches the account data for a specific kind. - Get { + AccountDataGet { /// Full user ID user_id: Box, /// Account data event type @@ -29,38 +30,43 @@ pub(crate) enum AccountDataCommand { }, } -/// All the getters and iterators from src/database/key_value/account_data.rs -pub(super) async fn process( - subcommand: AccountDataCommand, - context: &Command<'_>, +#[admin_command] +async fn changes_since( + &self, + user_id: Box, + since: u64, + room_id: Option>, ) -> Result { - let services = context.services; + let timer = tokio::time::Instant::now(); + let results: Vec<_> = self + .services + .account_data + .changes_since(room_id.as_deref(), &user_id, since) + .collect() + .await; + let query_time = timer.elapsed(); - match subcommand { - | AccountDataCommand::ChangesSince { user_id, since, room_id } => { - let timer = tokio::time::Instant::now(); - let results: Vec<_> = services - .account_data - .changes_since(room_id.as_deref(), &user_id, since) - .collect() - .await; - let query_time = timer.elapsed(); - - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) - }, - | AccountDataCommand::Get { user_id, kind, room_id } => { - let timer = tokio::time::Instant::now(); - let results = services - .account_data - .get_raw(room_id.as_deref(), &user_id, &kind) - .await; - let query_time = timer.elapsed(); - - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) - }, - } + Ok(RoomMessageEventContent::notice_markdown(format!( + "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" + ))) +} + +#[admin_command] +async fn account_data_get( + &self, + user_id: Box, + kind: String, + room_id: Option>, +) -> Result { + let timer = tokio::time::Instant::now(); + let results = self + .services + .account_data + .get_raw(room_id.as_deref(), &user_id, &kind) + .await; + let query_time = timer.elapsed(); + + Ok(RoomMessageEventContent::notice_markdown(format!( + "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" + ))) } diff --git a/src/admin/query/appservice.rs b/src/admin/query/appservice.rs index fe4861bc..f9e1fd2c 100644 --- a/src/admin/query/appservice.rs +++ b/src/admin/query/appservice.rs @@ -1,6 +1,5 @@ use clap::Subcommand; use conduwuit::Result; -use ruma::events::room::message::RoomMessageEventContent; use crate::Command; @@ -18,10 +17,7 @@ pub(crate) enum AppserviceCommand { } /// All the getters and iterators from src/database/key_value/appservice.rs -pub(super) async fn process( - subcommand: AppserviceCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(subcommand: AppserviceCommand, context: &Command<'_>) -> Result { let services = context.services; match subcommand { @@ -31,18 +27,15 @@ pub(super) async fn process( let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, | AppserviceCommand::All => { let timer = tokio::time::Instant::now(); let results = services.appservice.all().await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, } + .await } diff --git a/src/admin/query/globals.rs b/src/admin/query/globals.rs index e356453f..1642f7cd 100644 --- a/src/admin/query/globals.rs +++ b/src/admin/query/globals.rs @@ -1,6 +1,6 @@ use clap::Subcommand; use conduwuit::Result; -use ruma::{events::room::message::RoomMessageEventContent, ServerName}; +use ruma::ServerName; use crate::Command; @@ -21,10 +21,7 @@ pub(crate) enum GlobalsCommand { } /// All the getters and iterators from src/database/key_value/globals.rs -pub(super) async fn process( - subcommand: GlobalsCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(subcommand: GlobalsCommand, context: &Command<'_>) -> Result { let services = context.services; match subcommand { @@ -33,36 +30,29 @@ pub(super) async fn process( let results = services.globals.db.database_version().await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, | GlobalsCommand::CurrentCount => { let timer = tokio::time::Instant::now(); let results = services.globals.db.current_count(); let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, | GlobalsCommand::LastCheckForUpdatesId => { let timer = tokio::time::Instant::now(); let results = services.updates.last_check_for_updates_id().await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, | GlobalsCommand::SigningKeysFor { origin } => { let timer = tokio::time::Instant::now(); let results = services.server_keys.verify_keys_for(&origin).await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, } + .await } diff --git a/src/admin/query/mod.rs b/src/admin/query/mod.rs index ab269a40..da27eb1d 100644 --- a/src/admin/query/mod.rs +++ b/src/admin/query/mod.rs @@ -3,10 +3,13 @@ mod appservice; mod globals; mod presence; mod pusher; +mod raw; mod resolver; mod room_alias; mod room_state_cache; +mod room_timeline; mod sending; +mod short; mod users; use clap::Subcommand; @@ -14,9 +17,10 @@ use conduwuit::Result; use self::{ account_data::AccountDataCommand, appservice::AppserviceCommand, globals::GlobalsCommand, - presence::PresenceCommand, pusher::PusherCommand, resolver::ResolverCommand, + presence::PresenceCommand, pusher::PusherCommand, raw::RawCommand, resolver::ResolverCommand, room_alias::RoomAliasCommand, room_state_cache::RoomStateCacheCommand, - sending::SendingCommand, users::UsersCommand, + room_timeline::RoomTimelineCommand, sending::SendingCommand, short::ShortCommand, + users::UsersCommand, }; use crate::admin_command_dispatch; @@ -44,6 +48,10 @@ pub(super) enum QueryCommand { #[command(subcommand)] RoomStateCache(RoomStateCacheCommand), + /// - rooms/timeline iterators and getters + #[command(subcommand)] + RoomTimeline(RoomTimelineCommand), + /// - globals.rs iterators and getters #[command(subcommand)] Globals(GlobalsCommand), @@ -63,4 +71,12 @@ pub(super) enum QueryCommand { /// - pusher service #[command(subcommand)] Pusher(PusherCommand), + + /// - short service + #[command(subcommand)] + Short(ShortCommand), + + /// - raw service + #[command(subcommand)] + Raw(RawCommand), } diff --git a/src/admin/query/presence.rs b/src/admin/query/presence.rs index 0de6b696..38272749 100644 --- a/src/admin/query/presence.rs +++ b/src/admin/query/presence.rs @@ -1,7 +1,7 @@ use clap::Subcommand; use conduwuit::Result; use futures::StreamExt; -use ruma::{events::room::message::RoomMessageEventContent, UserId}; +use ruma::UserId; use crate::Command; @@ -23,10 +23,7 @@ pub(crate) enum PresenceCommand { } /// All the getters and iterators in key_value/presence.rs -pub(super) async fn process( - subcommand: PresenceCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(subcommand: PresenceCommand, context: &Command<'_>) -> Result { let services = context.services; match subcommand { @@ -35,9 +32,7 @@ pub(super) async fn process( let results = services.presence.get_presence(&user_id).await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, | PresenceCommand::PresenceSince { since } => { let timer = tokio::time::Instant::now(); @@ -49,9 +44,8 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, } + .await } diff --git a/src/admin/query/pusher.rs b/src/admin/query/pusher.rs index 55532e54..34edf4db 100644 --- a/src/admin/query/pusher.rs +++ b/src/admin/query/pusher.rs @@ -1,6 +1,6 @@ use clap::Subcommand; use conduwuit::Result; -use ruma::{events::room::message::RoomMessageEventContent, UserId}; +use ruma::UserId; use crate::Command; @@ -13,10 +13,7 @@ pub(crate) enum PusherCommand { }, } -pub(super) async fn process( - subcommand: PusherCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(subcommand: PusherCommand, context: &Command<'_>) -> Result { let services = context.services; match subcommand { @@ -25,9 +22,8 @@ pub(super) async fn process( let results = services.pusher.get_pushers(&user_id).await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, } + .await } diff --git a/src/admin/query/raw.rs b/src/admin/query/raw.rs new file mode 100644 index 00000000..678d21c9 --- /dev/null +++ b/src/admin/query/raw.rs @@ -0,0 +1,457 @@ +use std::{borrow::Cow, collections::BTreeMap, ops::Deref}; + +use clap::Subcommand; +use conduwuit::{ + apply, at, + utils::{ + stream::{ReadyExt, TryIgnore}, + string::EMPTY, + IterStream, + }, + Result, +}; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use ruma::events::room::message::RoomMessageEventContent; +use tokio::time::Instant; + +use crate::{admin_command, admin_command_dispatch}; + +#[admin_command_dispatch] +#[derive(Debug, Subcommand)] +#[allow(clippy::enum_variant_names)] +/// Query tables from database +pub(crate) enum RawCommand { + /// - List database maps + RawMaps, + + /// - Raw database query + RawGet { + /// Map name + map: String, + + /// Key + key: String, + }, + + /// - Raw database keys iteration + RawKeys { + /// Map name + map: String, + + /// Key prefix + prefix: Option, + }, + + /// - Raw database key size breakdown + RawKeysSizes { + /// Map name + map: Option, + + /// Key prefix + prefix: Option, + }, + + /// - Raw database keys total bytes + RawKeysTotal { + /// Map name + map: Option, + + /// Key prefix + prefix: Option, + }, + + /// - Raw database values size breakdown + RawValsSizes { + /// Map name + map: Option, + + /// Key prefix + prefix: Option, + }, + + /// - Raw database values total bytes + RawValsTotal { + /// Map name + map: Option, + + /// Key prefix + prefix: Option, + }, + + /// - Raw database items iteration + RawIter { + /// Map name + map: String, + + /// Key prefix + prefix: Option, + }, + + /// - Raw database keys iteration + RawKeysFrom { + /// Map name + map: String, + + /// Lower-bound + start: String, + + /// Limit + #[arg(short, long)] + limit: Option, + }, + + /// - Raw database items iteration + RawIterFrom { + /// Map name + map: String, + + /// Lower-bound + start: String, + + /// Limit + #[arg(short, long)] + limit: Option, + }, + + /// - Raw database record count + RawCount { + /// Map name + map: Option, + + /// Key prefix + prefix: Option, + }, +} + +#[admin_command] +pub(super) async fn raw_count( + &self, + map: Option, + prefix: Option, +) -> Result { + let prefix = prefix.as_deref().unwrap_or(EMPTY); + + let default_all_maps = map + .is_none() + .then(|| self.services.db.keys().map(Deref::deref)) + .into_iter() + .flatten(); + + let maps: Vec<_> = map + .iter() + .map(String::as_str) + .chain(default_all_maps) + .map(|map| self.services.db.get(map)) + .filter_map(Result::ok) + .cloned() + .collect(); + + let timer = Instant::now(); + let count = maps + .iter() + .stream() + .then(|map| map.raw_count_prefix(&prefix)) + .ready_fold(0_usize, usize::saturating_add) + .await; + + let query_time = timer.elapsed(); + self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{count:#?}\n```")) + .await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_keys( + &self, + map: String, + prefix: Option, +) -> Result { + writeln!(self, "```").boxed().await?; + + let map = self.services.db.get(map.as_str())?; + let timer = Instant::now(); + prefix + .as_deref() + .map_or_else(|| map.raw_keys().boxed(), |prefix| map.raw_keys_prefix(prefix).boxed()) + .map_ok(String::from_utf8_lossy) + .try_for_each(|str| writeln!(self, "{str:?}")) + .boxed() + .await?; + + let query_time = timer.elapsed(); + let out = format!("\n```\n\nQuery completed in {query_time:?}"); + self.write_str(out.as_str()).await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_keys_sizes( + &self, + map: Option, + prefix: Option, +) -> Result { + let prefix = prefix.as_deref().unwrap_or(EMPTY); + + let default_all_maps = map + .is_none() + .then(|| self.services.db.keys().map(Deref::deref)) + .into_iter() + .flatten(); + + let maps: Vec<_> = map + .iter() + .map(String::as_str) + .chain(default_all_maps) + .map(|map| self.services.db.get(map)) + .filter_map(Result::ok) + .cloned() + .collect(); + + let timer = Instant::now(); + let result = maps + .iter() + .stream() + .map(|map| map.raw_keys_prefix(&prefix)) + .flatten() + .ignore_err() + .map(<[u8]>::len) + .ready_fold_default(|mut map: BTreeMap<_, usize>, len| { + let entry = map.entry(len).or_default(); + *entry = entry.saturating_add(1); + map + }) + .await; + + let query_time = timer.elapsed(); + let result = format!("```\n{result:#?}\n```\n\nQuery completed in {query_time:?}"); + self.write_str(result.as_str()).await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_keys_total( + &self, + map: Option, + prefix: Option, +) -> Result { + let prefix = prefix.as_deref().unwrap_or(EMPTY); + + let default_all_maps = map + .is_none() + .then(|| self.services.db.keys().map(Deref::deref)) + .into_iter() + .flatten(); + + let maps: Vec<_> = map + .iter() + .map(String::as_str) + .chain(default_all_maps) + .map(|map| self.services.db.get(map)) + .filter_map(Result::ok) + .cloned() + .collect(); + + let timer = Instant::now(); + let result = maps + .iter() + .stream() + .map(|map| map.raw_keys_prefix(&prefix)) + .flatten() + .ignore_err() + .map(<[u8]>::len) + .ready_fold_default(|acc: usize, len| acc.saturating_add(len)) + .await; + + let query_time = timer.elapsed(); + + self.write_str(&format!("```\n{result:#?}\n\n```\n\nQuery completed in {query_time:?}")) + .await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_vals_sizes( + &self, + map: Option, + prefix: Option, +) -> Result { + let prefix = prefix.as_deref().unwrap_or(EMPTY); + + let default_all_maps = map + .is_none() + .then(|| self.services.db.keys().map(Deref::deref)) + .into_iter() + .flatten(); + + let maps: Vec<_> = map + .iter() + .map(String::as_str) + .chain(default_all_maps) + .map(|map| self.services.db.get(map)) + .filter_map(Result::ok) + .cloned() + .collect(); + + let timer = Instant::now(); + let result = maps + .iter() + .stream() + .map(|map| map.raw_stream_prefix(&prefix)) + .flatten() + .ignore_err() + .map(at!(1)) + .map(<[u8]>::len) + .ready_fold_default(|mut map: BTreeMap<_, usize>, len| { + let entry = map.entry(len).or_default(); + *entry = entry.saturating_add(1); + map + }) + .await; + + let query_time = timer.elapsed(); + let result = format!("```\n{result:#?}\n```\n\nQuery completed in {query_time:?}"); + self.write_str(result.as_str()).await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_vals_total( + &self, + map: Option, + prefix: Option, +) -> Result { + let prefix = prefix.as_deref().unwrap_or(EMPTY); + + let default_all_maps = map + .is_none() + .then(|| self.services.db.keys().map(Deref::deref)) + .into_iter() + .flatten(); + + let maps: Vec<_> = map + .iter() + .map(String::as_str) + .chain(default_all_maps) + .map(|map| self.services.db.get(map)) + .filter_map(Result::ok) + .cloned() + .collect(); + + let timer = Instant::now(); + let result = maps + .iter() + .stream() + .map(|map| map.raw_stream_prefix(&prefix)) + .flatten() + .ignore_err() + .map(at!(1)) + .map(<[u8]>::len) + .ready_fold_default(|acc: usize, len| acc.saturating_add(len)) + .await; + + let query_time = timer.elapsed(); + + self.write_str(&format!("```\n{result:#?}\n\n```\n\nQuery completed in {query_time:?}")) + .await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_iter( + &self, + map: String, + prefix: Option, +) -> Result { + writeln!(self, "```").await?; + + let map = self.services.db.get(&map)?; + let timer = Instant::now(); + prefix + .as_deref() + .map_or_else(|| map.raw_stream().boxed(), |prefix| map.raw_stream_prefix(prefix).boxed()) + .map_ok(apply!(2, String::from_utf8_lossy)) + .map_ok(apply!(2, Cow::into_owned)) + .try_for_each(|keyval| writeln!(self, "{keyval:?}")) + .boxed() + .await?; + + let query_time = timer.elapsed(); + self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}")) + .await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_keys_from( + &self, + map: String, + start: String, + limit: Option, +) -> Result { + writeln!(self, "```").await?; + + let map = self.services.db.get(&map)?; + let timer = Instant::now(); + map.raw_keys_from(&start) + .map_ok(String::from_utf8_lossy) + .take(limit.unwrap_or(usize::MAX)) + .try_for_each(|str| writeln!(self, "{str:?}")) + .boxed() + .await?; + + let query_time = timer.elapsed(); + self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}")) + .await?; + + Ok(RoomMessageEventContent::text_plain("")) +} + +#[admin_command] +pub(super) async fn raw_iter_from( + &self, + map: String, + start: String, + limit: Option, +) -> Result { + let map = self.services.db.get(&map)?; + let timer = Instant::now(); + let result = map + .raw_stream_from(&start) + .map_ok(apply!(2, String::from_utf8_lossy)) + .map_ok(apply!(2, Cow::into_owned)) + .take(limit.unwrap_or(usize::MAX)) + .try_collect::>() + .await?; + + let query_time = timer.elapsed(); + Ok(RoomMessageEventContent::notice_markdown(format!( + "Query completed in {query_time:?}:\n\n```rs\n{result:#?}\n```" + ))) +} + +#[admin_command] +pub(super) async fn raw_get(&self, map: String, key: String) -> Result { + let map = self.services.db.get(&map)?; + let timer = Instant::now(); + let handle = map.get(&key).await?; + let query_time = timer.elapsed(); + let result = String::from_utf8_lossy(&handle); + + Ok(RoomMessageEventContent::notice_markdown(format!( + "Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```" + ))) +} + +#[admin_command] +pub(super) async fn raw_maps(&self) -> Result { + let list: Vec<_> = self.services.db.iter().map(at!(0)).copied().collect(); + + Ok(RoomMessageEventContent::notice_markdown(format!("{list:#?}"))) +} diff --git a/src/admin/query/resolver.rs b/src/admin/query/resolver.rs index 3b950d13..b53661fc 100644 --- a/src/admin/query/resolver.rs +++ b/src/admin/query/resolver.rs @@ -28,56 +28,66 @@ async fn destinations_cache( ) -> Result { use service::resolver::cache::CachedDest; + writeln!(self, "| Server Name | Destination | Hostname | Expires |").await?; + writeln!(self, "| ----------- | ----------- | -------- | ------- |").await?; + let mut out = String::new(); - writeln!(out, "| Server Name | Destination | Hostname | Expires |")?; - writeln!(out, "| ----------- | ----------- | -------- | ------- |")?; - let row = |(name, &CachedDest { ref dest, ref host, expire })| { - let expire = time::format(expire, "%+"); - writeln!(out, "| {name} | {dest} | {host} | {expire} |").expect("wrote line"); - }; + { + let map = self + .services + .resolver + .cache + .destinations + .read() + .expect("locked"); - let map = self - .services - .resolver - .cache - .destinations - .read() - .expect("locked"); + for (name, &CachedDest { ref dest, ref host, expire }) in map.iter() { + if let Some(server_name) = server_name.as_ref() { + if name != server_name { + continue; + } + } - if let Some(server_name) = server_name.as_ref() { - map.get_key_value(server_name).map(row); - } else { - map.iter().for_each(row); + let expire = time::format(expire, "%+"); + writeln!(out, "| {name} | {dest} | {host} | {expire} |")?; + } } - Ok(RoomMessageEventContent::notice_markdown(out)) + self.write_str(out.as_str()).await?; + + Ok(RoomMessageEventContent::notice_plain("")) } #[admin_command] async fn overrides_cache(&self, server_name: Option) -> Result { use service::resolver::cache::CachedOverride; + writeln!(self, "| Server Name | IP | Port | Expires |").await?; + writeln!(self, "| ----------- | --- | ----:| ------- |").await?; + let mut out = String::new(); - writeln!(out, "| Server Name | IP | Port | Expires |")?; - writeln!(out, "| ----------- | --- | ----:| ------- |")?; - let row = |(name, &CachedOverride { ref ips, port, expire })| { - let expire = time::format(expire, "%+"); - writeln!(out, "| {name} | {ips:?} | {port} | {expire} |").expect("wrote line"); - }; + { + let map = self + .services + .resolver + .cache + .overrides + .read() + .expect("locked"); - let map = self - .services - .resolver - .cache - .overrides - .read() - .expect("locked"); + for (name, &CachedOverride { ref ips, port, expire }) in map.iter() { + if let Some(server_name) = server_name.as_ref() { + if name != server_name { + continue; + } + } - if let Some(server_name) = server_name.as_ref() { - map.get_key_value(server_name).map(row); - } else { - map.iter().for_each(row); + let expire = time::format(expire, "%+"); + writeln!(out, "| {name} | {ips:?} | {port} | {expire} |")?; + } } - Ok(RoomMessageEventContent::notice_markdown(out)) + self.write_str(out.as_str()).await?; + + Ok(RoomMessageEventContent::notice_plain("")) } diff --git a/src/admin/query/room_alias.rs b/src/admin/query/room_alias.rs index e1bf1622..2d4d8104 100644 --- a/src/admin/query/room_alias.rs +++ b/src/admin/query/room_alias.rs @@ -1,7 +1,7 @@ use clap::Subcommand; use conduwuit::Result; use futures::StreamExt; -use ruma::{events::room::message::RoomMessageEventContent, RoomAliasId, RoomId}; +use ruma::{RoomAliasId, RoomId}; use crate::Command; @@ -24,10 +24,7 @@ pub(crate) enum RoomAliasCommand { } /// All the getters and iterators in src/database/key_value/rooms/alias.rs -pub(super) async fn process( - subcommand: RoomAliasCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(subcommand: RoomAliasCommand, context: &Command<'_>) -> Result { let services = context.services; match subcommand { @@ -36,9 +33,7 @@ pub(super) async fn process( let results = services.rooms.alias.resolve_local_alias(&alias).await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, | RoomAliasCommand::LocalAliasesForRoom { room_id } => { let timer = tokio::time::Instant::now(); @@ -51,9 +46,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{aliases:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{aliases:#?}\n```") }, | RoomAliasCommand::AllLocalAliases => { let timer = tokio::time::Instant::now(); @@ -66,9 +59,8 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{aliases:#?}\n```" - ))) + write!(context, "Query completed in {query_time:?}:\n\n```rs\n{aliases:#?}\n```") }, } + .await } diff --git a/src/admin/query/room_state_cache.rs b/src/admin/query/room_state_cache.rs index cd7f5af7..71dadc99 100644 --- a/src/admin/query/room_state_cache.rs +++ b/src/admin/query/room_state_cache.rs @@ -1,5 +1,5 @@ use clap::Subcommand; -use conduwuit::Result; +use conduwuit::{Error, Result}; use futures::StreamExt; use ruma::{events::room::message::RoomMessageEventContent, RoomId, ServerName, UserId}; @@ -76,13 +76,10 @@ pub(crate) enum RoomStateCacheCommand { }, } -pub(super) async fn process( - subcommand: RoomStateCacheCommand, - context: &Command<'_>, -) -> Result { +pub(super) async fn process(subcommand: RoomStateCacheCommand, context: &Command<'_>) -> Result { let services = context.services; - match subcommand { + let c = match subcommand { | RoomStateCacheCommand::ServerInRoom { server, room_id } => { let timer = tokio::time::Instant::now(); let result = services @@ -92,7 +89,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{result:#?}\n```" ))) }, @@ -107,7 +104,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -122,7 +119,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -137,7 +134,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -152,7 +149,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -167,7 +164,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -176,7 +173,7 @@ pub(super) async fn process( let results = services.rooms.state_cache.room_joined_count(&room_id).await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -189,7 +186,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -204,7 +201,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -219,7 +216,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -232,7 +229,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -245,7 +242,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -260,7 +257,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -274,7 +271,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -288,7 +285,7 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, @@ -301,9 +298,13 @@ pub(super) async fn process( .await; let query_time = timer.elapsed(); - Ok(RoomMessageEventContent::notice_markdown(format!( + Result::<_, Error>::Ok(RoomMessageEventContent::notice_markdown(format!( "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, - } + }?; + + context.write_str(c.body()).await?; + + Ok(()) } diff --git a/src/admin/query/sending.rs b/src/admin/query/sending.rs index 3edbbe87..8c6fb25f 100644 --- a/src/admin/query/sending.rs +++ b/src/admin/query/sending.rs @@ -62,7 +62,14 @@ pub(crate) enum SendingCommand { } /// All the getters and iterators in key_value/sending.rs -pub(super) async fn process( +pub(super) async fn process(subcommand: SendingCommand, context: &Command<'_>) -> Result { + let c = reprocess(subcommand, context).await?; + context.write_str(c.body()).await?; + Ok(()) +} + +/// All the getters and iterators in key_value/sending.rs +pub(super) async fn reprocess( subcommand: SendingCommand, context: &Command<'_>, ) -> Result { diff --git a/src/admin/query/short.rs b/src/admin/query/short.rs new file mode 100644 index 00000000..7f0f3449 --- /dev/null +++ b/src/admin/query/short.rs @@ -0,0 +1,45 @@ +use clap::Subcommand; +use conduwuit::Result; +use ruma::{events::room::message::RoomMessageEventContent, OwnedEventId, OwnedRoomOrAliasId}; + +use crate::{admin_command, admin_command_dispatch}; + +#[admin_command_dispatch] +#[derive(Debug, Subcommand)] +/// Query tables from database +pub(crate) enum ShortCommand { + ShortEventId { + event_id: OwnedEventId, + }, + + ShortRoomId { + room_id: OwnedRoomOrAliasId, + }, +} + +#[admin_command] +pub(super) async fn short_event_id( + &self, + event_id: OwnedEventId, +) -> Result { + let shortid = self + .services + .rooms + .short + .get_shorteventid(&event_id) + .await?; + + Ok(RoomMessageEventContent::notice_markdown(format!("{shortid:#?}"))) +} + +#[admin_command] +pub(super) async fn short_room_id( + &self, + room_id: OwnedRoomOrAliasId, +) -> Result { + let room_id = self.services.rooms.alias.resolve(&room_id).await?; + + let shortid = self.services.rooms.short.get_shortroomid(&room_id).await?; + + Ok(RoomMessageEventContent::notice_markdown(format!("{shortid:#?}"))) +} diff --git a/src/admin/query/users.rs b/src/admin/query/users.rs index 2149a103..3715ac25 100644 --- a/src/admin/query/users.rs +++ b/src/admin/query/users.rs @@ -15,6 +15,8 @@ pub(crate) enum UsersCommand { IterUsers, + IterUsers2, + PasswordHash { user_id: OwnedUserId, }, @@ -89,6 +91,33 @@ pub(crate) enum UsersCommand { room_id: OwnedRoomId, session_id: String, }, + + GetSharedRooms { + user_a: OwnedUserId, + user_b: OwnedUserId, + }, +} + +#[admin_command] +async fn get_shared_rooms( + &self, + user_a: OwnedUserId, + user_b: OwnedUserId, +) -> Result { + let timer = tokio::time::Instant::now(); + let result: Vec<_> = self + .services + .rooms + .state_cache + .get_shared_rooms(&user_a, &user_b) + .map(ToOwned::to_owned) + .collect() + .await; + let query_time = timer.elapsed(); + + Ok(RoomMessageEventContent::notice_markdown(format!( + "Query completed in {query_time:?}:\n\n```rs\n{result:#?}\n```" + ))) } #[admin_command] @@ -207,6 +236,23 @@ async fn iter_users(&self) -> Result { ))) } +#[admin_command] +async fn iter_users2(&self) -> Result { + let timer = tokio::time::Instant::now(); + let result: Vec<_> = self.services.users.stream().collect().await; + let result: Vec<_> = result + .into_iter() + .map(ruma::UserId::as_bytes) + .map(String::from_utf8_lossy) + .collect(); + + let query_time = timer.elapsed(); + + Ok(RoomMessageEventContent::notice_markdown(format!( + "Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```" + ))) +} + #[admin_command] async fn count_users(&self) -> Result { let timer = tokio::time::Instant::now(); diff --git a/src/admin/room/alias.rs b/src/admin/room/alias.rs index 4490600d..9710cfc8 100644 --- a/src/admin/room/alias.rs +++ b/src/admin/room/alias.rs @@ -44,7 +44,14 @@ pub(crate) enum RoomAliasCommand { }, } -pub(super) async fn process( +pub(super) async fn process(command: RoomAliasCommand, context: &Command<'_>) -> Result { + let c = reprocess(command, context).await?; + context.write_str(c.body()).await?; + + Ok(()) +} + +pub(super) async fn reprocess( command: RoomAliasCommand, context: &Command<'_>, ) -> Result { diff --git a/src/admin/room/directory.rs b/src/admin/room/directory.rs index 81f25478..791b9204 100644 --- a/src/admin/room/directory.rs +++ b/src/admin/room/directory.rs @@ -25,7 +25,13 @@ pub(crate) enum RoomDirectoryCommand { }, } -pub(super) async fn process( +pub(super) async fn process(command: RoomDirectoryCommand, context: &Command<'_>) -> Result { + let c = reprocess(command, context).await?; + context.write_str(c.body()).await?; + Ok(()) +} + +pub(super) async fn reprocess( command: RoomDirectoryCommand, context: &Command<'_>, ) -> Result { diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index 5758d937..57aedd9c 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -31,19 +31,21 @@ const BULK_JOIN_REASON: &str = "Bulk force joining this room as initiated by the #[admin_command] pub(super) async fn list_users(&self) -> Result { - let users = self + let users: Vec<_> = self .services .users .list_local_users() .map(ToString::to_string) - .collect::>() + .collect() .await; let mut plain_msg = format!("Found {} local user account(s):\n```\n", users.len()); plain_msg += users.join("\n").as_str(); plain_msg += "\n```"; - Ok(RoomMessageEventContent::notice_markdown(plain_msg)) + self.write_str(plain_msg.as_str()).await?; + + Ok(RoomMessageEventContent::text_plain("")) } #[admin_command] @@ -912,29 +914,30 @@ pub(super) async fn redact_event( self.services.globals.server_name() ); - let state_lock = self.services.rooms.state.mutex.lock(&room_id).await; + let redaction_event_id = { + let state_lock = self.services.rooms.state.mutex.lock(&room_id).await; - let redaction_event_id = self - .services - .rooms - .timeline - .build_and_append_pdu( - PduBuilder { - redacts: Some(event.event_id.clone()), - ..PduBuilder::timeline(&RoomRedactionEventContent { + self.services + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { redacts: Some(event.event_id.clone()), - reason: Some(reason), - }) - }, - &sender_user, - &room_id, - &state_lock, - ) - .await?; + ..PduBuilder::timeline(&RoomRedactionEventContent { + redacts: Some(event.event_id.clone()), + reason: Some(reason), + }) + }, + &sender_user, + &room_id, + &state_lock, + ) + .await? + }; - drop(state_lock); + let out = format!("Successfully redacted event. Redaction event ID: {redaction_event_id}"); - Ok(RoomMessageEventContent::text_plain(format!( - "Successfully redacted event. Redaction event ID: {redaction_event_id}" - ))) + self.write_str(out.as_str()).await?; + + Ok(RoomMessageEventContent::text_plain("")) } diff --git a/src/macros/admin.rs b/src/macros/admin.rs index e98e914c..e35bd586 100644 --- a/src/macros/admin.rs +++ b/src/macros/admin.rs @@ -22,7 +22,7 @@ pub(super) fn command_dispatch(item: ItemEnum, _args: &[Meta]) -> Result - ) -> Result { + ) -> Result { use #name::*; #[allow(non_snake_case)] Ok(match command { @@ -46,7 +46,10 @@ fn dispatch_arm(v: &Variant) -> Result { let field = fields.named.iter().filter_map(|f| f.ident.as_ref()); let arg = field.clone(); quote! { - #name { #( #field ),* } => Box::pin(context.#handler(#( #arg ),*)).await?, + #name { #( #field ),* } => { + let c = Box::pin(context.#handler(#( #arg ),*)).await?; + Box::pin(context.write_str(c.body())).await?; + }, } }, | Fields::Unnamed(fields) => { @@ -54,12 +57,17 @@ fn dispatch_arm(v: &Variant) -> Result { return Err(Error::new(Span::call_site().into(), "One unnamed field required")); }; quote! { - #name ( #field ) => Box::pin(#handler::process(#field, context)).await?, + #name ( #field ) => { + Box::pin(#handler::process(#field, context)).await?; + } } }, | Fields::Unit => { quote! { - #name => Box::pin(context.#handler()).await?, + #name => { + let c = Box::pin(context.#handler()).await?; + Box::pin(context.write_str(c.body())).await?; + }, } }, };