additional interruption points to hasten shutdown

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-22 22:59:43 +00:00 committed by strawberry
parent 03f2ac9caf
commit e5a1309583
4 changed files with 11 additions and 3 deletions

View file

@ -43,7 +43,7 @@ pub(super) async fn serve(
let app = app.into_make_service_with_connect_info::<net::SocketAddr>(); let app = app.into_make_service_with_connect_info::<net::SocketAddr>();
let builder = server::conn::auto::Builder::new(executor); let builder = server::conn::auto::Builder::new(executor);
let listener = init(server).await?; let listener = init(server).await?;
loop { while server.running() {
let app = app.clone(); let app = app.clone();
let builder = builder.clone(); let builder = builder.clone();
tokio::select! { tokio::select! {

View file

@ -232,11 +232,12 @@ impl super::Service {
#[tracing::instrument(skip_all, name = "well-known")] #[tracing::instrument(skip_all, name = "well-known")]
async fn request_well_known(&self, dest: &str) -> Result<Option<String>> { async fn request_well_known(&self, dest: &str) -> Result<Option<String>> {
trace!("Requesting well known for {dest}");
if !self.has_cached_override(dest) { if !self.has_cached_override(dest) {
self.query_and_cache_override(dest, dest, 8448).await?; self.query_and_cache_override(dest, dest, 8448).await?;
} }
self.services.server.check_running()?;
trace!("Requesting well known for {dest}");
let response = self let response = self
.services .services
.client .client
@ -304,6 +305,9 @@ impl super::Service {
hostname: &'_ str, hostname: &'_ str,
port: u16, port: u16,
) -> Result<()> { ) -> Result<()> {
self.services.server.check_running()?;
debug!("querying IP for {overname:?} ({hostname:?}:{port})");
match self.resolver.resolver.lookup_ip(hostname.to_owned()).await { match self.resolver.resolver.lookup_ip(hostname.to_owned()).await {
| Err(e) => Self::handle_resolve_error(&e, hostname), | Err(e) => Self::handle_resolve_error(&e, hostname),
| Ok(override_ip) => { | Ok(override_ip) => {
@ -328,6 +332,8 @@ impl super::Service {
[format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")]; [format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")];
for hostname in hostnames { for hostname in hostnames {
self.services.server.check_running()?;
debug!("querying SRV for {hostname:?}"); debug!("querying SRV for {hostname:?}");
let hostname = hostname.trim_end_matches('.'); let hostname = hostname.trim_end_matches('.');
match self.resolver.resolver.srv_lookup(hostname).await { match self.resolver.resolver.srv_lookup(hostname).await {

View file

@ -93,6 +93,7 @@ impl super::Service {
let request = Request::try_from(request)?; let request = Request::try_from(request)?;
self.validate_url(request.url())?; self.validate_url(request.url())?;
self.server.check_running()?;
Ok(request) Ok(request)
} }

View file

@ -105,7 +105,8 @@ impl Service {
.get(id) .get(id)
.map(|(_, receiver)| receiver.clone()) .map(|(_, receiver)| receiver.clone())
.expect("Missing channel for sender worker"); .expect("Missing channel for sender worker");
loop {
while !receiver.is_closed() {
tokio::select! { tokio::select! {
Some(response) = futures.next() => { Some(response) = futures.next() => {
self.handle_response(response, futures, statuses).await; self.handle_response(response, futures, statuses).await;