diff --git a/src/admin/check/commands.rs b/src/admin/check/commands.rs index a757d504..0a983046 100644 --- a/src/admin/check/commands.rs +++ b/src/admin/check/commands.rs @@ -2,7 +2,7 @@ use conduit::Result; use conduit_macros::implement; use ruma::events::room::message::RoomMessageEventContent; -use crate::{services, Command}; +use crate::Command; /// Uses the iterator in `src/database/key_value/users.rs` to iterator over /// every user in our database (remote and local). Reports total count, any @@ -10,7 +10,7 @@ use crate::{services, Command}; #[implement(Command, params = "<'_>")] pub(super) async fn check_all_users(&self) -> Result { let timer = tokio::time::Instant::now(); - let results = services().users.db.iter(); + let results = self.services.users.db.iter(); let query_time = timer.elapsed(); let users = results.collect::>(); diff --git a/src/admin/handler.rs b/src/admin/handler.rs index 32360c85..a7e4d79a 100644 --- a/src/admin/handler.rs +++ b/src/admin/handler.rs @@ -1,4 +1,4 @@ -use std::{panic::AssertUnwindSafe, time::Instant}; +use std::{panic::AssertUnwindSafe, sync::Arc, time::Instant}; use clap::{CommandFactory, Parser}; use conduit::{error, trace, utils::string::common_prefix, Error, Result}; @@ -17,37 +17,27 @@ use service::{ use crate::{admin, admin::AdminCommand, Command}; -struct Handler { - services: &'static Services, -} +#[must_use] +pub(super) fn complete(line: &str) -> String { complete_command(AdminCommand::command(), line) } #[must_use] -pub(super) fn complete(line: &str) -> String { - Handler { - services: service::services(), - } - .complete_command(AdminCommand::command(), line) +pub(super) fn handle(services: Arc, command: CommandInput) -> HandlerResult { + Box::pin(handle_command(services, command)) } -#[must_use] -pub(super) fn handle(command: CommandInput) -> HandlerResult { Box::pin(handle_command(command)) } - #[tracing::instrument(skip_all, name = "admin")] -async fn handle_command(command: CommandInput) -> CommandResult { - AssertUnwindSafe(Box::pin(process_command(&command))) +async fn handle_command(services: Arc, command: CommandInput) -> CommandResult { + AssertUnwindSafe(Box::pin(process_command(services, &command))) .catch_unwind() .await .map_err(Error::from_panic) .or_else(|error| handle_panic(&error, command)) } -async fn process_command(command: &CommandInput) -> CommandOutput { - Handler { - services: service::services(), - } - .process(&command.command) - .await - .and_then(|content| reply(content, command.reply_id.clone())) +async fn process_command(services: Arc, command: &CommandInput) -> CommandOutput { + process(services, &command.command) + .await + .and_then(|content| reply(content, command.reply_id.clone())) } fn handle_panic(error: &Error, command: CommandInput) -> CommandResult { @@ -68,129 +58,126 @@ fn reply(mut content: RoomMessageEventContent, reply_id: Option) - Some(content) } -impl Handler { - // Parse and process a message from the admin room - async fn process(&self, msg: &str) -> CommandOutput { - let mut lines = msg.lines().filter(|l| !l.trim().is_empty()); - let command = lines.next().expect("each string has at least one line"); - let (parsed, body) = match self.parse_command(command) { - Ok(parsed) => parsed, - Err(error) => { - let server_name = self.services.globals.server_name(); - let message = error.replace("server.name", server_name.as_str()); - return Some(RoomMessageEventContent::notice_markdown(message)); - }, - }; +// Parse and process a message from the admin room +async fn process(services: Arc, msg: 
&str) -> CommandOutput { + let mut lines = msg.lines().filter(|l| !l.trim().is_empty()); + let command = lines.next().expect("each string has at least one line"); + let (parsed, body) = match parse_command(command) { + Ok(parsed) => parsed, + Err(error) => { + let server_name = services.globals.server_name(); + let message = error.replace("server.name", server_name.as_str()); + return Some(RoomMessageEventContent::notice_markdown(message)); + }, + }; - let timer = Instant::now(); - let body: Vec<&str> = body.iter().map(String::as_str).collect(); - let context = Command { - services: self.services, - body: &body, - }; - let result = Box::pin(admin::process(parsed, &context)).await; - let elapsed = timer.elapsed(); - conduit::debug!(?command, ok = result.is_ok(), "command processed in {elapsed:?}"); - match result { - Ok(reply) => Some(reply), - Err(error) => Some(RoomMessageEventContent::notice_markdown(format!( - "Encountered an error while handling the command:\n```\n{error:#?}\n```" - ))), - } - } - - // Parse chat messages from the admin room into an AdminCommand object - fn parse_command(&self, command_line: &str) -> Result<(AdminCommand, Vec), String> { - let argv = self.parse_line(command_line); - let com = AdminCommand::try_parse_from(&argv).map_err(|error| error.to_string())?; - Ok((com, argv)) - } - - fn complete_command(&self, mut cmd: clap::Command, line: &str) -> String { - let argv = self.parse_line(line); - let mut ret = Vec::::with_capacity(argv.len().saturating_add(1)); - - 'token: for token in argv.into_iter().skip(1) { - let cmd_ = cmd.clone(); - let mut choice = Vec::new(); - - for sub in cmd_.get_subcommands() { - let name = sub.get_name(); - if *name == token { - // token already complete; recurse to subcommand - ret.push(token); - cmd.clone_from(sub); - continue 'token; - } else if name.starts_with(&token) { - // partial match; add to choices - choice.push(name); - } - } - - if choice.len() == 1 { - // One choice. Add extra space because it's complete - let choice = *choice.first().expect("only choice"); - ret.push(choice.to_owned()); - ret.push(String::new()); - } else if choice.is_empty() { - // Nothing found, return original string - ret.push(token); - } else { - // Find the common prefix - ret.push(common_prefix(&choice).into()); - } - - // Return from completion - return ret.join(" "); - } - - // Return from no completion. Needs a space though. - ret.push(String::new()); - ret.join(" ") - } - - // Parse chat messages from the admin room into an AdminCommand object - fn parse_line(&self, command_line: &str) -> Vec { - let mut argv = command_line - .split_whitespace() - .map(str::to_owned) - .collect::>(); - - // Remove any escapes that came with a server-side escape command - if !argv.is_empty() && argv[0].ends_with("admin") { - argv[0] = argv[0].trim_start_matches('\\').into(); - } - - // First indice has to be "admin" but for console convenience we add it here - let server_user = self.services.globals.server_user.as_str(); - if !argv.is_empty() && !argv[0].ends_with("admin") && !argv[0].starts_with(server_user) { - argv.insert(0, "admin".to_owned()); - } - - // Replace `help command` with `command --help` - // Clap has a help subcommand, but it omits the long help description. 
- if argv.len() > 1 && argv[1] == "help" { - argv.remove(1); - argv.push("--help".to_owned()); - } - - // Backwards compatibility with `register_appservice`-style commands - if argv.len() > 1 && argv[1].contains('_') { - argv[1] = argv[1].replace('_', "-"); - } - - // Backwards compatibility with `register_appservice`-style commands - if argv.len() > 2 && argv[2].contains('_') { - argv[2] = argv[2].replace('_', "-"); - } - - // if the user is using the `query` command (argv[1]), replace the database - // function/table calls with underscores to match the codebase - if argv.len() > 3 && argv[1].eq("query") { - argv[3] = argv[3].replace('_', "-"); - } - - trace!(?command_line, ?argv, "parse"); - argv + let timer = Instant::now(); + let body: Vec<&str> = body.iter().map(String::as_str).collect(); + let context = Command { + services: &services, + body: &body, + }; + let result = Box::pin(admin::process(parsed, &context)).await; + let elapsed = timer.elapsed(); + conduit::debug!(?command, ok = result.is_ok(), "command processed in {elapsed:?}"); + match result { + Ok(reply) => Some(reply), + Err(error) => Some(RoomMessageEventContent::notice_markdown(format!( + "Encountered an error while handling the command:\n```\n{error:#?}\n```" + ))), } } + +// Parse chat messages from the admin room into an AdminCommand object +fn parse_command(command_line: &str) -> Result<(AdminCommand, Vec), String> { + let argv = parse_line(command_line); + let com = AdminCommand::try_parse_from(&argv).map_err(|error| error.to_string())?; + Ok((com, argv)) +} + +fn complete_command(mut cmd: clap::Command, line: &str) -> String { + let argv = parse_line(line); + let mut ret = Vec::::with_capacity(argv.len().saturating_add(1)); + + 'token: for token in argv.into_iter().skip(1) { + let cmd_ = cmd.clone(); + let mut choice = Vec::new(); + + for sub in cmd_.get_subcommands() { + let name = sub.get_name(); + if *name == token { + // token already complete; recurse to subcommand + ret.push(token); + cmd.clone_from(sub); + continue 'token; + } else if name.starts_with(&token) { + // partial match; add to choices + choice.push(name); + } + } + + if choice.len() == 1 { + // One choice. Add extra space because it's complete + let choice = *choice.first().expect("only choice"); + ret.push(choice.to_owned()); + ret.push(String::new()); + } else if choice.is_empty() { + // Nothing found, return original string + ret.push(token); + } else { + // Find the common prefix + ret.push(common_prefix(&choice).into()); + } + + // Return from completion + return ret.join(" "); + } + + // Return from no completion. Needs a space though. + ret.push(String::new()); + ret.join(" ") +} + +// Parse chat messages from the admin room into an AdminCommand object +fn parse_line(command_line: &str) -> Vec { + let mut argv = command_line + .split_whitespace() + .map(str::to_owned) + .collect::>(); + + // Remove any escapes that came with a server-side escape command + if !argv.is_empty() && argv[0].ends_with("admin") { + argv[0] = argv[0].trim_start_matches('\\').into(); + } + + // First indice has to be "admin" but for console convenience we add it here + if !argv.is_empty() && !argv[0].ends_with("admin") && !argv[0].starts_with('@') { + argv.insert(0, "admin".to_owned()); + } + + // Replace `help command` with `command --help` + // Clap has a help subcommand, but it omits the long help description. 
+ if argv.len() > 1 && argv[1] == "help" { + argv.remove(1); + argv.push("--help".to_owned()); + } + + // Backwards compatibility with `register_appservice`-style commands + if argv.len() > 1 && argv[1].contains('_') { + argv[1] = argv[1].replace('_', "-"); + } + + // Backwards compatibility with `register_appservice`-style commands + if argv.len() > 2 && argv[2].contains('_') { + argv[2] = argv[2].replace('_', "-"); + } + + // if the user is using the `query` command (argv[1]), replace the database + // function/table calls with underscores to match the codebase + if argv.len() > 3 && argv[1].eq("query") { + argv[3] = argv[3].replace('_', "-"); + } + + trace!(?command_line, ?argv, "parse"); + argv +} diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 5d4c8f5e..fb1c02be 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -1,4 +1,4 @@ -#![recursion_limit = "168"] +#![recursion_limit = "192"] #![allow(clippy::wildcard_imports)] #![allow(clippy::enum_glob_use)] @@ -24,7 +24,6 @@ extern crate conduit_service as service; pub(crate) use conduit::Result; pub(crate) use conduit_macros::{admin_command, admin_command_dispatch}; -pub(crate) use service::services; pub(crate) use crate::{ command::Command, @@ -38,26 +37,19 @@ conduit::mod_dtor! {} conduit::rustc_flags_capture! {} /// Install the admin command handler -pub async fn init() { - _ = services() - .admin +pub async fn init(admin_service: &service::admin::Service) { + _ = admin_service .complete .write() .expect("locked for writing") .insert(handler::complete); - _ = services() - .admin - .handle - .write() - .await - .insert(handler::handle); + _ = admin_service.handle.write().await.insert(handler::handle); } /// Uninstall the admin command handler -pub async fn fini() { - _ = services().admin.handle.write().await.take(); - _ = services() - .admin +pub async fn fini(admin_service: &service::admin::Service) { + _ = admin_service.handle.write().await.take(); + _ = admin_service .complete .write() .expect("locked for writing") diff --git a/src/api/client/account.rs b/src/api/client/account.rs index 7c2bb0b6..f093c459 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -370,7 +370,7 @@ pub(crate) async fn register_route( if let Some(room_id_server_name) = room.server_name() { if let Err(e) = join_room_by_id_helper( - services, + &services, &user_id, room, Some("Automatically joining this room upon registration".to_owned()), @@ -562,11 +562,11 @@ pub(crate) async fn deactivate_route( .rooms_joined(sender_user) .filter_map(Result::ok) .collect(); - super::update_displayname(services, sender_user.clone(), None, all_joined_rooms.clone()).await?; - super::update_avatar_url(services, sender_user.clone(), None, None, all_joined_rooms).await?; + super::update_displayname(&services, sender_user.clone(), None, all_joined_rooms.clone()).await?; + super::update_avatar_url(&services, sender_user.clone(), None, None, all_joined_rooms).await?; // Make the user leave all rooms before deactivation - super::leave_all_rooms(services, sender_user).await; + super::leave_all_rooms(&services, sender_user).await; info!("User {sender_user} deactivated their account."); services diff --git a/src/api/client/alias.rs b/src/api/client/alias.rs index dbc75e64..18d1c5b0 100644 --- a/src/api/client/alias.rs +++ b/src/api/client/alias.rs @@ -107,7 +107,7 @@ pub(crate) async fn get_alias_route( return Err(Error::BadRequest(ErrorKind::NotFound, "Room with alias not found.")); }; - let servers = room_available_servers(services, &room_id, &room_alias, 
&pre_servers); + let servers = room_available_servers(&services, &room_id, &room_alias, &pre_servers); debug!(?room_alias, ?room_id, "available servers: {servers:?}"); Ok(get_alias::v3::Response::new(room_id, servers)) diff --git a/src/api/client/config.rs b/src/api/client/config.rs index 56d33ba7..61cc97ff 100644 --- a/src/api/client/config.rs +++ b/src/api/client/config.rs @@ -20,7 +20,7 @@ pub(crate) async fn set_global_account_data_route( State(services): State, body: Ruma, ) -> Result { set_account_data( - services, + &services, None, &body.sender_user, &body.event_type.to_string(), @@ -37,7 +37,7 @@ pub(crate) async fn set_room_account_data_route( State(services): State, body: Ruma, ) -> Result { set_account_data( - services, + &services, Some(&body.room_id), &body.sender_user, &body.event_type.to_string(), diff --git a/src/api/client/directory.rs b/src/api/client/directory.rs index cb30b60a..6054bd9c 100644 --- a/src/api/client/directory.rs +++ b/src/api/client/directory.rs @@ -48,7 +48,7 @@ pub(crate) async fn get_public_rooms_filtered_route( } let response = get_public_rooms_filtered_helper( - services, + &services, body.server.as_deref(), body.limit, body.since.as_deref(), @@ -88,7 +88,7 @@ pub(crate) async fn get_public_rooms_route( } let response = get_public_rooms_filtered_helper( - services, + &services, body.server.as_deref(), body.limit, body.since.as_deref(), @@ -124,7 +124,7 @@ pub(crate) async fn set_room_visibility_route( return Err(Error::BadRequest(ErrorKind::NotFound, "Room not found")); } - if !user_can_publish_room(services, sender_user, &body.room_id)? { + if !user_can_publish_room(&services, sender_user, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::forbidden(), "User is not allowed to publish this room", diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index 8489dde3..6fa7a8ad 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -77,7 +77,7 @@ pub(crate) async fn get_keys_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); get_keys_helper( - services, + &services, Some(sender_user), &body.device_keys, |u| u == sender_user, @@ -92,7 +92,7 @@ pub(crate) async fn get_keys_route( pub(crate) async fn claim_keys_route( State(services): State, body: Ruma, ) -> Result { - claim_keys_helper(services, &body.one_time_keys).await + claim_keys_helper(&services, &body.one_time_keys).await } /// # `POST /_matrix/client/r0/keys/device_signing/upload` diff --git a/src/api/client/media.rs b/src/api/client/media.rs index 78463fc6..f0afa290 100644 --- a/src/api/client/media.rs +++ b/src/api/client/media.rs @@ -76,12 +76,12 @@ pub(crate) async fn get_media_preview_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let url = &body.url; - if !url_preview_allowed(services, url) { + if !url_preview_allowed(&services, url) { warn!(%sender_user, "URL is not allowed to be previewed: {url}"); return Err(Error::BadRequest(ErrorKind::forbidden(), "URL is not allowed to be previewed")); } - match get_url_preview(services, url).await { + match get_url_preview(&services, url).await { Ok(preview) => { let res = serde_json::value::to_raw_value(&preview).map_err(|e| { error!(%sender_user, "Failed to convert UrlPreviewData into a serde json value: {e}"); @@ -221,7 +221,7 @@ pub(crate) async fn get_content_route( }) } else if !services.globals.server_is_ours(&body.server_name) && body.allow_remote { let response = get_remote_content( - services, + &services, &mxc, &body.server_name, 
body.media_id.clone(), @@ -311,7 +311,7 @@ pub(crate) async fn get_content_as_filename_route( }) } else if !services.globals.server_is_ours(&body.server_name) && body.allow_remote { match get_remote_content( - services, + &services, &mxc, &body.server_name, body.media_id.clone(), diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index d3b2d8f6..ca7e6b6f 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -167,7 +167,7 @@ pub(crate) async fn join_room_by_id_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); banned_room_check( - services, + &services, sender_user, Some(&body.room_id), body.room_id.server_name(), @@ -202,7 +202,7 @@ pub(crate) async fn join_room_by_id_route( } join_room_by_id_helper( - services, + &services, sender_user, &body.room_id, body.reason.clone(), @@ -231,7 +231,7 @@ pub(crate) async fn join_room_by_id_or_alias_route( let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias) { Ok(room_id) => { - banned_room_check(services, sender_user, Some(&room_id), room_id.server_name(), client).await?; + banned_room_check(&services, sender_user, Some(&room_id), room_id.server_name(), client).await?; let mut servers = body.server_name.clone(); servers.extend( @@ -270,7 +270,7 @@ pub(crate) async fn join_room_by_id_or_alias_route( .await?; let (room_id, mut pre_servers) = response; - banned_room_check(services, sender_user, Some(&room_id), Some(room_alias.server_name()), client).await?; + banned_room_check(&services, sender_user, Some(&room_id), Some(room_alias.server_name()), client).await?; let mut servers = body.server_name; if let Some(pre_servers) = &mut pre_servers { @@ -303,7 +303,7 @@ pub(crate) async fn join_room_by_id_or_alias_route( }; let join_room_response = join_room_by_id_helper( - services, + &services, sender_user, &room_id, body.reason.clone(), @@ -327,7 +327,7 @@ pub(crate) async fn leave_room_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - leave_room(services, sender_user, &body.room_id, body.reason.clone()).await?; + leave_room(&services, sender_user, &body.room_id, body.reason.clone()).await?; Ok(leave_room::v3::Response::new()) } @@ -353,13 +353,13 @@ pub(crate) async fn invite_user_route( )); } - banned_room_check(services, sender_user, Some(&body.room_id), body.room_id.server_name(), client).await?; + banned_room_check(&services, sender_user, Some(&body.room_id), body.room_id.server_name(), client).await?; if let invite_user::v3::InvitationRecipient::UserId { user_id, } = &body.recipient { - invite_helper(services, sender_user, user_id, &body.room_id, body.reason.clone(), false).await?; + invite_helper(&services, sender_user, user_id, &body.room_id, body.reason.clone(), false).await?; Ok(invite_user::v3::Response {}) } else { Err(Error::BadRequest(ErrorKind::NotFound, "User not found.")) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index c0b5cf0c..9aae4aaf 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -146,7 +146,7 @@ pub(crate) async fn get_message_events_route( .timeline .pdus_after(sender_user, &body.room_id, from)? 
.filter_map(Result::ok) // Filter out buggy events - .filter(|(_, pdu)| { contains_url_filter(pdu, &body.filter) && visibility_filter(services, pdu, sender_user, &body.room_id) + .filter(|(_, pdu)| { contains_url_filter(pdu, &body.filter) && visibility_filter(&services, pdu, sender_user, &body.room_id) }) .take_while(|&(k, _)| Some(k) != to) // Stop at `to` @@ -193,7 +193,7 @@ pub(crate) async fn get_message_events_route( .timeline .pdus_until(sender_user, &body.room_id, from)? .filter_map(Result::ok) // Filter out buggy events - .filter(|(_, pdu)| {contains_url_filter(pdu, &body.filter) && visibility_filter(services, pdu, sender_user, &body.room_id)}) + .filter(|(_, pdu)| {contains_url_filter(pdu, &body.filter) && visibility_filter(&services, pdu, sender_user, &body.room_id)}) .take_while(|&(k, _)| Some(k) != to) // Stop at `to` .take(limit) .collect(); diff --git a/src/api/client/profile.rs b/src/api/client/profile.rs index 9e9bcf8e..8f6d9056 100644 --- a/src/api/client/profile.rs +++ b/src/api/client/profile.rs @@ -33,7 +33,7 @@ pub(crate) async fn set_displayname_route( .filter_map(Result::ok) .collect(); - update_displayname(services, sender_user.clone(), body.displayname.clone(), all_joined_rooms).await?; + update_displayname(&services, sender_user.clone(), body.displayname.clone(), all_joined_rooms).await?; if services.globals.allow_local_presence() { // Presence update @@ -118,7 +118,7 @@ pub(crate) async fn set_avatar_url_route( .collect(); update_avatar_url( - services, + &services, sender_user.clone(), body.avatar_url.clone(), body.blurhash.clone(), diff --git a/src/api/client/report.rs b/src/api/client/report.rs index a16df444..dc87fd21 100644 --- a/src/api/client/report.rs +++ b/src/api/client/report.rs @@ -40,7 +40,7 @@ pub(crate) async fn report_event_route( }; is_report_valid( - services, + &services, &pdu.event_id, &body.room_id, sender_user, diff --git a/src/api/client/room.rs b/src/api/client/room.rs index c4d82822..c78ba6ed 100644 --- a/src/api/client/room.rs +++ b/src/api/client/room.rs @@ -79,7 +79,7 @@ pub(crate) async fn create_room_route( } let room_id: OwnedRoomId = if let Some(custom_room_id) = &body.room_id { - custom_room_id_check(services, custom_room_id)? + custom_room_id_check(&services, custom_room_id)? } else { RoomId::new(&services.globals.config.server_name) }; @@ -96,7 +96,7 @@ pub(crate) async fn create_room_route( let state_lock = services.rooms.state.mutex.lock(&room_id).await; let alias: Option = if let Some(alias) = &body.room_alias_name { - Some(room_alias_check(services, alias, &body.appservice_info).await?) + Some(room_alias_check(&services, alias, &body.appservice_info).await?) } else { None }; @@ -438,7 +438,7 @@ pub(crate) async fn create_room_route( // 8. 
Events implied by invite (and TODO: invite_3pid) drop(state_lock); for user_id in &body.invite { - if let Err(e) = invite_helper(services, sender_user, user_id, &room_id, None, body.is_direct).await { + if let Err(e) = invite_helper(&services, sender_user, user_id, &room_id, None, body.is_direct).await { warn!(%e, "Failed to send invite"); } } diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 7af4f5f9..d0fb83d1 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -37,7 +37,7 @@ pub(crate) async fn send_state_event_for_key_route( Ok(send_state_event::v3::Response { event_id: send_state_event_for_key_helper( - services, + &services, sender_user, &body.room_id, &body.event_type, diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index 6eeb8fff..34baf7c1 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -137,7 +137,7 @@ pub(crate) async fn sync_events_route( ); if services.globals.allow_local_presence() { - process_presence_updates(services, &mut presence_updates, since, &sender_user).await?; + process_presence_updates(&services, &mut presence_updates, since, &sender_user).await?; } let all_joined_rooms = services @@ -152,7 +152,7 @@ pub(crate) async fn sync_events_route( for room_id in all_joined_rooms { let room_id = room_id?; if let Ok(joined_room) = load_joined_room( - services, + &services, &sender_user, &sender_device, &room_id, @@ -182,7 +182,7 @@ pub(crate) async fn sync_events_route( .collect(); for result in all_left_rooms { handle_left_room( - services, + &services, since, &result?.0, &sender_user, @@ -1214,7 +1214,7 @@ pub(crate) async fn sync_events_v4_route( match new_membership { MembershipState::Join => { // A new user joined an encrypted room - if !share_encrypted_room(services, &sender_user, &user_id, room_id)? { + if !share_encrypted_room(&services, &sender_user, &user_id, room_id)? 
{ device_list_changes.insert(user_id); } }, @@ -1243,7 +1243,7 @@ pub(crate) async fn sync_events_v4_route( .filter(|user_id| { // Only send keys if the sender doesn't share an encrypted room with the target // already - !share_encrypted_room(services, &sender_user, user_id, room_id).unwrap_or(false) + !share_encrypted_room(&services, &sender_user, user_id, room_id).unwrap_or(false) }), ); } @@ -1407,7 +1407,8 @@ pub(crate) async fn sync_events_v4_route( for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms { let roomsincecount = PduCount::Normal(*roomsince); - let (timeline_pdus, limited) = load_timeline(services, &sender_user, room_id, roomsincecount, *timeline_limit)?; + let (timeline_pdus, limited) = + load_timeline(&services, &sender_user, room_id, roomsincecount, *timeline_limit)?; if roomsince != &0 && timeline_pdus.is_empty() { continue; diff --git a/src/api/mod.rs b/src/api/mod.rs index c7411b6c..7fa70873 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,4 +1,4 @@ -#![recursion_limit = "160"] +#![recursion_limit = "192"] pub mod client; pub mod router; diff --git a/src/api/router.rs b/src/api/router.rs index 761c173c..d624de32 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -4,6 +4,8 @@ mod handler; mod request; mod response; +use std::sync::Arc; + use axum::{ response::IntoResponse, routing::{any, get, post}, @@ -16,7 +18,7 @@ use self::handler::RouterExt; pub(super) use self::{args::Args as Ruma, response::RumaResponse}; use crate::{client, server}; -pub type State = &'static service::Services; +pub type State = Arc; pub fn build(router: Router, server: &Server) -> Router { let config = &server.config; diff --git a/src/api/router/args.rs b/src/api/router/args.rs index fa5b1e43..a3d09dff 100644 --- a/src/api/router/args.rs +++ b/src/api/router/args.rs @@ -7,7 +7,7 @@ use ruma::{api::IncomingRequest, CanonicalJsonValue, OwnedDeviceId, OwnedServerN use service::Services; use super::{auth, auth::Auth, request, request::Request}; -use crate::service::appservice::RegistrationInfo; +use crate::{service::appservice::RegistrationInfo, State}; /// Extractor for Ruma request structs pub(crate) struct Args { @@ -36,14 +36,13 @@ pub(crate) struct Args { } #[async_trait] -impl FromRequest for Args +impl FromRequest for Args where T: IncomingRequest, { type Rejection = Error; - async fn from_request(request: hyper::Request, _: &S) -> Result { - let services = service::services(); // ??? + async fn from_request(request: hyper::Request, services: &State) -> Result { let mut request = request::from(services, request).await?; let mut json_body = serde_json::from_slice::(&request.body).ok(); let auth = auth::auth(services, &mut request, &json_body, &T::METADATA).await?; diff --git a/src/api/server/make_join.rs b/src/api/server/make_join.rs index e1beaa33..e9b1a6c7 100644 --- a/src/api/server/make_join.rs +++ b/src/api/server/make_join.rs @@ -82,7 +82,7 @@ pub(crate) async fn create_join_event_template_route( .state_cache .is_left(&body.user_id, &body.room_id) .unwrap_or(true)) - && user_can_perform_restricted_join(services, &body.user_id, &body.room_id, &room_version_id)? + && user_can_perform_restricted_join(&services, &body.user_id, &body.room_id, &room_version_id)? 
{ let auth_user = services .rooms diff --git a/src/api/server/publicrooms.rs b/src/api/server/publicrooms.rs index 1876dde1..af8a5846 100644 --- a/src/api/server/publicrooms.rs +++ b/src/api/server/publicrooms.rs @@ -26,7 +26,7 @@ pub(crate) async fn get_public_rooms_filtered_route( } let response = crate::client::get_public_rooms_filtered_helper( - services, + &services, None, body.limit, body.since.as_deref(), @@ -60,7 +60,7 @@ pub(crate) async fn get_public_rooms_route( } let response = crate::client::get_public_rooms_filtered_helper( - services, + &services, None, body.limit, body.since.as_deref(), diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 2f698d33..394289a6 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -62,8 +62,8 @@ pub(crate) async fn send_transaction_message_route( "Starting txn", ); - let resolved_map = handle_pdus(services, &client, &body, origin, &txn_start_time).await?; - handle_edus(services, &client, &body, origin).await?; + let resolved_map = handle_pdus(&services, &client, &body, origin, &txn_start_time).await?; + handle_edus(&services, &client, &body, origin).await?; debug!( pdus = ?body.pdus.len(), diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 4cd29795..17f56383 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -241,7 +241,7 @@ pub(crate) async fn create_join_event_v1_route( } } - let room_state = create_join_event(services, origin, &body.room_id, &body.pdu).await?; + let room_state = create_join_event(&services, origin, &body.room_id, &body.pdu).await?; Ok(create_join_event::v1::Response { room_state, @@ -286,7 +286,7 @@ pub(crate) async fn create_join_event_v2_route( auth_chain, state, event, - } = create_join_event(services, origin, &body.room_id, &body.pdu).await?; + } = create_join_event(&services, origin, &body.room_id, &body.pdu).await?; let room_state = create_join_event::v2::RoomState { members_omitted: false, auth_chain, diff --git a/src/api/server/send_leave.rs b/src/api/server/send_leave.rs index b1b8fec8..ef4c8c45 100644 --- a/src/api/server/send_leave.rs +++ b/src/api/server/send_leave.rs @@ -28,7 +28,7 @@ pub(crate) async fn create_leave_event_v1_route( ) -> Result { let origin = body.origin.as_ref().expect("server is authenticated"); - create_leave_event(services, origin, &body.room_id, &body.pdu).await?; + create_leave_event(&services, origin, &body.room_id, &body.pdu).await?; Ok(create_leave_event::v1::Response::new()) } @@ -41,7 +41,7 @@ pub(crate) async fn create_leave_event_v2_route( ) -> Result { let origin = body.origin.as_ref().expect("server is authenticated"); - create_leave_event(services, origin, &body.room_id, &body.pdu).await?; + create_leave_event(&services, origin, &body.room_id, &body.pdu).await?; Ok(create_leave_event::v2::Response::new()) } diff --git a/src/api/server/user.rs b/src/api/server/user.rs index bd0372e6..e9a400a7 100644 --- a/src/api/server/user.rs +++ b/src/api/server/user.rs @@ -84,7 +84,7 @@ pub(crate) async fn get_keys_route( } let result = get_keys_helper( - services, + &services, None, &body.device_keys, |u| Some(u.server_name()) == body.origin.as_deref(), @@ -116,7 +116,7 @@ pub(crate) async fn claim_keys_route( )); } - let result = claim_keys_helper(services, &body.one_time_keys).await?; + let result = claim_keys_helper(&services, &body.one_time_keys).await?; Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys, diff --git a/src/database/database.rs b/src/database/database.rs index 44bb655c..1d7bbc33 
100644 --- a/src/database/database.rs +++ b/src/database/database.rs @@ -11,12 +11,12 @@ pub struct Database { impl Database { /// Load an existing database or create a new one. - pub async fn open(server: &Arc) -> Result { + pub async fn open(server: &Arc) -> Result> { let db = Engine::open(server)?; - Ok(Self { + Ok(Arc::new(Self { db: db.clone(), map: maps::open(&db)?, - }) + })) } #[inline] diff --git a/src/main/main.rs b/src/main/main.rs index b8cb24ff..8703eef2 100644 --- a/src/main/main.rs +++ b/src/main/main.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "192"] + pub(crate) mod clap; mod mods; mod restart; @@ -57,17 +59,38 @@ fn main() -> Result<(), Error> { async fn async_main(server: &Arc) -> Result<(), Error> { extern crate conduit_router as router; - if let Err(error) = router::start(&server.server).await { - error!("Critical error starting server: {error}"); - return Err(error); - } + match router::start(&server.server).await { + Ok(services) => server.services.lock().await.insert(services), + Err(error) => { + error!("Critical error starting server: {error}"); + return Err(error); + }, + }; - if let Err(error) = router::run(&server.server).await { + if let Err(error) = router::run( + server + .services + .lock() + .await + .as_ref() + .expect("services initialized"), + ) + .await + { error!("Critical error running server: {error}"); return Err(error); } - if let Err(error) = router::stop(&server.server).await { + if let Err(error) = router::stop( + server + .services + .lock() + .await + .take() + .expect("services initialied"), + ) + .await + { error!("Critical error stopping server: {error}"); return Err(error); } diff --git a/src/main/server.rs b/src/main/server.rs index 71cdadce..e435b2f4 100644 --- a/src/main/server.rs +++ b/src/main/server.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use conduit::{config::Config, info, log::Log, utils::sys, Error, Result}; -use tokio::runtime; +use tokio::{runtime, sync::Mutex}; use crate::{clap::Args, tracing::TracingFlameGuard}; @@ -10,6 +10,8 @@ pub(crate) struct Server { /// Server runtime state; public portion pub(crate) server: Arc, + pub(crate) services: Mutex>>, + _tracing_flame_guard: TracingFlameGuard, #[cfg(feature = "sentry_telemetry")] @@ -54,6 +56,8 @@ impl Server { }, )), + services: None.into(), + _tracing_flame_guard: tracing_flame_guard, #[cfg(feature = "sentry_telemetry")] diff --git a/src/router/layers.rs b/src/router/layers.rs index 67342eb3..2b143666 100644 --- a/src/router/layers.rs +++ b/src/router/layers.rs @@ -6,6 +6,7 @@ use axum::{ }; use axum_client_ip::SecureClientIpSource; use conduit::{error, Result, Server}; +use conduit_service::Services; use http::{ header::{self, HeaderName}, HeaderValue, Method, StatusCode, @@ -34,7 +35,8 @@ const CONDUWUIT_CSP: &[&str] = &[ const CONDUWUIT_PERMISSIONS_POLICY: &[&str] = &["interest-cohort=()", "browsing-topics=()"]; -pub(crate) fn build(server: &Arc) -> Result { +pub(crate) fn build(services: &Arc) -> Result { + let server = &services.server; let layers = ServiceBuilder::new(); #[cfg(feature = "sentry_telemetry")] @@ -83,7 +85,7 @@ pub(crate) fn build(server: &Arc) -> Result { .layer(body_limit_layer(server)) .layer(CatchPanicLayer::custom(catch_panic)); - Ok(router::build(server).layer(layers)) + Ok(router::build(services).layer(layers)) } #[cfg(any(feature = "zstd_compression", feature = "gzip_compression", feature = "brotli_compression"))] @@ -151,12 +153,14 @@ fn body_limit_layer(server: &Server) -> DefaultBodyLimit { DefaultBodyLimit::max 
#[allow(clippy::needless_pass_by_value)] #[tracing::instrument(skip_all, name = "panic")] fn catch_panic(err: Box) -> http::Response> { - conduit_service::services() - .server - .metrics - .requests_panic - .fetch_add(1, std::sync::atomic::Ordering::Release); - + //TODO: XXX + /* + conduit_service::services() + .server + .metrics + .requests_panic + .fetch_add(1, std::sync::atomic::Ordering::Release); + */ let details = if let Some(s) = err.downcast_ref::() { s.clone() } else if let Some(s) = err.downcast_ref::<&str>() { diff --git a/src/router/mod.rs b/src/router/mod.rs index 13fe3908..67ebc0e3 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -11,22 +11,23 @@ extern crate conduit_core as conduit; use std::{future::Future, pin::Pin, sync::Arc}; use conduit::{Result, Server}; +use conduit_service::Services; conduit::mod_ctor! {} conduit::mod_dtor! {} conduit::rustc_flags_capture! {} #[no_mangle] -pub extern "Rust" fn start(server: &Arc) -> Pin> + Send>> { +pub extern "Rust" fn start(server: &Arc) -> Pin>> + Send>> { Box::pin(run::start(server.clone())) } #[no_mangle] -pub extern "Rust" fn stop(server: &Arc) -> Pin> + Send>> { - Box::pin(run::stop(server.clone())) +pub extern "Rust" fn stop(services: Arc) -> Pin> + Send>> { + Box::pin(run::stop(services)) } #[no_mangle] -pub extern "Rust" fn run(server: &Arc) -> Pin> + Send>> { - Box::pin(run::run(server.clone())) +pub extern "Rust" fn run(services: &Arc) -> Pin> + Send>> { + Box::pin(run::run(services.clone())) } diff --git a/src/router/router.rs b/src/router/router.rs index 7c374b47..3527f1e6 100644 --- a/src/router/router.rs +++ b/src/router/router.rs @@ -1,20 +1,20 @@ use std::sync::Arc; use axum::{response::IntoResponse, routing::get, Router}; -use conduit::{Error, Server}; +use conduit::Error; +use conduit_api::State; +use conduit_service::Services; use http::{StatusCode, Uri}; use ruma::api::client::error::ErrorKind; -extern crate conduit_api as api; -extern crate conduit_service as service; +pub(crate) fn build(services: &Arc) -> Router { + let router = Router::::new(); + let state = services.clone(); -pub(crate) fn build(server: &Arc) -> Router { - let router = Router::::new(); - - api::router::build(router, server) + conduit_api::router::build(router, &services.server) .route("/", get(it_works)) .fallback(not_found) - .with_state(service::services()) + .with_state(state) } async fn not_found(_uri: Uri) -> impl IntoResponse { diff --git a/src/router/run.rs b/src/router/run.rs index cb5d2abf..395aa8c4 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -1,28 +1,30 @@ -use std::{sync::Arc, time::Duration}; +extern crate conduit_admin as admin; +extern crate conduit_core as conduit; +extern crate conduit_service as service; + +use std::{ + sync::{atomic::Ordering, Arc}, + time::Duration, +}; use axum_server::Handle as ServerHandle; +use conduit::{debug, debug_error, debug_info, error, info, Error, Result, Server}; +use service::Services; use tokio::{ sync::broadcast::{self, Sender}, task::JoinHandle, }; -extern crate conduit_admin as admin; -extern crate conduit_core as conduit; -extern crate conduit_service as service; - -use std::sync::atomic::Ordering; - -use conduit::{debug, debug_info, error, info, Error, Result, Server}; - use crate::serve; /// Main loop base #[tracing::instrument(skip_all)] -pub(crate) async fn run(server: Arc) -> Result<()> { +pub(crate) async fn run(services: Arc) -> Result<()> { + let server = &services.server; debug!("Start"); // Install the admin room callback here for now - 
admin::init().await; + admin::init(&services.admin).await; // Setup shutdown/signal handling let handle = ServerHandle::new(); @@ -33,13 +35,13 @@ pub(crate) async fn run(server: Arc) -> Result<()> { let mut listener = server .runtime() - .spawn(serve::serve(server.clone(), handle.clone(), tx.subscribe())); + .spawn(serve::serve(services.clone(), handle.clone(), tx.subscribe())); // Focal point debug!("Running"); let res = tokio::select! { res = &mut listener => res.map_err(Error::from).unwrap_or_else(Err), - res = service::services().poll() => handle_services_poll(&server, res, listener).await, + res = services.poll() => handle_services_poll(server, res, listener).await, }; // Join the signal handler before we leave. @@ -47,7 +49,7 @@ pub(crate) async fn run(server: Arc) -> Result<()> { _ = sigs.await; // Remove the admin room callback - admin::fini().await; + admin::fini(&services.admin).await; debug_info!("Finish"); res @@ -55,26 +57,33 @@ pub(crate) async fn run(server: Arc) -> Result<()> { /// Async initializations #[tracing::instrument(skip_all)] -pub(crate) async fn start(server: Arc) -> Result<()> { +pub(crate) async fn start(server: Arc) -> Result> { debug!("Starting..."); - service::start(&server).await?; + let services = Services::build(server).await?.start().await?; #[cfg(feature = "systemd")] sd_notify::notify(true, &[sd_notify::NotifyState::Ready]).expect("failed to notify systemd of ready state"); debug!("Started"); - Ok(()) + Ok(services) } /// Async destructions #[tracing::instrument(skip_all)] -pub(crate) async fn stop(_server: Arc) -> Result<()> { +pub(crate) async fn stop(services: Arc) -> Result<()> { debug!("Shutting down..."); // Wait for all completions before dropping or we'll lose them to the module // unload and explode. - service::stop().await; + services.stop().await; + + if let Err(services) = Arc::try_unwrap(services) { + debug_error!( + "{} dangling references to Services after shutdown", + Arc::strong_count(&services) + ); + } debug!("Cleaning up..."); diff --git a/src/router/serve/mod.rs b/src/router/serve/mod.rs index 4e923444..58bf2de8 100644 --- a/src/router/serve/mod.rs +++ b/src/router/serve/mod.rs @@ -5,22 +5,26 @@ mod unix; use std::sync::Arc; use axum_server::Handle as ServerHandle; -use conduit::{Result, Server}; +use conduit::Result; +use conduit_service::Services; use tokio::sync::broadcast; -use crate::layers; +use super::layers; /// Serve clients -pub(super) async fn serve(server: Arc, handle: ServerHandle, shutdown: broadcast::Receiver<()>) -> Result<()> { +pub(super) async fn serve( + services: Arc, handle: ServerHandle, shutdown: broadcast::Receiver<()>, +) -> Result<()> { + let server = &services.server; let config = &server.config; let addrs = config.get_bind_addrs(); - let app = layers::build(&server)?; + let app = layers::build(&services)?; if cfg!(unix) && config.unix_socket_path.is_some() { - unix::serve(&server, app, shutdown).await + unix::serve(server, app, shutdown).await } else if config.tls.is_some() { - tls::serve(&server, app, handle, addrs).await + tls::serve(server, app, handle, addrs).await } else { - plain::serve(&server, app, handle, addrs).await + plain::serve(server, app, handle, addrs).await } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 6241c668..71f3b73e 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -9,7 +9,7 @@ use std::{ }; use async_trait::async_trait; -use conduit::{debug, error, error::default_log, pdu::PduBuilder, Error, PduEvent, Result, Server}; +use 
conduit::{debug, error, error::default_log, pdu::PduBuilder, Err, Error, PduEvent, Result, Server}; pub use create::create_admin_room; use loole::{Receiver, Sender}; use ruma::{ @@ -41,6 +41,7 @@ struct Services { timeline: Dep, state: Dep, state_cache: Dep, + services: StdRwLock>>, } #[derive(Debug)] @@ -50,7 +51,7 @@ pub struct CommandInput { } pub type Completer = fn(&str) -> String; -pub type Handler = fn(CommandInput) -> HandlerResult; +pub type Handler = fn(Arc, CommandInput) -> HandlerResult; pub type HandlerResult = Pin + Send>>; pub type CommandResult = Result; pub type CommandOutput = Option; @@ -69,6 +70,7 @@ impl crate::Service for Service { timeline: args.depend::("rooms::timeline"), state: args.depend::("rooms::state"), state_cache: args.depend::("rooms::state_cache"), + services: None.into(), }, sender, receiver: Mutex::new(receiver), @@ -172,10 +174,14 @@ impl Service { } async fn process_command(&self, command: CommandInput) -> CommandResult { + let Some(services) = self.services.services.read().expect("locked").clone() else { + return Err!("Services self-reference not initialized."); + }; + if let Some(handle) = self.handle.read().await.as_ref() { - handle(command).await + handle(services, command).await } else { - Err(Error::Err("Admin module is not loaded.".into())) + Err!("Admin module is not loaded.") } } @@ -356,4 +362,10 @@ impl Service { #[cfg(feature = "console")] self.console.close().await; } + + /// Sets the self-reference to crate::Services which will provide context to + /// the admin commands. + pub(super) fn set_services(&self, services: Option>) { + *self.services.services.write().expect("locked for writing") = services; + } } diff --git a/src/service/mod.rs b/src/service/mod.rs index b6ec58b5..084bfac8 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,4 +1,4 @@ -#![recursion_limit = "160"] +#![recursion_limit = "192"] #![allow(refining_impl_trait)] mod manager; @@ -26,11 +26,7 @@ pub mod users; extern crate conduit_core as conduit; extern crate conduit_database as database; -use std::sync::{Arc, RwLock}; - pub use conduit::{pdu, PduBuilder, PduCount, PduEvent}; -use conduit::{Result, Server}; -use database::Database; pub(crate) use service::{Args, Dep, Service}; pub use crate::services::Services; @@ -38,50 +34,3 @@ pub use crate::services::Services; conduit::mod_ctor! {} conduit::mod_dtor! {} conduit::rustc_flags_capture! {} - -static SERVICES: RwLock> = RwLock::new(None); - -pub async fn start(server: &Arc) -> Result<()> { - let d = Arc::new(Database::open(server).await?); - let s = Box::new(Services::build(server.clone(), d)?); - _ = SERVICES.write().expect("write locked").insert(Box::leak(s)); - - services().start().await -} - -pub async fn stop() { - services().stop().await; - - // Deactivate services(). Any further use will panic the caller. - let s = SERVICES - .write() - .expect("write locked") - .take() - .expect("services initialized"); - - let s: *mut Services = std::ptr::from_ref(s).cast_mut(); - //SAFETY: Services was instantiated in init() and leaked into the SERVICES - // global perusing as 'static for the duration of service. Now we reclaim - // it to drop it before unloading the module. If this is not done there wil - // be multiple instances after module reload. 
- let s = unsafe { Box::from_raw(s) }; - - // Drop it so we encounter any trouble before the infolog message - drop(s); -} - -#[must_use] -pub fn services() -> &'static Services { - SERVICES - .read() - .expect("SERVICES locked for reading") - .expect("SERVICES initialized with Services instance") -} - -#[inline] -pub fn available() -> bool { - SERVICES - .read() - .expect("SERVICES locked for reading") - .is_some() -} diff --git a/src/service/services.rs b/src/service/services.rs index 59909f8c..b283db6c 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -44,7 +44,8 @@ pub struct Services { impl Services { #[allow(clippy::cognitive_complexity)] - pub fn build(server: Arc, db: Arc) -> Result { + pub async fn build(server: Arc) -> Result> { + let db = Database::open(&server).await?; let service: Arc = Arc::new(RwLock::new(BTreeMap::new())); macro_rules! build { ($tyname:ty) => {{ @@ -58,7 +59,7 @@ impl Services { }}; } - Ok(Self { + Ok(Arc::new(Self { account_data: build!(account_data::Service), admin: build!(admin::Service), appservice: build!(appservice::Service), @@ -102,12 +103,13 @@ impl Services { service, server, db, - }) + })) } - pub(super) async fn start(&self) -> Result<()> { + pub async fn start(self: &Arc) -> Result> { debug_info!("Starting services..."); + self.admin.set_services(Some(Arc::clone(self))); globals::migrations::migrations(self).await?; self.manager .lock() @@ -118,10 +120,10 @@ impl Services { .await?; debug_info!("Services startup complete."); - Ok(()) + Ok(Arc::clone(self)) } - pub(super) async fn stop(&self) { + pub async fn stop(&self) { info!("Shutting down services..."); self.interrupt(); @@ -129,6 +131,8 @@ impl Services { manager.stop().await; } + self.admin.set_services(None); + debug_info!("Services shutdown complete."); }
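
Taken together, the `main.rs`, `run.rs`, and `services.rs` hunks above move a single `Arc<Services>` through build → start → run → stop instead of stashing a leaked `&'static Services` in the removed `SERVICES` global. A minimal ownership sketch of that flow, using placeholder `Server`/`Services` types (not the real conduwuit structs) and assuming a tokio runtime:

```rust
use std::sync::Arc;

// Placeholder types: only the ownership flow mirrors the diff.
struct Server {
    name: String,
}

struct Services {
    server: Arc<Server>,
}

impl Services {
    // `build` hands back an Arc so the router state, the admin handler,
    // and the shutdown path can all share the same instance.
    async fn build(server: Arc<Server>) -> Arc<Self> {
        Arc::new(Self { server })
    }

    // Chained after `build`, returning the handle the caller keeps.
    async fn start(self: Arc<Self>) -> Arc<Self> {
        println!("starting services for {}", self.server.name);
        self
    }

    async fn stop(&self) {
        println!("stopping services for {}", self.server.name);
    }
}

async fn run(services: &Arc<Services>) {
    // The main loop and request handlers borrow the Arc; nothing calls a
    // global `services()` accessor any more.
    println!("running on {}", services.server.name);
}

#[tokio::main]
async fn main() {
    let server = Arc::new(Server { name: "example.host".into() });

    let services = Services::build(server).await.start().await;
    run(&services).await;
    services.stop().await;

    // Mirrors run.rs::stop(): after shutdown the caller should hold the
    // last strong reference, otherwise something is still dangling.
    if Arc::try_unwrap(services).is_err() {
        eprintln!("dangling references to Services after shutdown");
    }
}
```

The `debug_error!` branch added in `run.rs::stop` is the real-world version of the `try_unwrap` check here: if anything still holds a strong reference after `services.stop()`, the instance cannot be dropped before a potential module reload.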
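
The admin `Handler` type likewise changes from `fn(CommandInput) -> HandlerResult` to `fn(Arc<Services>, CommandInput) -> HandlerResult`, and `admin::init`/`fini` now install and remove the callback on the service instance they are handed. A reduced sketch of that callback shape, with hypothetical `Ctx`/`Input` types and a plain `String` result standing in for `RoomMessageEventContent`:

```rust
use std::{future::Future, pin::Pin, sync::Arc};

use tokio::sync::RwLock;

// Hypothetical stand-ins for Services / CommandInput / CommandResult.
struct Ctx {
    server_name: String,
}
struct Input {
    command: String,
}
type CommandResult = Result<String, String>;

// Mirrors `HandlerResult`: a boxed future so a plain `fn` pointer can be
// stored in the service and swapped at runtime.
type HandlerResult = Pin<Box<dyn Future<Output = CommandResult> + Send>>;
type Handler = fn(Arc<Ctx>, Input) -> HandlerResult;

fn handle(ctx: Arc<Ctx>, input: Input) -> HandlerResult {
    Box::pin(async move { Ok(format!("ran `{}` on {}", input.command, ctx.server_name)) })
}

struct AdminService {
    // The installed callback; `None` while the admin module is unloaded.
    handle: RwLock<Option<Handler>>,
}

impl AdminService {
    async fn init(&self) {
        *self.handle.write().await = Some(handle);
    }

    async fn process(&self, ctx: Arc<Ctx>, input: Input) -> CommandResult {
        // Copy the fn pointer out of the lock, then invoke it.
        let handler = *self.handle.read().await;
        match handler {
            Some(handler) => handler(ctx, input).await,
            None => Err("Admin module is not loaded.".to_owned()),
        }
    }
}

#[tokio::main]
async fn main() {
    let ctx = Arc::new(Ctx { server_name: "example.host".to_owned() });
    let admin = AdminService { handle: RwLock::new(None) };

    admin.init().await;
    let out = admin
        .process(ctx, Input { command: "check-all-users".into() })
        .await;
    println!("{out:?}");
}
```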
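
Because `Services` owns the admin service while the admin command handlers now need an `Arc<Services>` for context, `set_services` stores that back-reference as an `Option` which `Services::start` fills and `Services::stop` clears; clearing it is what breaks the `Arc` cycle so the `Arc::try_unwrap` check at shutdown can succeed. A stripped-down sketch of the pattern, with hypothetical `Context`/`AdminService` names standing in for `Services` and `service::admin::Service`:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins: `Context` plays the role of `Services`,
// `AdminService` the role of `service::admin::Service`.
struct AdminService {
    // Back-reference to the owning context; None until start, None again
    // after stop so the reference cycle is broken before shutdown.
    context: RwLock<Option<Arc<Context>>>,
}

struct Context {
    admin: AdminService,
    server_name: String,
}

impl AdminService {
    fn set_context(&self, context: Option<Arc<Context>>) {
        *self.context.write().expect("locked for writing") = context;
    }

    fn process_command(&self, command: &str) -> Result<String, String> {
        // Commands refuse to run until the back-reference is installed,
        // mirroring the "Services self-reference not initialized" error.
        let Some(context) = self.context.read().expect("locked").clone() else {
            return Err("context self-reference not initialized".to_owned());
        };
        Ok(format!("{} on {}", command, context.server_name))
    }
}

fn main() {
    let context = Arc::new(Context {
        admin: AdminService { context: RwLock::new(None) },
        server_name: "example.host".to_owned(),
    });

    // start(): install the back-reference.
    context.admin.set_context(Some(Arc::clone(&context)));
    assert!(context.admin.process_command("check-all-users").is_ok());

    // stop(): clear it so the last Arc can actually be unwrapped.
    context.admin.set_context(None);
    assert!(Arc::try_unwrap(context).is_ok());
}
```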
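
On the router side, `conduit_api::State` switches from `&'static service::Services` to `Arc<service::Services>`, so handlers receive the shared state through axum's `State` extractor instead of the `service::services()` call removed from `Args::from_request`. The generic axum mechanics look roughly like this (an illustrative `AppState` rather than the real `Services`, and axum 0.7's `serve` API assumed):

```rust
use std::sync::Arc;

use axum::{extract::State, routing::get, Router};

// Illustrative shared state; in the diff this role is played by
// `Arc<conduit_service::Services>`.
struct AppState {
    server_name: String,
}

// Handlers extract the shared Arc via the `State` extractor rather than
// calling a global accessor.
async fn it_works(State(state): State<Arc<AppState>>) -> String {
    format!("hello from {}", state.server_name)
}

fn build(state: Arc<AppState>) -> Router {
    // `with_state` turns `Router<Arc<AppState>>` into `Router<()>`,
    // ready to be served.
    Router::new()
        .route("/", get(it_works))
        .with_state(state)
}

#[tokio::main]
async fn main() {
    let state = Arc::new(AppState {
        server_name: "example.host".to_owned(),
    });

    let app = build(state);

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8008")
        .await
        .expect("bind");
    axum::serve(listener, app).await.expect("serve");
}
```

Anything stored this way only has to be `Clone` (here via `Arc`), which is why the diff can drop the `Box::leak`/`'static` arrangement from the old `SERVICES` global entirely.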