switch to crate rustyline_async

improve console signal and interrupt stack

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-06-16 19:42:16 +00:00
parent 959fd2e6c4
commit 5aee03d14a
5 changed files with 137 additions and 115 deletions

View file

@ -21,7 +21,7 @@ brotli_compression = [
"reqwest/brotli",
]
console = [
"dep:rustyline",
"dep:rustyline-async",
"dep:termimad",
]
dev_release_log_level = []
@ -61,8 +61,8 @@ regex.workspace = true
reqwest.workspace = true
ruma-identifiers-validation.workspace = true
ruma.workspace = true
rustyline.workspace = true
rustyline.optional = true
rustyline-async.workspace = true
rustyline-async.optional = true
serde_json.workspace = true
serde.workspace = true
serde_yaml.workspace = true

View file

@ -1,44 +1,35 @@
#![cfg(feature = "console")]
use std::sync::Arc;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use conduit::{debug, defer, error, log, trace};
use conduit::{debug, defer, error, log};
use futures_util::future::{AbortHandle, Abortable};
use ruma::events::room::message::RoomMessageEventContent;
use rustyline::{error::ReadlineError, history, Editor};
use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
use termimad::MadSkin;
use tokio::{sync::Mutex, task::JoinHandle};
use tokio::task::JoinHandle;
use crate::services;
pub struct Console {
join: Mutex<Option<JoinHandle<()>>>,
input: Mutex<Editor<(), history::MemHistory>>,
abort: std::sync::Mutex<Option<AbortHandle>>,
worker_join: Mutex<Option<JoinHandle<()>>>,
input_abort: Mutex<Option<AbortHandle>>,
command_abort: Mutex<Option<AbortHandle>>,
history: Mutex<VecDeque<String>>,
output: MadSkin,
}
const PROMPT: &str = "uwu> ";
const HISTORY_LIMIT: usize = 48;
impl Console {
#[must_use]
pub fn new() -> Arc<Self> {
use rustyline::config::{Behavior, BellStyle};
use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle};
let config = rustyline::Config::builder()
.enable_signals(false)
.behavior(Behavior::PreferTerm)
.bell_style(BellStyle::Visible)
.auto_add_history(true)
.max_history_size(100)
.expect("valid history size")
.indent_size(4)
.tab_stop(4)
.build();
let history = history::MemHistory::with_config(config);
let input = Editor::with_history(config, history).expect("builder configuration succeeded");
let mut output = MadSkin::default_dark();
let code_style = CompoundStyle::with_fgbg(Color::AnsiValue(40), Color::AnsiValue(234));
output.inline_code = code_style.clone();
output.code_block = LineStyle {
@ -49,33 +40,63 @@ impl Console {
};
Arc::new(Self {
join: None.into(),
input: Mutex::new(input),
abort: None.into(),
worker_join: None.into(),
input_abort: None.into(),
command_abort: None.into(),
history: VecDeque::with_capacity(HISTORY_LIMIT).into(),
output,
})
}
pub(super) async fn handle_signal(self: &Arc<Self>, sig: &'static str) {
if !services().server.running() {
self.interrupt();
} else if sig == "SIGINT" {
self.interrupt_command();
self.start().await;
}
}
#[allow(clippy::let_underscore_must_use)]
pub async fn start(self: &Arc<Self>) {
let mut join = self.join.lock().await;
if join.is_none() {
let mut worker_join = self.worker_join.lock().expect("locked");
if worker_join.is_none() {
let self_ = Arc::clone(self);
_ = join.insert(services().server.runtime().spawn(self_.worker()));
_ = worker_join.insert(services().server.runtime().spawn(self_.worker()));
}
}
#[allow(clippy::let_underscore_must_use)]
pub async fn close(self: &Arc<Self>) {
if let Some(join) = self.join.lock().await.take() {
_ = join.await;
}
self.interrupt();
let Some(worker_join) = self.worker_join.lock().expect("locked").take() else {
return;
};
_ = worker_join.await;
}
pub fn interrupt(self: &Arc<Self>) {
if let Some(abort) = self.abort.lock().expect("locked").take() {
self.interrupt_command();
self.interrupt_readline();
self.worker_join
.lock()
.expect("locked")
.as_ref()
.map(JoinHandle::abort);
}
pub fn interrupt_readline(self: &Arc<Self>) {
if let Some(input_abort) = self.input_abort.lock().expect("locked").take() {
debug!("Interrupting console readline...");
input_abort.abort();
}
}
pub fn interrupt_command(self: &Arc<Self>) {
if let Some(command_abort) = self.command_abort.lock().expect("locked").take() {
debug!("Interrupting console command...");
abort.abort();
command_abort.abort();
}
}
@ -83,25 +104,48 @@ impl Console {
async fn worker(self: Arc<Self>) {
debug!("session starting");
while services().server.running() {
let mut input = self.input.lock().await;
let suppression = log::Suppress::new(&services().server);
let line = tokio::task::block_in_place(|| input.readline("uwu> "));
drop(suppression);
trace!(?line, "input");
match line {
Ok(string) => self.clone().handle(string).await,
Err(e) => match e {
ReadlineError::Interrupted | ReadlineError::Eof => break,
ReadlineError::WindowResized => continue,
_ => error!("console: {e:?}"),
match self.readline().await {
Ok(event) => match event {
ReadlineEvent::Line(string) => self.clone().handle(string).await,
ReadlineEvent::Interrupted => continue,
ReadlineEvent::Eof => break,
},
Err(error) => match error {
ReadlineError::Closed => break,
ReadlineError::IO(error) => {
error!("console I/O: {error:?}");
break;
},
},
}
}
debug!("session ending");
self.join.lock().await.take();
self.worker_join.lock().expect("locked").take();
}
#[allow(clippy::let_underscore_must_use)]
async fn readline(self: &Arc<Self>) -> Result<ReadlineEvent, ReadlineError> {
let _suppression = log::Suppress::new(&services().server);
let (mut readline, _writer) = Readline::new(PROMPT.to_owned())?;
self.set_history(&mut readline);
let future = readline.readline();
let (abort, abort_reg) = AbortHandle::new_pair();
let future = Abortable::new(future, abort_reg);
_ = self.input_abort.lock().expect("locked").insert(abort);
defer! {{
_ = self.input_abort.lock().expect("locked").take();
}}
let Ok(result) = future.await else {
return Ok(ReadlineEvent::Eof);
};
readline.flush()?;
result
}
#[allow(clippy::let_underscore_must_use)]
@ -110,12 +154,13 @@ impl Console {
return;
}
self.add_history(line.clone());
let future = self.clone().process(line);
let (abort, abort_reg) = AbortHandle::new_pair();
let future = Abortable::new(future, abort_reg);
_ = self.abort.lock().expect("locked").insert(abort);
_ = self.command_abort.lock().expect("locked").insert(abort);
defer! {{
_ = self.abort.lock().expect("locked").take();
_ = self.command_abort.lock().expect("locked").take();
}}
_ = future.await;
@ -133,4 +178,17 @@ impl Console {
let output = self.output.term_text(output_content.body());
println!("{output}");
}
fn set_history(&self, readline: &mut Readline) {
let history = self.history.lock().expect("locked");
for entry in history.iter().rev() {
readline.add_history_entry(entry.clone());
}
}
fn add_history(&self, line: String) {
let mut history = self.history.lock().expect("locked");
history.push_front(line);
history.truncate(HISTORY_LIMIT);
}
}

View file

@ -130,7 +130,6 @@ impl Service {
let receiver = self.receiver.lock().await;
let mut signals = services().server.signal.subscribe();
loop {
debug_assert!(!receiver.is_closed(), "channel closed");
tokio::select! {
command = receiver.recv_async() => match command {
Ok(command) => self.handle_command(command).await,
@ -146,10 +145,7 @@ impl Service {
async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) {
#[cfg(feature = "console")]
if sig == "SIGINT" && services().server.running() {
self.console.interrupt();
self.console.start().await;
}
self.console.handle_signal(sig).await;
}
async fn handle_command(&self, command: Command) {