improve admin command error propagation

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-08-28 04:09:46 +00:00
parent f047675a63
commit bb5f2556c3
9 changed files with 192 additions and 78 deletions

View file

@ -1,9 +1,11 @@
use std::time::SystemTime; use std::time::SystemTime;
use conduit_service::Services; use conduit_service::Services;
use ruma::EventId;
pub(crate) struct Command<'a> { pub(crate) struct Command<'a> {
pub(crate) services: &'a Services, pub(crate) services: &'a Services,
pub(crate) body: &'a [&'a str], pub(crate) body: &'a [&'a str],
pub(crate) timer: SystemTime, pub(crate) timer: SystemTime,
pub(crate) reply_id: Option<&'a EventId>,
} }

View file

@ -195,5 +195,6 @@ pub(super) enum DebugCommand {
/// - Developer test stubs /// - Developer test stubs
#[command(subcommand)] #[command(subcommand)]
#[allow(non_snake_case)] #[allow(non_snake_case)]
#[clap(hide(true))]
Tester(TesterCommand), Tester(TesterCommand),
} }

View file

@ -1,3 +1,4 @@
use conduit::Err;
use ruma::events::room::message::RoomMessageEventContent; use ruma::events::room::message::RoomMessageEventContent;
use crate::{admin_command, admin_command_dispatch, Result}; use crate::{admin_command, admin_command_dispatch, Result};
@ -5,13 +6,28 @@ use crate::{admin_command, admin_command_dispatch, Result};
#[admin_command_dispatch] #[admin_command_dispatch]
#[derive(Debug, clap::Subcommand)] #[derive(Debug, clap::Subcommand)]
pub(crate) enum TesterCommand { pub(crate) enum TesterCommand {
Panic,
Failure,
Tester, Tester,
Timer, Timer,
} }
#[rustfmt::skip]
#[admin_command]
async fn panic(&self) -> Result<RoomMessageEventContent> {
panic!("panicked")
}
#[rustfmt::skip]
#[admin_command]
async fn failure(&self) -> Result<RoomMessageEventContent> {
Err!("failed")
}
#[inline(never)] #[inline(never)]
#[rustfmt::skip] #[rustfmt::skip]
#[allow(unused_variables)]
#[admin_command] #[admin_command]
async fn tester(&self) -> Result<RoomMessageEventContent> { async fn tester(&self) -> Result<RoomMessageEventContent> {

View file

@ -23,7 +23,7 @@ use ruma::{
relation::InReplyTo, relation::InReplyTo,
room::message::{Relation::Reply, RoomMessageEventContent}, room::message::{Relation::Reply, RoomMessageEventContent},
}, },
OwnedEventId, EventId,
}; };
use service::{ use service::{
admin::{CommandInput, CommandOutput, ProcessorFuture, ProcessorResult}, admin::{CommandInput, CommandOutput, ProcessorFuture, ProcessorResult},
@ -48,12 +48,12 @@ async fn handle_command(services: Arc<Services>, command: CommandInput) -> Proce
.catch_unwind() .catch_unwind()
.await .await
.map_err(Error::from_panic) .map_err(Error::from_panic)
.or_else(|error| handle_panic(&error, command)) .unwrap_or_else(|error| handle_panic(&error, &command))
} }
async fn process_command(services: Arc<Services>, input: &CommandInput) -> CommandOutput { async fn process_command(services: Arc<Services>, input: &CommandInput) -> ProcessorResult {
let (command, args, body) = match parse(&services, input) { let (command, args, body) = match parse(&services, input) {
Err(error) => return error, Err(error) => return Err(error),
Ok(parsed) => parsed, Ok(parsed) => parsed,
}; };
@ -61,33 +61,22 @@ async fn process_command(services: Arc<Services>, input: &CommandInput) -> Comma
services: &services, services: &services,
body: &body, body: &body,
timer: SystemTime::now(), timer: SystemTime::now(),
reply_id: input.reply_id.as_deref(),
}; };
process(&context, command, &args) process(&context, command, &args).await
.await
.and_then(|content| reply(content, input.reply_id.clone()))
} }
fn handle_panic(error: &Error, command: CommandInput) -> ProcessorResult { fn handle_panic(error: &Error, command: &CommandInput) -> ProcessorResult {
let link = "Please submit a [bug report](https://github.com/girlbossceo/conduwuit/issues/new). 🥺"; let link = "Please submit a [bug report](https://github.com/girlbossceo/conduwuit/issues/new). 🥺";
let msg = format!("Panic occurred while processing command:\n```\n{error:#?}\n```\n{link}"); let msg = format!("Panic occurred while processing command:\n```\n{error:#?}\n```\n{link}");
let content = RoomMessageEventContent::notice_markdown(msg); let content = RoomMessageEventContent::notice_markdown(msg);
error!("Panic while processing command: {error:?}"); error!("Panic while processing command: {error:?}");
Ok(reply(content, command.reply_id)) Err(reply(content, command.reply_id.as_deref()))
}
fn reply(mut content: RoomMessageEventContent, reply_id: Option<OwnedEventId>) -> Option<RoomMessageEventContent> {
content.relates_to = reply_id.map(|event_id| Reply {
in_reply_to: InReplyTo {
event_id,
},
});
Some(content)
} }
// Parse and process a message from the admin room // Parse and process a message from the admin room
async fn process(context: &Command<'_>, command: AdminCommand, args: &[String]) -> CommandOutput { async fn process(context: &Command<'_>, command: AdminCommand, args: &[String]) -> ProcessorResult {
let (capture, logs) = capture_create(context); let (capture, logs) = capture_create(context);
let capture_scope = capture.start(); let capture_scope = capture.start();
@ -112,13 +101,15 @@ async fn process(context: &Command<'_>, command: AdminCommand, args: &[String])
match result { match result {
Ok(content) => { Ok(content) => {
write!(&mut output, "{}", content.body()).expect("failed to format command result to output"); write!(&mut output, "{0}", content.body()).expect("failed to format command result to output buffer");
Ok(Some(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id)))
}, },
Err(error) => write!(&mut output, "Command failed with error:\n```\n{error:#?}\n```") Err(error) => {
.expect("failed to format error to command output"), write!(&mut output, "Command failed with error:\n```\n{error:#?}\n```")
}; .expect("failed to format command result to output");
Err(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id))
Some(RoomMessageEventContent::notice_markdown(output)) },
}
} }
fn capture_create(context: &Command<'_>) -> (Arc<Capture>, Arc<Mutex<String>>) { fn capture_create(context: &Command<'_>) -> (Arc<Capture>, Arc<Mutex<String>>) {
@ -158,7 +149,10 @@ fn parse<'a>(
let message = error let message = error
.to_string() .to_string()
.replace("server.name", services.globals.server_name().as_str()); .replace("server.name", services.globals.server_name().as_str());
Err(Some(RoomMessageEventContent::notice_markdown(message))) Err(reply(
RoomMessageEventContent::notice_markdown(message),
input.reply_id.as_deref(),
))
}, },
} }
} }
@ -255,3 +249,13 @@ fn parse_line(command_line: &str) -> Vec<String> {
trace!(?command_line, ?argv, "parse"); trace!(?command_line, ?argv, "parse");
argv argv
} }
fn reply(mut content: RoomMessageEventContent, reply_id: Option<&EventId>) -> RoomMessageEventContent {
content.relates_to = reply_id.map(|event_id| Reply {
in_reply_to: InReplyTo {
event_id: event_id.to_owned(),
},
});
content
}

View file

@ -338,6 +338,8 @@ pub struct Config {
pub admin_console_automatic: bool, pub admin_console_automatic: bool,
#[serde(default)] #[serde(default)]
pub admin_execute: Vec<String>, pub admin_execute: Vec<String>,
#[serde(default)]
pub admin_execute_errors_ignore: bool,
#[serde(default = "default_admin_log_capture")] #[serde(default = "default_admin_log_capture")]
pub admin_log_capture: String, pub admin_log_capture: String,
@ -601,6 +603,10 @@ impl fmt::Display for Config {
&self.admin_console_automatic.to_string(), &self.admin_console_automatic.to_string(),
); );
line("Execute admin commands after startup", &self.admin_execute.join(", ")); line("Execute admin commands after startup", &self.admin_execute.join(", "));
line(
"Continue startup even if some commands fail",
&self.admin_execute_errors_ignore.to_string(),
);
line("Filter for admin command log capture", &self.admin_log_capture); line("Filter for admin command log capture", &self.admin_log_capture);
line("Allow outgoing federated typing", &self.allow_outgoing_typing.to_string()); line("Allow outgoing federated typing", &self.allow_outgoing_typing.to_string());
line("Allow incoming federated typing", &self.allow_incoming_typing.to_string()); line("Allow incoming federated typing", &self.allow_incoming_typing.to_string());

View file

@ -158,13 +158,18 @@ impl Console {
async fn process(self: Arc<Self>, line: String) { async fn process(self: Arc<Self>, line: String) {
match self.admin.command_in_place(line, None).await { match self.admin.command_in_place(line, None).await {
Ok(Some(content)) => self.output(content).await, Ok(Some(ref content)) => self.output(content),
Err(e) => error!("processing command: {e}"), Err(ref content) => self.output_err(content),
_ => (), _ => unreachable!(),
} }
} }
async fn output(self: Arc<Self>, output_content: RoomMessageEventContent) { fn output_err(self: Arc<Self>, output_content: &RoomMessageEventContent) {
let output = configure_output_err(self.output.clone());
output.print_text(output_content.body());
}
fn output(self: Arc<Self>, output_content: &RoomMessageEventContent) {
self.output.print_text(output_content.body()); self.output.print_text(output_content.body());
} }
@ -194,12 +199,32 @@ impl Console {
} }
} }
/// Standalone/static markdown printer for errors.
pub fn print_err(markdown: &str) {
let output = configure_output_err(MadSkin::default_dark());
output.print_text(markdown);
}
/// Standalone/static markdown printer. /// Standalone/static markdown printer.
pub fn print(markdown: &str) { pub fn print(markdown: &str) {
let output = configure_output(MadSkin::default_dark()); let output = configure_output(MadSkin::default_dark());
output.print_text(markdown); output.print_text(markdown);
} }
fn configure_output_err(mut output: MadSkin) -> MadSkin {
use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle};
let code_style = CompoundStyle::with_fgbg(Color::AnsiValue(196), Color::AnsiValue(234));
output.inline_code = code_style.clone();
output.code_block = LineStyle {
left_margin: 0,
right_margin: 0,
align: Alignment::Left,
compound_style: code_style,
};
output
}
fn configure_output(mut output: MadSkin) -> MadSkin { fn configure_output(mut output: MadSkin) -> MadSkin {
use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle}; use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle};

View file

@ -10,7 +10,7 @@ use std::{
}; };
use async_trait::async_trait; use async_trait::async_trait;
use conduit::{debug, error, error::default_log, pdu::PduBuilder, Err, Error, PduEvent, Result, Server}; use conduit::{debug, err, error, error::default_log, pdu::PduBuilder, Error, PduEvent, Result, Server};
pub use create::create_admin_room; pub use create::create_admin_room;
use loole::{Receiver, Sender}; use loole::{Receiver, Sender};
use ruma::{ use ruma::{
@ -45,18 +45,34 @@ struct Services {
services: StdRwLock<Option<Weak<crate::Services>>>, services: StdRwLock<Option<Weak<crate::Services>>>,
} }
/// Inputs to a command are a multi-line string and optional reply_id.
#[derive(Debug)] #[derive(Debug)]
pub struct CommandInput { pub struct CommandInput {
pub command: String, pub command: String,
pub reply_id: Option<OwnedEventId>, pub reply_id: Option<OwnedEventId>,
} }
/// Prototype of the tab-completer. The input is buffered text when tab
/// asserted; the output will fully replace the input buffer.
pub type Completer = fn(&str) -> String; pub type Completer = fn(&str) -> String;
pub type Processor = fn(Arc<crate::Services>, CommandInput) -> ProcessorFuture;
pub type ProcessorFuture = Pin<Box<dyn Future<Output = ProcessorResult> + Send>>;
pub type ProcessorResult = Result<CommandOutput>;
pub type CommandOutput = Option<RoomMessageEventContent>;
/// Prototype of the command processor. This is a callback supplied by the
/// reloadable admin module.
pub type Processor = fn(Arc<crate::Services>, CommandInput) -> ProcessorFuture;
/// Return type of the processor
pub type ProcessorFuture = Pin<Box<dyn Future<Output = ProcessorResult> + Send>>;
/// Result wrapping of a command's handling. Both variants are complete message
/// events which have digested any prior errors. The wrapping preserves whether
/// the command failed without interpreting the text. Ok(None) outputs are
/// dropped to produce no response.
pub type ProcessorResult = Result<Option<CommandOutput>, CommandOutput>;
/// Alias for the output structure.
pub type CommandOutput = RoomMessageEventContent;
/// Maximum number of commands which can be queued for dispatch.
const COMMAND_QUEUE_LIMIT: usize = 512; const COMMAND_QUEUE_LIMIT: usize = 512;
#[async_trait] #[async_trait]
@ -86,7 +102,7 @@ impl crate::Service for Service {
let receiver = self.receiver.lock().await; let receiver = self.receiver.lock().await;
let mut signals = self.services.server.signal.subscribe(); let mut signals = self.services.server.signal.subscribe();
self.startup_execute().await; self.startup_execute().await?;
self.console_auto_start().await; self.console_auto_start().await;
loop { loop {
@ -120,11 +136,15 @@ impl crate::Service for Service {
} }
impl Service { impl Service {
/// Sends markdown message (not an m.notice for notification reasons) to the
/// admin room as the admin user.
pub async fn send_text(&self, body: &str) { pub async fn send_text(&self, body: &str) {
self.send_message(RoomMessageEventContent::text_markdown(body)) self.send_message(RoomMessageEventContent::text_markdown(body))
.await; .await;
} }
/// Sends a message to the admin room as the admin user (see send_text() for
/// convenience).
pub async fn send_message(&self, message_content: RoomMessageEventContent) { pub async fn send_message(&self, message_content: RoomMessageEventContent) {
if let Ok(Some(room_id)) = self.get_admin_room() { if let Ok(Some(room_id)) = self.get_admin_room() {
let user_id = &self.services.globals.server_user; let user_id = &self.services.globals.server_user;
@ -133,14 +153,20 @@ impl Service {
} }
} }
pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) { /// Posts a command to the command processor queue and returns. Processing
self.send(CommandInput { /// will take place on the service worker's task asynchronously. Errors if
command, /// the queue is full.
reply_id, pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
}) self.sender
.await; .send(CommandInput {
command,
reply_id,
})
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
} }
/// Dispatches a comamnd to the processor on the current task and waits for
/// completion.
pub async fn command_in_place(&self, command: String, reply_id: Option<OwnedEventId>) -> ProcessorResult { pub async fn command_in_place(&self, command: String, reply_id: Option<OwnedEventId>) -> ProcessorResult {
self.process_command(CommandInput { self.process_command(CommandInput {
command, command,
@ -149,6 +175,8 @@ impl Service {
.await .await
} }
/// Invokes the tab-completer to complete the command. When unavailable,
/// None is returned.
pub fn complete_command(&self, command: &str) -> Option<String> { pub fn complete_command(&self, command: &str) -> Option<String> {
self.complete self.complete
.read() .read()
@ -156,11 +184,6 @@ impl Service {
.map(|complete| complete(command)) .map(|complete| complete(command))
} }
async fn send(&self, message: CommandInput) {
debug_assert!(!self.sender.is_closed(), "channel closed");
self.sender.send_async(message).await.expect("message sent");
}
async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) { async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) {
#[cfg(feature = "console")] #[cfg(feature = "console")]
self.console.handle_signal(sig).await; self.console.handle_signal(sig).await;
@ -168,29 +191,28 @@ impl Service {
async fn handle_command(&self, command: CommandInput) { async fn handle_command(&self, command: CommandInput) {
match self.process_command(command).await { match self.process_command(command).await {
Ok(Some(output)) => self.handle_response(output).await, Ok(Some(output)) | Err(output) => self.handle_response(output).await,
Ok(None) => debug!("Command successful with no response"), Ok(None) => debug!("Command successful with no response"),
Err(e) => error!("Command processing error: {e}"),
} }
} }
async fn process_command(&self, command: CommandInput) -> ProcessorResult { async fn process_command(&self, command: CommandInput) -> ProcessorResult {
let Some(services) = self let handle = &self
.handle
.read()
.await
.expect("Admin module is not loaded");
let services = self
.services .services
.services .services
.read() .read()
.expect("locked") .expect("locked")
.as_ref() .as_ref()
.and_then(Weak::upgrade) .and_then(Weak::upgrade)
else { .expect("Services self-reference not initialized.");
return Err!("Services self-reference not initialized.");
};
if let Some(handle) = self.handle.read().await.as_ref() { handle(services, command).await
handle(services, command).await
} else {
Err!("Admin module is not loaded.")
}
} }
/// Checks whether a given user is an admin of this server /// Checks whether a given user is an admin of this server
@ -233,6 +255,10 @@ impl Service {
}; };
let Ok(Some(pdu)) = self.services.timeline.get_pdu(&in_reply_to.event_id) else { let Ok(Some(pdu)) = self.services.timeline.get_pdu(&in_reply_to.event_id) else {
error!(
event_id = ?in_reply_to.event_id,
"Missing admin command in_reply_to event"
);
return; return;
}; };

View file

@ -1,9 +1,7 @@
use conduit::{debug, debug_info, error, implement, info}; use conduit::{debug, debug_info, error, implement, info, Err, Result};
use ruma::events::room::message::RoomMessageEventContent; use ruma::events::room::message::RoomMessageEventContent;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use super::console;
/// Possibly spawn the terminal console at startup if configured. /// Possibly spawn the terminal console at startup if configured.
#[implement(super::Service)] #[implement(super::Service)]
pub(super) async fn console_auto_start(&self) { pub(super) async fn console_auto_start(&self) {
@ -24,45 +22,82 @@ pub(super) async fn console_auto_stop(&self) {
/// Execute admin commands after startup /// Execute admin commands after startup
#[implement(super::Service)] #[implement(super::Service)]
pub(super) async fn startup_execute(&self) { pub(super) async fn startup_execute(&self) -> Result<()> {
sleep(Duration::from_millis(500)).await; //TODO: remove this after run-states are broadcast // List of comamnds to execute
for (i, command) in self.services.server.config.admin_execute.iter().enumerate() { let commands = &self.services.server.config.admin_execute;
self.startup_execute_command(i, command.clone()).await;
// Determine if we're running in smoketest-mode which will change some behaviors
let smoketest = self.services.server.config.test.contains("smoke");
// When true, errors are ignored and startup continues.
let errors = !smoketest && self.services.server.config.admin_execute_errors_ignore;
//TODO: remove this after run-states are broadcast
sleep(Duration::from_millis(500)).await;
for (i, command) in commands.iter().enumerate() {
if let Err(e) = self.startup_execute_command(i, command.clone()).await {
if !errors {
return Err(e);
}
}
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
// The smoketest functionality is placed here for now and simply initiates // The smoketest functionality is placed here for now and simply initiates
// shutdown after all commands have executed. // shutdown after all commands have executed.
if self.services.server.config.test.contains("smoke") { if smoketest {
debug_info!("Smoketest mode. All commands complete. Shutting down now..."); debug_info!("Smoketest mode. All commands complete. Shutting down now...");
self.services self.services
.server .server
.shutdown() .shutdown()
.unwrap_or_else(error::default_log); .inspect_err(error::inspect_log)
.expect("Error shutting down from smoketest");
} }
Ok(())
} }
/// Execute one admin command after startup /// Execute one admin command after startup
#[implement(super::Service)] #[implement(super::Service)]
async fn startup_execute_command(&self, i: usize, command: String) { async fn startup_execute_command(&self, i: usize, command: String) -> Result<()> {
debug!("Startup command #{i}: executing {command:?}"); debug!("Startup command #{i}: executing {command:?}");
match self.command_in_place(command, None).await { match self.command_in_place(command, None).await {
Err(e) => error!("Startup command #{i} failed: {e:?}"),
Ok(None) => info!("Startup command #{i} completed (no output)."),
Ok(Some(output)) => Self::startup_command_output(i, &output), Ok(Some(output)) => Self::startup_command_output(i, &output),
Err(output) => Self::startup_command_error(i, &output),
Ok(None) => {
info!("Startup command #{i} completed (no output).");
Ok(())
},
} }
} }
#[cfg(feature = "console")] #[cfg(feature = "console")]
#[implement(super::Service)] #[implement(super::Service)]
fn startup_command_output(i: usize, content: &RoomMessageEventContent) { fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> {
info!("Startup command #{i} completed:"); debug_info!("Startup command #{i} completed:");
console::print(content.body()); super::console::print(content.body());
Ok(())
}
#[cfg(feature = "console")]
#[implement(super::Service)]
fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> {
super::console::print_err(content.body());
Err!(debug_error!("Startup command #{i} failed."))
} }
#[cfg(not(feature = "console"))] #[cfg(not(feature = "console"))]
#[implement(super::Service)] #[implement(super::Service)]
fn startup_command_output(i: usize, content: &RoomMessageEventContent) { fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> {
info!("Startup command #{i} completed:\n{:#?}", content.body()); info!("Startup command #{i} completed:\n{:#?}", content.body());
Ok(())
}
#[cfg(not(feature = "console"))]
#[implement(super::Service)]
fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> {
Err!(error!("Startup command #{i} failed:\n{:#?}", content.body()))
} }

View file

@ -523,8 +523,7 @@ impl Service {
if self.services.admin.is_admin_command(pdu, &body).await { if self.services.admin.is_admin_command(pdu, &body).await {
self.services self.services
.admin .admin
.command(body, Some((*pdu.event_id).into())) .command(body, Some((*pdu.event_id).into()))?;
.await;
} }
} }
}, },