From a22524496d6e6f0fa8e9730c6deb6dca83e07fbd Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 16 Jun 2024 05:17:15 +0000 Subject: [PATCH] console command interruption Signed-off-by: Jason Volk --- src/service/admin/console.rs | 49 +++++++++++++++++++++++------------- src/service/admin/mod.rs | 1 + 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/src/service/admin/console.rs b/src/service/admin/console.rs index 13cfdb27..ba0d7ecc 100644 --- a/src/service/admin/console.rs +++ b/src/service/admin/console.rs @@ -1,7 +1,8 @@ #![cfg(feature = "console")] use std::sync::Arc; -use conduit::{error, log, trace}; +use conduit::{debug, defer, error, log, trace}; +use futures_util::future::{AbortHandle, Abortable}; use ruma::events::room::message::RoomMessageEventContent; use rustyline::{error::ReadlineError, history, Editor}; use termimad::MadSkin; @@ -12,6 +13,7 @@ use crate::services; pub struct Console { join: Mutex>>, input: Mutex>, + abort: std::sync::Mutex>, output: MadSkin, } @@ -49,6 +51,7 @@ impl Console { Arc::new(Self { join: None.into(), input: Mutex::new(input), + abort: None.into(), output, }) } @@ -62,8 +65,6 @@ impl Console { } } - pub fn interrupt(self: &Arc) { Self::handle_interrupt(); } - #[allow(clippy::let_underscore_must_use)] pub async fn close(self: &Arc) { if let Some(join) = self.join.lock().await.take() { @@ -71,8 +72,16 @@ impl Console { } } + pub fn interrupt(self: &Arc) { + if let Some(abort) = self.abort.lock().expect("locked").take() { + debug!("Interrupting console command..."); + abort.abort(); + } + } + #[tracing::instrument(skip_all, name = "console")] async fn worker(self: Arc) { + debug!("session starting"); while services().server.running() { let mut input = self.input.lock().await; @@ -80,25 +89,39 @@ impl Console { let line = tokio::task::block_in_place(|| input.readline("uwu> ")); drop(suppression); + trace!(?line, "input"); match line { - Ok(string) => self.handle(string).await, + Ok(string) => self.clone().handle(string).await, Err(e) => match e { - ReadlineError::Eof => break, - ReadlineError::Interrupted => Self::handle_interrupt(), - ReadlineError::WindowResized => Self::handle_winch(), + ReadlineError::Interrupted | ReadlineError::Eof => break, + ReadlineError::WindowResized => continue, _ => error!("console: {e:?}"), }, } } + debug!("session ending"); self.join.lock().await.take(); } - async fn handle(&self, line: String) { + #[allow(clippy::let_underscore_must_use)] + async fn handle(self: Arc, line: String) { if line.is_empty() { return; } + let future = self.clone().process(line); + let (abort, abort_reg) = AbortHandle::new_pair(); + let future = Abortable::new(future, abort_reg); + _ = self.abort.lock().expect("locked").insert(abort); + defer! {{ + _ = self.abort.lock().expect("locked").take(); + }} + + _ = future.await; + } + + async fn process(self: Arc, line: String) { match services().admin.command_in_place(line, None).await { Ok(Some(content)) => self.output(content).await, Err(e) => error!("processing command: {e}"), @@ -106,16 +129,8 @@ impl Console { } } - async fn output(&self, output_content: RoomMessageEventContent) { + async fn output(self: Arc, output_content: RoomMessageEventContent) { let output = self.output.term_text(output_content.body()); println!("{output}"); } - - fn handle_interrupt() { - trace!("interrupted"); - } - - fn handle_winch() { - trace!("winch"); - } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 0c949937..3f983b75 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -147,6 +147,7 @@ impl Service { async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) { #[cfg(feature = "console")] if sig == "SIGINT" && services().server.running() { + self.console.interrupt(); self.console.start().await; } }