From b08c1241a89514046f666fc21f817af2feb8bce2 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 22 Oct 2024 07:15:28 +0000 Subject: [PATCH] add some interruption points in recursive event handling to prevent shutdown hangs Signed-off-by: Jason Volk --- src/api/server/send.rs | 7 ++++--- src/core/server.rs | 9 ++++++++- src/service/rooms/event_handler/mod.rs | 3 +++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 4f526052..d5d3ffbb 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -71,7 +71,7 @@ pub(crate) async fn send_transaction_message_route( "Starting txn", ); - let resolved_map = handle_pdus(&services, &client, &body.pdus, origin, &txn_start_time).await; + let resolved_map = handle_pdus(&services, &client, &body.pdus, origin, &txn_start_time).await?; handle_edus(&services, &client, &body.edus, origin).await; debug!( @@ -93,7 +93,7 @@ pub(crate) async fn send_transaction_message_route( async fn handle_pdus( services: &Services, _client: &IpAddr, pdus: &[Box], origin: &ServerName, txn_start_time: &Instant, -) -> ResolvedMap { +) -> Result { let mut parsed_pdus = Vec::with_capacity(pdus.len()); for pdu in pdus { parsed_pdus.push(match services.rooms.event_handler.parse_incoming_pdu(pdu).await { @@ -110,6 +110,7 @@ async fn handle_pdus( let mut resolved_map = BTreeMap::new(); for (event_id, value, room_id) in parsed_pdus { + services.server.check_running()?; let pdu_start_time = Instant::now(); let mutex_lock = services .rooms @@ -143,7 +144,7 @@ async fn handle_pdus( } } - resolved_map + Ok(resolved_map) } async fn handle_edus(services: &Services, client: &IpAddr, edus: &[Raw], origin: &ServerName) { diff --git a/src/core/server.rs b/src/core/server.rs index 89f1dea5..627e125d 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -5,7 +5,7 @@ use std::{ use tokio::{runtime, sync::broadcast}; -use crate::{config::Config, log::Log, metrics::Metrics, Err, Result}; +use crate::{config::Config, err, log::Log, metrics::Metrics, Err, Result}; /// Server runtime state; public portion pub struct Server { @@ -107,6 +107,13 @@ impl Server { .expect("runtime handle available in Server") } + #[inline] + pub fn check_running(&self) -> Result { + self.running() + .then_some(()) + .ok_or_else(|| err!(debug_warn!("Server is shutting down."))) + } + #[inline] pub fn running(&self) -> bool { !self.stopping.load(Ordering::Acquire) } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 24c2692d..0b2bbf73 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -205,6 +205,7 @@ impl Service { debug!(events = ?sorted_prev_events, "Got previous events"); for prev_id in sorted_prev_events { + self.services.server.check_running()?; match self .handle_prev_pdu( origin, @@ -1268,6 +1269,8 @@ impl Service { let mut amount = 0; while let Some(prev_event_id) = todo_outlier_stack.pop() { + self.services.server.check_running()?; + if let Some((pdu, mut json_opt)) = self .fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id, room_version_id) .boxed()