From 0cf368a327627a946c3a536530a90e94ee042fe7 Mon Sep 17 00:00:00 2001 From: strawberry Date: Sat, 20 Apr 2024 19:13:18 -0400 Subject: [PATCH] refactor a ton of the admin room code (50% done) Signed-off-by: strawberry --- src/service/admin/appservice.rs | 100 ----- .../admin/appservice/appservice_command.rs | 66 ++++ src/service/admin/appservice/mod.rs | 52 +++ src/service/admin/debug/debug.rs | 357 ------------------ src/service/admin/debug/debug_commands.rs | 339 +++++++++++++++++ src/service/admin/debug/mod.rs | 38 +- src/service/admin/fsck.rs | 5 +- src/service/admin/mod.rs | 4 +- src/service/admin/query/account_data.rs | 2 +- src/service/admin/query/appservice.rs | 2 +- src/service/admin/query/globals.rs | 2 +- src/service/admin/query/mod.rs | 14 +- src/service/admin/query/presence.rs | 2 +- src/service/admin/query/room_alias.rs | 2 +- src/service/admin/server.rs | 106 ------ src/service/admin/server/mod.rs | 58 +++ src/service/admin/server/server_commands.rs | 71 ++++ src/service/admin/user/mod.rs | 34 +- src/service/admin/user/user.rs | 349 ----------------- src/service/admin/user/user_commands.rs | 334 ++++++++++++++++ 20 files changed, 1002 insertions(+), 935 deletions(-) delete mode 100644 src/service/admin/appservice.rs create mode 100644 src/service/admin/appservice/appservice_command.rs create mode 100644 src/service/admin/appservice/mod.rs delete mode 100644 src/service/admin/debug/debug.rs create mode 100644 src/service/admin/debug/debug_commands.rs delete mode 100644 src/service/admin/server.rs create mode 100644 src/service/admin/server/mod.rs create mode 100644 src/service/admin/server/server_commands.rs delete mode 100644 src/service/admin/user/user.rs create mode 100644 src/service/admin/user/user_commands.rs diff --git a/src/service/admin/appservice.rs b/src/service/admin/appservice.rs deleted file mode 100644 index ff0611e0..00000000 --- a/src/service/admin/appservice.rs +++ /dev/null @@ -1,100 +0,0 @@ -use clap::Subcommand; -use ruma::{api::appservice::Registration, events::room::message::RoomMessageEventContent}; - -use crate::{service::admin::escape_html, services, Result}; - -#[cfg_attr(test, derive(Debug))] -#[derive(Subcommand)] -pub(crate) enum AppserviceCommand { - /// - Register an appservice using its registration YAML - /// - /// This command needs a YAML generated by an appservice (such as a bridge), - /// which must be provided in a Markdown code block below the command. - /// - /// Registering a new bridge using the ID of an existing bridge will replace - /// the old one. - Register, - - /// - Unregister an appservice using its ID - /// - /// You can find the ID using the `list-appservices` command. - Unregister { - /// The appservice to unregister - appservice_identifier: String, - }, - - /// - Show an appservice's config using its ID - /// - /// You can find the ID using the `list-appservices` command. - Show { - /// The appservice to show - appservice_identifier: String, - }, - - /// - List all the currently registered appservices - List, -} - -pub(crate) async fn process(command: AppserviceCommand, body: Vec<&str>) -> Result { - match command { - AppserviceCommand::Register => { - if body.len() > 2 && body[0].trim().starts_with("```") && body.last().unwrap().trim() == "```" { - let appservice_config = body[1..body.len() - 1].join("\n"); - let parsed_config = serde_yaml::from_str::(&appservice_config); - match parsed_config { - Ok(yaml) => match services().appservice.register_appservice(yaml).await { - Ok(id) => Ok(RoomMessageEventContent::text_plain(format!( - "Appservice registered with ID: {id}." - ))), - Err(e) => Ok(RoomMessageEventContent::text_plain(format!( - "Failed to register appservice: {e}" - ))), - }, - Err(e) => Ok(RoomMessageEventContent::text_plain(format!( - "Could not parse appservice config: {e}" - ))), - } - } else { - Ok(RoomMessageEventContent::text_plain( - "Expected code block in command body. Add --help for details.", - )) - } - }, - AppserviceCommand::Unregister { - appservice_identifier, - } => match services() - .appservice - .unregister_appservice(&appservice_identifier) - .await - { - Ok(()) => Ok(RoomMessageEventContent::text_plain("Appservice unregistered.")), - Err(e) => Ok(RoomMessageEventContent::text_plain(format!( - "Failed to unregister appservice: {e}" - ))), - }, - AppserviceCommand::Show { - appservice_identifier, - } => match services() - .appservice - .get_registration(&appservice_identifier) - .await - { - Some(config) => { - let config_str = serde_yaml::to_string(&config).expect("config should've been validated on register"); - let output = format!("Config for {}:\n\n```yaml\n{}\n```", appservice_identifier, config_str,); - let output_html = format!( - "Config for {}:\n\n
{}
", - escape_html(&appservice_identifier), - escape_html(&config_str), - ); - Ok(RoomMessageEventContent::text_html(output, output_html)) - }, - None => Ok(RoomMessageEventContent::text_plain("Appservice does not exist.")), - }, - AppserviceCommand::List => { - let appservices = services().appservice.iter_ids().await; - let output = format!("Appservices ({}): {}", appservices.len(), appservices.join(", ")); - Ok(RoomMessageEventContent::text_plain(output)) - }, - } -} diff --git a/src/service/admin/appservice/appservice_command.rs b/src/service/admin/appservice/appservice_command.rs new file mode 100644 index 00000000..e2c47a50 --- /dev/null +++ b/src/service/admin/appservice/appservice_command.rs @@ -0,0 +1,66 @@ +use ruma::{api::appservice::Registration, events::room::message::RoomMessageEventContent}; + +use crate::{service::admin::escape_html, services, Result}; + +pub(super) async fn register(body: Vec<&str>) -> Result { + if body.len() > 2 && body[0].trim().starts_with("```") && body.last().unwrap().trim() == "```" { + let appservice_config = body[1..body.len() - 1].join("\n"); + let parsed_config = serde_yaml::from_str::(&appservice_config); + match parsed_config { + Ok(yaml) => match services().appservice.register_appservice(yaml).await { + Ok(id) => Ok(RoomMessageEventContent::text_plain(format!( + "Appservice registered with ID: {id}." + ))), + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Failed to register appservice: {e}" + ))), + }, + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Could not parse appservice config: {e}" + ))), + } + } else { + Ok(RoomMessageEventContent::text_plain( + "Expected code block in command body. Add --help for details.", + )) + } +} + +pub(super) async fn unregister(_body: Vec<&str>, appservice_identifier: String) -> Result { + match services() + .appservice + .unregister_appservice(&appservice_identifier) + .await + { + Ok(()) => Ok(RoomMessageEventContent::text_plain("Appservice unregistered.")), + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Failed to unregister appservice: {e}" + ))), + } +} + +pub(super) async fn show(_body: Vec<&str>, appservice_identifier: String) -> Result { + match services() + .appservice + .get_registration(&appservice_identifier) + .await + { + Some(config) => { + let config_str = serde_yaml::to_string(&config).expect("config should've been validated on register"); + let output = format!("Config for {}:\n\n```yaml\n{}\n```", appservice_identifier, config_str,); + let output_html = format!( + "Config for {}:\n\n
{}
", + escape_html(&appservice_identifier), + escape_html(&config_str), + ); + Ok(RoomMessageEventContent::text_html(output, output_html)) + }, + None => Ok(RoomMessageEventContent::text_plain("Appservice does not exist.")), + } +} + +pub(super) async fn list(_body: Vec<&str>) -> Result { + let appservices = services().appservice.iter_ids().await; + let output = format!("Appservices ({}): {}", appservices.len(), appservices.join(", ")); + Ok(RoomMessageEventContent::text_plain(output)) +} diff --git a/src/service/admin/appservice/mod.rs b/src/service/admin/appservice/mod.rs new file mode 100644 index 00000000..b0d225aa --- /dev/null +++ b/src/service/admin/appservice/mod.rs @@ -0,0 +1,52 @@ +use clap::Subcommand; +use ruma::events::room::message::RoomMessageEventContent; + +use self::appservice_command::{list, register, show, unregister}; +use crate::Result; + +pub(crate) mod appservice_command; + +#[cfg_attr(test, derive(Debug))] +#[derive(Subcommand)] +pub(crate) enum AppserviceCommand { + /// - Register an appservice using its registration YAML + /// + /// This command needs a YAML generated by an appservice (such as a bridge), + /// which must be provided in a Markdown code block below the command. + /// + /// Registering a new bridge using the ID of an existing bridge will replace + /// the old one. + Register, + + /// - Unregister an appservice using its ID + /// + /// You can find the ID using the `list-appservices` command. + Unregister { + /// The appservice to unregister + appservice_identifier: String, + }, + + /// - Show an appservice's config using its ID + /// + /// You can find the ID using the `list-appservices` command. + Show { + /// The appservice to show + appservice_identifier: String, + }, + + /// - List all the currently registered appservices + List, +} + +pub(crate) async fn process(command: AppserviceCommand, body: Vec<&str>) -> Result { + Ok(match command { + AppserviceCommand::Register => register(body).await?, + AppserviceCommand::Unregister { + appservice_identifier, + } => unregister(body, appservice_identifier).await?, + AppserviceCommand::Show { + appservice_identifier, + } => show(body, appservice_identifier).await?, + AppserviceCommand::List => list(body).await?, + }) +} diff --git a/src/service/admin/debug/debug.rs b/src/service/admin/debug/debug.rs deleted file mode 100644 index 0f77b2fd..00000000 --- a/src/service/admin/debug/debug.rs +++ /dev/null @@ -1,357 +0,0 @@ -use std::{collections::BTreeMap, sync::Arc, time::Instant}; - -use ruma::{ - api::client::error::ErrorKind, events::room::message::RoomMessageEventContent, CanonicalJsonObject, EventId, - RoomId, RoomVersionId, -}; -use tokio::sync::RwLock; -use tracing::{debug, error, info, warn}; -use tracing_subscriber::EnvFilter; - -use super::DebugCommand; -use crate::{api::server_server::parse_incoming_pdu, services, utils::HtmlEscape, Error, PduEvent, Result}; - -pub(crate) async fn process(command: DebugCommand, body: Vec<&str>) -> Result { - Ok(match command { - DebugCommand::GetAuthChain { - event_id, - } => { - let event_id = Arc::::from(event_id); - if let Some(event) = services().rooms.timeline.get_pdu_json(&event_id)? { - let room_id_str = event - .get("room_id") - .and_then(|val| val.as_str()) - .ok_or_else(|| Error::bad_database("Invalid event in database"))?; - - let room_id = <&RoomId>::try_from(room_id_str) - .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - let start = Instant::now(); - let count = services() - .rooms - .auth_chain - .event_ids_iter(room_id, vec![event_id]) - .await? - .count(); - let elapsed = start.elapsed(); - RoomMessageEventContent::text_plain(format!("Loaded auth chain with length {count} in {elapsed:?}")) - } else { - RoomMessageEventContent::text_plain("Event not found.") - } - }, - DebugCommand::ParsePdu => { - if body.len() > 2 && body[0].trim().starts_with("```") && body.last().unwrap().trim() == "```" { - let string = body[1..body.len() - 1].join("\n"); - match serde_json::from_str(&string) { - Ok(value) => match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) { - Ok(hash) => { - let event_id = EventId::parse(format!("${hash}")); - - match serde_json::from_value::( - serde_json::to_value(value).expect("value is json"), - ) { - Ok(pdu) => { - RoomMessageEventContent::text_plain(format!("EventId: {event_id:?}\n{pdu:#?}")) - }, - Err(e) => RoomMessageEventContent::text_plain(format!( - "EventId: {event_id:?}\nCould not parse event: {e}" - )), - } - }, - Err(e) => RoomMessageEventContent::text_plain(format!("Could not parse PDU JSON: {e:?}")), - }, - Err(e) => RoomMessageEventContent::text_plain(format!("Invalid json in command body: {e}")), - } - } else { - RoomMessageEventContent::text_plain("Expected code block in command body.") - } - }, - DebugCommand::GetPdu { - event_id, - } => { - let mut outlier = false; - let mut pdu_json = services() - .rooms - .timeline - .get_non_outlier_pdu_json(&event_id)?; - if pdu_json.is_none() { - outlier = true; - pdu_json = services().rooms.timeline.get_pdu_json(&event_id)?; - } - match pdu_json { - Some(json) => { - let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json"); - return Ok(RoomMessageEventContent::text_html( - format!( - "{}\n```json\n{}\n```", - if outlier { - "Outlier PDU found in our database" - } else { - "PDU found in our database" - }, - json_text - ), - format!( - "

{}

\n
{}\n
\n", - if outlier { - "Outlier PDU found in our database" - } else { - "PDU found in our database" - }, - HtmlEscape(&json_text) - ), - )); - }, - None => { - return Ok(RoomMessageEventContent::text_plain("PDU not found locally.")); - }, - } - }, - DebugCommand::GetRemotePdu { - event_id, - server, - } => { - if !services().globals.config.allow_federation { - return Ok(RoomMessageEventContent::text_plain( - "Federation is disabled on this homeserver.", - )); - } - - if server == services().globals.server_name() { - return Ok(RoomMessageEventContent::text_plain( - "Not allowed to send federation requests to ourselves. Please use `get-pdu` for fetching local \ - PDUs.", - )); - } - - // TODO: use Futures as some requests may take a while so we dont block the - // admin room - match services() - .sending - .send_federation_request( - &server, - ruma::api::federation::event::get_event::v1::Request { - event_id: event_id.clone().into(), - }, - ) - .await - { - Ok(response) => { - let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| { - warn!( - "Requested event ID {event_id} from server but failed to convert from RawValue to \ - CanonicalJsonObject (malformed event/response?): {e}" - ); - Error::BadRequest(ErrorKind::Unknown, "Received response from server but failed to parse PDU") - })?; - - debug!("Attempting to parse PDU: {:?}", &response.pdu); - let parsed_pdu = { - let parsed_result = parse_incoming_pdu(&response.pdu); - let (event_id, value, room_id) = match parsed_result { - Ok(t) => t, - Err(e) => { - warn!("Failed to parse PDU: {e}"); - info!("Full PDU: {:?}", &response.pdu); - return Ok(RoomMessageEventContent::text_plain(format!( - "Failed to parse PDU remote server {server} sent us: {e}" - ))); - }, - }; - - vec![(event_id, value, room_id)] - }; - - let pub_key_map = RwLock::new(BTreeMap::new()); - - debug!("Attempting to fetch homeserver signing keys for {server}"); - services() - .rooms - .event_handler - .fetch_required_signing_keys( - parsed_pdu.iter().map(|(_event_id, event, _room_id)| event), - &pub_key_map, - ) - .await - .unwrap_or_else(|e| { - warn!("Could not fetch all signatures for PDUs from {server}: {e:?}"); - }); - - info!("Attempting to handle event ID {event_id} as backfilled PDU"); - services() - .rooms - .timeline - .backfill_pdu(&server, response.pdu, &pub_key_map) - .await?; - - let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json"); - - return Ok(RoomMessageEventContent::text_html( - format!( - "{}\n```json\n{}\n```", - "Got PDU from specified server and handled as backfilled PDU successfully. Event body:", - json_text - ), - format!( - "

{}

\n
{}\n
\n", - "Got PDU from specified server and handled as backfilled PDU successfully. Event body:", - HtmlEscape(&json_text) - ), - )); - }, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "Remote server did not have PDU or failed sending request to remote server: {e}" - ))); - }, - } - }, - DebugCommand::GetRoomState { - room_id, - } => { - let room_state = services() - .rooms - .state_accessor - .room_state_full(&room_id) - .await? - .values() - .map(|pdu| pdu.to_state_event()) - .collect::>(); - - if room_state.is_empty() { - return Ok(RoomMessageEventContent::text_plain( - "Unable to find room state in our database (vector is empty)", - )); - } - - let json_text = serde_json::to_string_pretty(&room_state).map_err(|e| { - error!("Failed converting room state vector in our database to pretty JSON: {e}"); - Error::bad_database( - "Failed to convert room state events to pretty JSON, possible invalid room state events in our \ - database", - ) - })?; - - return Ok(RoomMessageEventContent::text_html( - format!("{}\n```json\n{}\n```", "Found full room state", json_text), - format!( - "

{}

\n
{}\n
\n", - "Found full room state", - HtmlEscape(&json_text) - ), - )); - }, - DebugCommand::Ping { - server, - } => { - if server == services().globals.server_name() { - return Ok(RoomMessageEventContent::text_plain( - "Not allowed to send federation requests to ourselves.", - )); - } - - let timer = tokio::time::Instant::now(); - - match services() - .sending - .send_federation_request(&server, ruma::api::federation::discovery::get_server_version::v1::Request {}) - .await - { - Ok(response) => { - let ping_time = timer.elapsed(); - - let json_text_res = serde_json::to_string_pretty(&response.server); - - if let Ok(json) = json_text_res { - return Ok(RoomMessageEventContent::text_html( - format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```"), - format!( - "

Got response which took {ping_time:?} time:

\n
{}\n
\n", - HtmlEscape(&json) - ), - )); - } - - return Ok(RoomMessageEventContent::text_plain(format!( - "Got non-JSON response which took {ping_time:?} time:\n{0:?}", - response - ))); - }, - Err(e) => { - error!("Failed sending federation request to specified server from ping debug command: {e}"); - return Ok(RoomMessageEventContent::text_plain(format!( - "Failed sending federation request to specified server:\n\n{e}", - ))); - }, - } - }, - DebugCommand::ForceDeviceListUpdates => { - // Force E2EE device list updates for all users - for user_id in services().users.iter().filter_map(Result::ok) { - services().users.mark_device_key_update(&user_id)?; - } - RoomMessageEventContent::text_plain("Marked all devices for all users as having new keys to update") - }, - DebugCommand::ChangeLogLevel { - filter, - reset, - } => { - if reset { - let old_filter_layer = match EnvFilter::try_new(&services().globals.config.log) { - Ok(s) => s, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "Log level from config appears to be invalid now: {e}" - ))); - }, - }; - - match services() - .globals - .tracing_reload_handle - .modify(|filter| *filter = old_filter_layer) - { - Ok(()) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "Successfully changed log level back to config value {}", - services().globals.config.log - ))); - }, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "Failed to modify and reload the global tracing log level: {e}" - ))); - }, - } - } - - if let Some(filter) = filter { - let new_filter_layer = match EnvFilter::try_new(filter) { - Ok(s) => s, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "Invalid log level filter specified: {e}" - ))); - }, - }; - - match services() - .globals - .tracing_reload_handle - .modify(|filter| *filter = new_filter_layer) - { - Ok(()) => { - return Ok(RoomMessageEventContent::text_plain("Successfully changed log level")); - }, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "Failed to modify and reload the global tracing log level: {e}" - ))); - }, - } - } - - return Ok(RoomMessageEventContent::text_plain("No log level was specified.")); - }, - }) -} diff --git a/src/service/admin/debug/debug_commands.rs b/src/service/admin/debug/debug_commands.rs new file mode 100644 index 00000000..870016f3 --- /dev/null +++ b/src/service/admin/debug/debug_commands.rs @@ -0,0 +1,339 @@ +use std::{collections::BTreeMap, sync::Arc, time::Instant}; + +use ruma::{ + api::client::error::ErrorKind, events::room::message::RoomMessageEventContent, CanonicalJsonObject, EventId, + RoomId, RoomVersionId, ServerName, +}; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; +use tracing_subscriber::EnvFilter; + +use crate::{api::server_server::parse_incoming_pdu, services, utils::HtmlEscape, Error, PduEvent, Result}; + +pub(super) async fn get_auth_chain(_body: Vec<&str>, event_id: Box) -> Result { + let event_id = Arc::::from(event_id); + if let Some(event) = services().rooms.timeline.get_pdu_json(&event_id)? { + let room_id_str = event + .get("room_id") + .and_then(|val| val.as_str()) + .ok_or_else(|| Error::bad_database("Invalid event in database"))?; + + let room_id = <&RoomId>::try_from(room_id_str) + .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; + let start = Instant::now(); + let count = services() + .rooms + .auth_chain + .event_ids_iter(room_id, vec![event_id]) + .await? + .count(); + let elapsed = start.elapsed(); + Ok(RoomMessageEventContent::text_plain(format!( + "Loaded auth chain with length {count} in {elapsed:?}" + ))) + } else { + Ok(RoomMessageEventContent::text_plain("Event not found.")) + } +} + +pub(super) async fn parse_pdu(body: Vec<&str>) -> Result { + if body.len() > 2 && body[0].trim().starts_with("```") && body.last().unwrap().trim() == "```" { + let string = body[1..body.len() - 1].join("\n"); + match serde_json::from_str(&string) { + Ok(value) => match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) { + Ok(hash) => { + let event_id = EventId::parse(format!("${hash}")); + + match serde_json::from_value::(serde_json::to_value(value).expect("value is json")) { + Ok(pdu) => Ok(RoomMessageEventContent::text_plain(format!("EventId: {event_id:?}\n{pdu:#?}"))), + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "EventId: {event_id:?}\nCould not parse event: {e}" + ))), + } + }, + Err(e) => Ok(RoomMessageEventContent::text_plain(format!("Could not parse PDU JSON: {e:?}"))), + }, + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Invalid json in command body: {e}" + ))), + } + } else { + Ok(RoomMessageEventContent::text_plain("Expected code block in command body.")) + } +} + +pub(super) async fn get_pdu(_body: Vec<&str>, event_id: Box) -> Result { + let mut outlier = false; + let mut pdu_json = services() + .rooms + .timeline + .get_non_outlier_pdu_json(&event_id)?; + if pdu_json.is_none() { + outlier = true; + pdu_json = services().rooms.timeline.get_pdu_json(&event_id)?; + } + match pdu_json { + Some(json) => { + let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json"); + Ok(RoomMessageEventContent::text_html( + format!( + "{}\n```json\n{}\n```", + if outlier { + "Outlier PDU found in our database" + } else { + "PDU found in our database" + }, + json_text + ), + format!( + "

{}

\n
{}\n
\n", + if outlier { + "Outlier PDU found in our database" + } else { + "PDU found in our database" + }, + HtmlEscape(&json_text) + ), + )) + }, + None => Ok(RoomMessageEventContent::text_plain("PDU not found locally.")), + } +} + +pub(super) async fn get_remote_pdu( + _body: Vec<&str>, event_id: Box, server: Box, +) -> Result { + if !services().globals.config.allow_federation { + return Ok(RoomMessageEventContent::text_plain( + "Federation is disabled on this homeserver.", + )); + } + + if server == services().globals.server_name() { + return Ok(RoomMessageEventContent::text_plain( + "Not allowed to send federation requests to ourselves. Please use `get-pdu` for fetching local PDUs.", + )); + } + + match services() + .sending + .send_federation_request( + &server, + ruma::api::federation::event::get_event::v1::Request { + event_id: event_id.clone().into(), + }, + ) + .await + { + Ok(response) => { + let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| { + warn!( + "Requested event ID {event_id} from server but failed to convert from RawValue to \ + CanonicalJsonObject (malformed event/response?): {e}" + ); + Error::BadRequest(ErrorKind::Unknown, "Received response from server but failed to parse PDU") + })?; + + debug!("Attempting to parse PDU: {:?}", &response.pdu); + let parsed_pdu = { + let parsed_result = parse_incoming_pdu(&response.pdu); + let (event_id, value, room_id) = match parsed_result { + Ok(t) => t, + Err(e) => { + warn!("Failed to parse PDU: {e}"); + info!("Full PDU: {:?}", &response.pdu); + return Ok(RoomMessageEventContent::text_plain(format!( + "Failed to parse PDU remote server {server} sent us: {e}" + ))); + }, + }; + + vec![(event_id, value, room_id)] + }; + + let pub_key_map = RwLock::new(BTreeMap::new()); + + debug!("Attempting to fetch homeserver signing keys for {server}"); + services() + .rooms + .event_handler + .fetch_required_signing_keys(parsed_pdu.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map) + .await + .unwrap_or_else(|e| { + warn!("Could not fetch all signatures for PDUs from {server}: {e:?}"); + }); + + info!("Attempting to handle event ID {event_id} as backfilled PDU"); + services() + .rooms + .timeline + .backfill_pdu(&server, response.pdu, &pub_key_map) + .await?; + + let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json"); + + Ok(RoomMessageEventContent::text_html( + format!( + "{}\n```json\n{}\n```", + "Got PDU from specified server and handled as backfilled PDU successfully. Event body:", json_text + ), + format!( + "

{}

\n
{}\n
\n", + "Got PDU from specified server and handled as backfilled PDU successfully. Event body:", + HtmlEscape(&json_text) + ), + )) + }, + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Remote server did not have PDU or failed sending request to remote server: {e}" + ))), + } +} + +pub(super) async fn get_room_state(_body: Vec<&str>, room_id: Box) -> Result { + let room_state = services() + .rooms + .state_accessor + .room_state_full(&room_id) + .await? + .values() + .map(|pdu| pdu.to_state_event()) + .collect::>(); + + if room_state.is_empty() { + return Ok(RoomMessageEventContent::text_plain( + "Unable to find room state in our database (vector is empty)", + )); + } + + let json_text = serde_json::to_string_pretty(&room_state).map_err(|e| { + error!("Failed converting room state vector in our database to pretty JSON: {e}"); + Error::bad_database( + "Failed to convert room state events to pretty JSON, possible invalid room state events in our database", + ) + })?; + + Ok(RoomMessageEventContent::text_html( + format!("{}\n```json\n{}\n```", "Found full room state", json_text), + format!( + "

{}

\n
{}\n
\n", + "Found full room state", + HtmlEscape(&json_text) + ), + )) +} + +pub(super) async fn ping(_body: Vec<&str>, server: Box) -> Result { + if server == services().globals.server_name() { + return Ok(RoomMessageEventContent::text_plain( + "Not allowed to send federation requests to ourselves.", + )); + } + + let timer = tokio::time::Instant::now(); + + match services() + .sending + .send_federation_request(&server, ruma::api::federation::discovery::get_server_version::v1::Request {}) + .await + { + Ok(response) => { + let ping_time = timer.elapsed(); + + let json_text_res = serde_json::to_string_pretty(&response.server); + + if let Ok(json) = json_text_res { + return Ok(RoomMessageEventContent::text_html( + format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```"), + format!( + "

Got response which took {ping_time:?} time:

\n
{}\n
\n", + HtmlEscape(&json) + ), + )); + } + + Ok(RoomMessageEventContent::text_plain(format!( + "Got non-JSON response which took {ping_time:?} time:\n{0:?}", + response + ))) + }, + Err(e) => { + error!("Failed sending federation request to specified server from ping debug command: {e}"); + Ok(RoomMessageEventContent::text_plain(format!( + "Failed sending federation request to specified server:\n\n{e}", + ))) + }, + } +} + +pub(super) async fn force_device_list_updates(_body: Vec<&str>) -> Result { + // Force E2EE device list updates for all users + for user_id in services().users.iter().filter_map(Result::ok) { + services().users.mark_device_key_update(&user_id)?; + } + Ok(RoomMessageEventContent::text_plain( + "Marked all devices for all users as having new keys to update", + )) +} + +pub(super) async fn change_log_level( + _body: Vec<&str>, filter: Option, reset: bool, +) -> Result { + if reset { + let old_filter_layer = match EnvFilter::try_new(&services().globals.config.log) { + Ok(s) => s, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "Log level from config appears to be invalid now: {e}" + ))); + }, + }; + + match services() + .globals + .tracing_reload_handle + .modify(|filter| *filter = old_filter_layer) + { + Ok(()) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "Successfully changed log level back to config value {}", + services().globals.config.log + ))); + }, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "Failed to modify and reload the global tracing log level: {e}" + ))); + }, + } + } + + if let Some(filter) = filter { + let new_filter_layer = match EnvFilter::try_new(filter) { + Ok(s) => s, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "Invalid log level filter specified: {e}" + ))); + }, + }; + + match services() + .globals + .tracing_reload_handle + .modify(|filter| *filter = new_filter_layer) + { + Ok(()) => { + return Ok(RoomMessageEventContent::text_plain("Successfully changed log level")); + }, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "Failed to modify and reload the global tracing log level: {e}" + ))); + }, + } + } + + Ok(RoomMessageEventContent::text_plain("No log level was specified.")) +} diff --git a/src/service/admin/debug/mod.rs b/src/service/admin/debug/mod.rs index 43823175..80e1c74c 100644 --- a/src/service/admin/debug/mod.rs +++ b/src/service/admin/debug/mod.rs @@ -1,8 +1,13 @@ use clap::Subcommand; -use ruma::{EventId, RoomId, ServerName}; +use ruma::{events::room::message::RoomMessageEventContent, EventId, RoomId, ServerName}; -#[allow(clippy::module_inception)] -pub(crate) mod debug; +use self::debug_commands::{ + change_log_level, force_device_list_updates, get_auth_chain, get_pdu, get_remote_pdu, get_room_state, parse_pdu, + ping, +}; +use crate::Result; + +pub(crate) mod debug_commands; #[cfg_attr(test, derive(Debug))] #[derive(Subcommand)] @@ -78,3 +83,30 @@ pub(crate) enum DebugCommand { reset: bool, }, } + +pub(crate) async fn process(command: DebugCommand, body: Vec<&str>) -> Result { + Ok(match command { + DebugCommand::GetAuthChain { + event_id, + } => get_auth_chain(body, event_id).await?, + DebugCommand::ParsePdu => parse_pdu(body).await?, + DebugCommand::GetPdu { + event_id, + } => get_pdu(body, event_id).await?, + DebugCommand::GetRemotePdu { + event_id, + server, + } => get_remote_pdu(body, event_id, server).await?, + DebugCommand::GetRoomState { + room_id, + } => get_room_state(body, room_id).await?, + DebugCommand::Ping { + server, + } => ping(body, server).await?, + DebugCommand::ForceDeviceListUpdates => force_device_list_updates(body).await?, + DebugCommand::ChangeLogLevel { + filter, + reset, + } => change_log_level(body, filter, reset).await?, + }) +} diff --git a/src/service/admin/fsck.rs b/src/service/admin/fsck.rs index 054976d4..9e9b64a1 100644 --- a/src/service/admin/fsck.rs +++ b/src/service/admin/fsck.rs @@ -1,7 +1,7 @@ use clap::Subcommand; use ruma::events::room::message::RoomMessageEventContent; -use crate::{services, Result}; +use crate::Result; #[cfg_attr(test, derive(Debug))] #[derive(Subcommand)] @@ -9,7 +9,8 @@ pub(crate) enum FsckCommand { Register, } -pub(crate) async fn fsck(command: FsckCommand, body: Vec<&str>) -> Result { +#[allow(dead_code)] +pub(crate) async fn fsck(command: FsckCommand, _body: Vec<&str>) -> Result { match command { FsckCommand::Register => { todo!() diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index da5a77c4..1971461d 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -280,11 +280,11 @@ impl Service { let reply_message_content = match command { AdminCommand::Appservices(command) => appservice::process(command, body).await?, AdminCommand::Media(command) => media::process(command, body).await?, - AdminCommand::Users(command) => user::user::process(command, body).await?, + AdminCommand::Users(command) => user::process(command, body).await?, AdminCommand::Rooms(command) => room::process(command, body).await?, AdminCommand::Federation(command) => federation::process(command, body).await?, AdminCommand::Server(command) => server::process(command, body).await?, - AdminCommand::Debug(command) => debug::debug::process(command, body).await?, + AdminCommand::Debug(command) => debug::process(command, body).await?, AdminCommand::Query(command) => query::process(command, body).await?, }; diff --git a/src/service/admin/query/account_data.rs b/src/service/admin/query/account_data.rs index 7e03d794..f335489e 100644 --- a/src/service/admin/query/account_data.rs +++ b/src/service/admin/query/account_data.rs @@ -32,7 +32,7 @@ pub(crate) enum AccountData { } /// All the getters and iterators from src/database/key_value/account_data.rs -pub(crate) async fn account_data(subcommand: AccountData) -> Result { +pub(super) async fn account_data(subcommand: AccountData) -> Result { match subcommand { AccountData::ChangesSince { user_id, diff --git a/src/service/admin/query/appservice.rs b/src/service/admin/query/appservice.rs index 4dfada18..c576f7db 100644 --- a/src/service/admin/query/appservice.rs +++ b/src/service/admin/query/appservice.rs @@ -15,7 +15,7 @@ pub(crate) enum Appservice { } /// All the getters and iterators from src/database/key_value/appservice.rs -pub(crate) async fn appservice(subcommand: Appservice) -> Result { +pub(super) async fn appservice(subcommand: Appservice) -> Result { match subcommand { Appservice::GetRegistration { appservice_id, diff --git a/src/service/admin/query/globals.rs b/src/service/admin/query/globals.rs index 2e2a38fc..25c3e337 100644 --- a/src/service/admin/query/globals.rs +++ b/src/service/admin/query/globals.rs @@ -23,7 +23,7 @@ pub(crate) enum Globals { } /// All the getters and iterators from src/database/key_value/globals.rs -pub(crate) async fn globals(subcommand: Globals) -> Result { +pub(super) async fn globals(subcommand: Globals) -> Result { match subcommand { Globals::DatabaseVersion => { let timer = tokio::time::Instant::now(); diff --git a/src/service/admin/query/mod.rs b/src/service/admin/query/mod.rs index fc10c1c9..8033e731 100644 --- a/src/service/admin/query/mod.rs +++ b/src/service/admin/query/mod.rs @@ -44,11 +44,11 @@ pub(crate) enum QueryCommand { /// Processes admin query commands #[allow(non_snake_case)] pub(crate) async fn process(command: QueryCommand, _body: Vec<&str>) -> Result { - match command { - QueryCommand::AccountData(AccountData) => account_data(AccountData).await, - QueryCommand::Appservice(Appservice) => appservice(Appservice).await, - QueryCommand::Presence(Presence) => presence(Presence).await, - QueryCommand::RoomAlias(RoomAlias) => room_alias(RoomAlias).await, - QueryCommand::Globals(Globals) => globals(Globals).await, - } + Ok(match command { + QueryCommand::AccountData(AccountData) => account_data(AccountData).await?, + QueryCommand::Appservice(Appservice) => appservice(Appservice).await?, + QueryCommand::Presence(Presence) => presence(Presence).await?, + QueryCommand::RoomAlias(RoomAlias) => room_alias(RoomAlias).await?, + QueryCommand::Globals(Globals) => globals(Globals).await?, + }) } diff --git a/src/service/admin/query/presence.rs b/src/service/admin/query/presence.rs index 952920c1..bb55b88f 100644 --- a/src/service/admin/query/presence.rs +++ b/src/service/admin/query/presence.rs @@ -22,7 +22,7 @@ pub(crate) enum Presence { } /// All the getters and iterators in key_value/presence.rs -pub(crate) async fn presence(subcommand: Presence) -> Result { +pub(super) async fn presence(subcommand: Presence) -> Result { match subcommand { Presence::GetPresence { user_id, diff --git a/src/service/admin/query/room_alias.rs b/src/service/admin/query/room_alias.rs index f5ca4965..e854f643 100644 --- a/src/service/admin/query/room_alias.rs +++ b/src/service/admin/query/room_alias.rs @@ -23,7 +23,7 @@ pub(crate) enum RoomAlias { } /// All the getters and iterators in src/database/key_value/rooms/alias.rs -pub(crate) async fn room_alias(subcommand: RoomAlias) -> Result { +pub(super) async fn room_alias(subcommand: RoomAlias) -> Result { match subcommand { RoomAlias::ResolveLocalAlias { alias, diff --git a/src/service/admin/server.rs b/src/service/admin/server.rs deleted file mode 100644 index 07519a1b..00000000 --- a/src/service/admin/server.rs +++ /dev/null @@ -1,106 +0,0 @@ -use clap::Subcommand; -use ruma::events::room::message::RoomMessageEventContent; - -use crate::{services, Result}; - -#[cfg_attr(test, derive(Debug))] -#[derive(Subcommand)] -pub(crate) enum ServerCommand { - /// - Show configuration values - ShowConfig, - - /// - Print database memory usage statistics - MemoryUsage, - - /// - Clears all of Conduit's database caches with index smaller than the - /// amount - ClearDatabaseCaches { - amount: u32, - }, - - /// - Clears all of Conduit's service caches with index smaller than the - /// amount - ClearServiceCaches { - amount: u32, - }, - - /// - Performs an online backup of the database (only available for RocksDB - /// at the moment) - BackupDatabase, - - /// - List database backups - ListBackups, - - /// - List database files - ListDatabaseFiles, -} - -pub(crate) async fn process(command: ServerCommand, _body: Vec<&str>) -> Result { - match command { - ServerCommand::ShowConfig => { - // Construct and send the response - Ok(RoomMessageEventContent::text_plain(format!("{}", services().globals.config))) - }, - ServerCommand::MemoryUsage => { - let response1 = services().memory_usage().await; - let response2 = services().globals.db.memory_usage(); - - Ok(RoomMessageEventContent::text_plain(format!( - "Services:\n{response1}\n\nDatabase:\n{response2}" - ))) - }, - ServerCommand::ClearDatabaseCaches { - amount, - } => { - services().globals.db.clear_caches(amount); - - Ok(RoomMessageEventContent::text_plain("Done.")) - }, - ServerCommand::ClearServiceCaches { - amount, - } => { - services().clear_caches(amount).await; - - Ok(RoomMessageEventContent::text_plain("Done.")) - }, - ServerCommand::ListBackups => { - let result = services().globals.db.backup_list()?; - - if result.is_empty() { - Ok(RoomMessageEventContent::text_plain("No backups found.")) - } else { - Ok(RoomMessageEventContent::text_plain(result)) - } - }, - ServerCommand::BackupDatabase => { - if !cfg!(feature = "rocksdb") { - return Ok(RoomMessageEventContent::text_plain( - "Only RocksDB supports online backups in conduwuit.", - )); - } - - let mut result = tokio::task::spawn_blocking(move || match services().globals.db.backup() { - Ok(()) => String::new(), - Err(e) => (*e).to_string(), - }) - .await - .unwrap(); - - if result.is_empty() { - result = services().globals.db.backup_list()?; - } - - Ok(RoomMessageEventContent::text_plain(&result)) - }, - ServerCommand::ListDatabaseFiles => { - if !cfg!(feature = "rocksdb") { - return Ok(RoomMessageEventContent::text_plain( - "Only RocksDB supports listing files in conduwuit.", - )); - } - - let result = services().globals.db.file_list()?; - Ok(RoomMessageEventContent::notice_html(String::new(), result)) - }, - } -} diff --git a/src/service/admin/server/mod.rs b/src/service/admin/server/mod.rs new file mode 100644 index 00000000..b18b2c7a --- /dev/null +++ b/src/service/admin/server/mod.rs @@ -0,0 +1,58 @@ +pub(crate) mod server_commands; + +use clap::Subcommand; +use ruma::events::room::message::RoomMessageEventContent; + +use self::server_commands::{ + backup_database, clear_database_caches, clear_service_caches, list_backups, list_database_files, memory_usage, + show_config, +}; +use crate::Result; + +#[cfg_attr(test, derive(Debug))] +#[derive(Subcommand)] +pub(crate) enum ServerCommand { + /// - Show configuration values + ShowConfig, + + /// - Print database memory usage statistics + MemoryUsage, + + /// - Clears all of Conduit's database caches with index smaller than the + /// amount + ClearDatabaseCaches { + amount: u32, + }, + + /// - Clears all of Conduit's service caches with index smaller than the + /// amount + ClearServiceCaches { + amount: u32, + }, + + /// - Performs an online backup of the database (only available for RocksDB + /// at the moment) + BackupDatabase, + + /// - List database backups + ListBackups, + + /// - List database files + ListDatabaseFiles, +} + +pub(crate) async fn process(command: ServerCommand, body: Vec<&str>) -> Result { + Ok(match command { + ServerCommand::ShowConfig => show_config(body).await?, + ServerCommand::MemoryUsage => memory_usage(body).await?, + ServerCommand::ClearDatabaseCaches { + amount, + } => clear_database_caches(body, amount).await?, + ServerCommand::ClearServiceCaches { + amount, + } => clear_service_caches(body, amount).await?, + ServerCommand::ListBackups => list_backups(body).await?, + ServerCommand::BackupDatabase => backup_database(body).await?, + ServerCommand::ListDatabaseFiles => list_database_files(body).await?, + }) +} diff --git a/src/service/admin/server/server_commands.rs b/src/service/admin/server/server_commands.rs new file mode 100644 index 00000000..f363d7ff --- /dev/null +++ b/src/service/admin/server/server_commands.rs @@ -0,0 +1,71 @@ +use ruma::events::room::message::RoomMessageEventContent; + +use crate::{services, Result}; + +pub(super) async fn show_config(_body: Vec<&str>) -> Result { + // Construct and send the response + Ok(RoomMessageEventContent::text_plain(format!("{}", services().globals.config))) +} + +pub(super) async fn memory_usage(_body: Vec<&str>) -> Result { + let response1 = services().memory_usage().await; + let response2 = services().globals.db.memory_usage(); + + Ok(RoomMessageEventContent::text_plain(format!( + "Services:\n{response1}\n\nDatabase:\n{response2}" + ))) +} + +pub(super) async fn clear_database_caches(_body: Vec<&str>, amount: u32) -> Result { + services().globals.db.clear_caches(amount); + + Ok(RoomMessageEventContent::text_plain("Done.")) +} + +pub(super) async fn clear_service_caches(_body: Vec<&str>, amount: u32) -> Result { + services().clear_caches(amount).await; + + Ok(RoomMessageEventContent::text_plain("Done.")) +} + +pub(super) async fn list_backups(_body: Vec<&str>) -> Result { + let result = services().globals.db.backup_list()?; + + if result.is_empty() { + Ok(RoomMessageEventContent::text_plain("No backups found.")) + } else { + Ok(RoomMessageEventContent::text_plain(result)) + } +} + +pub(super) async fn backup_database(_body: Vec<&str>) -> Result { + if !cfg!(feature = "rocksdb") { + return Ok(RoomMessageEventContent::text_plain( + "Only RocksDB supports online backups in conduwuit.", + )); + } + + let mut result = tokio::task::spawn_blocking(move || match services().globals.db.backup() { + Ok(()) => String::new(), + Err(e) => (*e).to_string(), + }) + .await + .unwrap(); + + if result.is_empty() { + result = services().globals.db.backup_list()?; + } + + Ok(RoomMessageEventContent::text_plain(&result)) +} + +pub(super) async fn list_database_files(_body: Vec<&str>) -> Result { + if !cfg!(feature = "rocksdb") { + return Ok(RoomMessageEventContent::text_plain( + "Only RocksDB supports listing files in conduwuit.", + )); + } + + let result = services().globals.db.file_list()?; + Ok(RoomMessageEventContent::notice_html(String::new(), result)) +} diff --git a/src/service/admin/user/mod.rs b/src/service/admin/user/mod.rs index 7ac30043..d14f7cfc 100644 --- a/src/service/admin/user/mod.rs +++ b/src/service/admin/user/mod.rs @@ -1,8 +1,10 @@ -#[allow(clippy::module_inception)] -pub(crate) mod user; +pub(crate) mod user_commands; use clap::Subcommand; -use ruma::UserId; +use ruma::{events::room::message::RoomMessageEventContent, UserId}; + +use self::user_commands::{create, deactivate, deactivate_all, list, list_joined_rooms, reset_password}; +use crate::Result; #[cfg_attr(test, derive(Debug))] #[derive(Subcommand)] @@ -18,7 +20,7 @@ pub(crate) enum UserCommand { /// - Reset user password ResetPassword { /// Username of the user for whom the password should be reset - username: String, + username: Box, }, /// - Deactivate a user @@ -61,3 +63,27 @@ pub(crate) enum UserCommand { user_id: Box, }, } + +pub(crate) async fn process(command: UserCommand, body: Vec<&str>) -> Result { + Ok(match command { + UserCommand::List => list(body).await?, + UserCommand::Create { + username, + password, + } => create(body, username, password).await?, + UserCommand::Deactivate { + leave_rooms, + user_id, + } => deactivate(body, leave_rooms, user_id).await?, + UserCommand::ResetPassword { + username, + } => reset_password(body, username).await?, + UserCommand::DeactivateAll { + leave_rooms, + force, + } => deactivate_all(body, leave_rooms, force).await?, + UserCommand::ListJoinedRooms { + user_id, + } => list_joined_rooms(body, user_id).await?, + }) +} diff --git a/src/service/admin/user/user.rs b/src/service/admin/user/user.rs deleted file mode 100644 index 73273590..00000000 --- a/src/service/admin/user/user.rs +++ /dev/null @@ -1,349 +0,0 @@ -use std::{fmt::Write as _, sync::Arc}; - -use itertools::Itertools; -use ruma::{events::room::message::RoomMessageEventContent, OwnedRoomId, UserId}; -use tracing::{error, info, warn}; - -use super::UserCommand; - -use crate::{ - api::client_server::{join_room_by_id_helper, leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, - service::admin::{escape_html, get_room_info}, - services, utils, Result, -}; - -pub(crate) async fn process(command: UserCommand, body: Vec<&str>) -> Result { - match command { - UserCommand::List => match services().users.list_local_users() { - Ok(users) => { - let mut msg = format!("Found {} local user account(s):\n", users.len()); - msg += &users.join("\n"); - Ok(RoomMessageEventContent::text_plain(&msg)) - }, - Err(e) => Ok(RoomMessageEventContent::text_plain(e.to_string())), - }, - UserCommand::Create { - username, - password, - } => { - let password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH)); - // Validate user id - let user_id = match UserId::parse_with_server_name( - username.as_str().to_lowercase(), - services().globals.server_name(), - ) { - Ok(id) => id, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "The supplied username is not a valid username: {e}" - ))) - }, - }; - if user_id.is_historical() { - return Ok(RoomMessageEventContent::text_plain(format!( - "Userid {user_id} is not allowed due to historical" - ))); - } - if services().users.exists(&user_id)? { - return Ok(RoomMessageEventContent::text_plain(format!("Userid {user_id} already exists"))); - } - // Create user - services().users.create(&user_id, Some(password.as_str()))?; - - // Default to pretty displayname - let mut displayname = user_id.localpart().to_owned(); - - // If `new_user_displayname_suffix` is set, registration will push whatever - // content is set to the user's display name with a space before it - if !services().globals.new_user_displayname_suffix().is_empty() { - displayname.push_str(&(" ".to_owned() + services().globals.new_user_displayname_suffix())); - } - - services() - .users - .set_displayname(&user_id, Some(displayname)) - .await?; - - // Initial account data - services().account_data.update( - None, - &user_id, - ruma::events::GlobalAccountDataEventType::PushRules - .to_string() - .into(), - &serde_json::to_value(ruma::events::push_rules::PushRulesEvent { - content: ruma::events::push_rules::PushRulesEventContent { - global: ruma::push::Ruleset::server_default(&user_id), - }, - }) - .expect("to json value always works"), - )?; - - if !services().globals.config.auto_join_rooms.is_empty() { - for room in &services().globals.config.auto_join_rooms { - if !services() - .rooms - .state_cache - .server_in_room(services().globals.server_name(), room)? - { - warn!("Skipping room {room} to automatically join as we have never joined before."); - continue; - } - - if let Some(room_id_server_name) = room.server_name() { - match join_room_by_id_helper( - Some(&user_id), - room, - Some("Automatically joining this room upon registration".to_owned()), - &[room_id_server_name.to_owned(), services().globals.server_name().to_owned()], - None, - ) - .await - { - Ok(_) => { - info!("Automatically joined room {room} for user {user_id}"); - }, - Err(e) => { - // don't return this error so we don't fail registrations - error!("Failed to automatically join room {room} for user {user_id}: {e}"); - }, - }; - } - } - } - - // we dont add a device since we're not the user, just the creator - - // Inhibit login does not work for guests - Ok(RoomMessageEventContent::text_plain(format!( - "Created user with user_id: {user_id} and password: `{password}`" - ))) - }, - UserCommand::Deactivate { - leave_rooms, - user_id, - } => { - let user_id = Arc::::from(user_id); - - // check if user belongs to our server - if user_id.server_name() != services().globals.server_name() { - return Ok(RoomMessageEventContent::text_plain(format!( - "User {user_id} does not belong to our server." - ))); - } - - // don't deactivate the conduit service account - if user_id - == UserId::parse_with_server_name("conduit", services().globals.server_name()) - .expect("conduit user exists") - { - return Ok(RoomMessageEventContent::text_plain( - "Not allowed to deactivate the Conduit service account.", - )); - } - - if services().users.exists(&user_id)? { - RoomMessageEventContent::text_plain(format!("Making {user_id} leave all rooms before deactivation...")); - - services().users.deactivate_account(&user_id)?; - - if leave_rooms { - leave_all_rooms(&user_id).await?; - } - - Ok(RoomMessageEventContent::text_plain(format!( - "User {user_id} has been deactivated" - ))) - } else { - Ok(RoomMessageEventContent::text_plain(format!( - "User {user_id} doesn't exist on this server" - ))) - } - }, - UserCommand::ResetPassword { - username, - } => { - let user_id = match UserId::parse_with_server_name( - username.as_str().to_lowercase(), - services().globals.server_name(), - ) { - Ok(id) => id, - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "The supplied username is not a valid username: {e}" - ))) - }, - }; - - // check if user belongs to our server - if user_id.server_name() != services().globals.server_name() { - return Ok(RoomMessageEventContent::text_plain(format!( - "User {user_id} does not belong to our server." - ))); - } - - // Check if the specified user is valid - if !services().users.exists(&user_id)? - || user_id - == UserId::parse_with_server_name("conduit", services().globals.server_name()) - .expect("conduit user exists") - { - return Ok(RoomMessageEventContent::text_plain("The specified user does not exist!")); - } - - let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH); - - match services() - .users - .set_password(&user_id, Some(new_password.as_str())) - { - Ok(()) => Ok(RoomMessageEventContent::text_plain(format!( - "Successfully reset the password for user {user_id}: `{new_password}`" - ))), - Err(e) => Ok(RoomMessageEventContent::text_plain(format!( - "Couldn't reset the password for user {user_id}: {e}" - ))), - } - }, - UserCommand::DeactivateAll { - leave_rooms, - force, - } => { - if body.len() > 2 && body[0].trim().starts_with("```") && body.last().unwrap().trim() == "```" { - let usernames = body.clone().drain(1..body.len() - 1).collect::>(); - - let mut user_ids: Vec<&UserId> = Vec::new(); - - for &username in &usernames { - match <&UserId>::try_from(username) { - Ok(user_id) => user_ids.push(user_id), - Err(e) => { - return Ok(RoomMessageEventContent::text_plain(format!( - "{username} is not a valid username: {e}" - ))) - }, - } - } - - let mut deactivation_count = 0; - let mut admins = Vec::new(); - - if !force { - user_ids.retain(|&user_id| match services().users.is_admin(user_id) { - Ok(is_admin) => { - if is_admin { - admins.push(user_id.localpart()); - false - } else { - true - } - }, - Err(_) => false, - }); - } - - for &user_id in &user_ids { - // check if user belongs to our server and skips over non-local users - if user_id.server_name() != services().globals.server_name() { - continue; - } - - // don't deactivate the conduit service account - if user_id - == UserId::parse_with_server_name("conduit", services().globals.server_name()) - .expect("conduit user exists") - { - continue; - } - - // user does not exist on our server - if !services().users.exists(user_id)? { - continue; - } - - if services().users.deactivate_account(user_id).is_ok() { - deactivation_count += 1; - } - } - - if leave_rooms { - for &user_id in &user_ids { - _ = leave_all_rooms(user_id).await; - } - } - - if admins.is_empty() { - Ok(RoomMessageEventContent::text_plain(format!( - "Deactivated {deactivation_count} accounts." - ))) - } else { - Ok(RoomMessageEventContent::text_plain(format!( - "Deactivated {} accounts.\nSkipped admin accounts: {:?}. Use --force to deactivate admin \ - accounts", - deactivation_count, - admins.join(", ") - ))) - } - } else { - Ok(RoomMessageEventContent::text_plain( - "Expected code block in command body. Add --help for details.", - )) - } - }, - UserCommand::ListJoinedRooms { - user_id, - } => { - if user_id.server_name() != services().globals.server_name() { - return Ok(RoomMessageEventContent::text_plain("User does not belong to our server.")); - } - - if !services().users.exists(&user_id)? { - return Ok(RoomMessageEventContent::text_plain("User does not exist on this server.")); - } - - let mut rooms: Vec<(OwnedRoomId, u64, String)> = services() - .rooms - .state_cache - .rooms_joined(&user_id) - .filter_map(Result::ok) - .map(|room_id| get_room_info(&room_id)) - .sorted_unstable() - .dedup() - .collect(); - - if rooms.is_empty() { - return Ok(RoomMessageEventContent::text_plain("User is not in any rooms.")); - } - - rooms.sort_by_key(|r| r.1); - rooms.reverse(); - - let output_plain = format!( - "Rooms {user_id} Joined:\n{}", - rooms - .iter() - .map(|(id, members, name)| format!("{id}\tMembers: {members}\tName: {name}")) - .collect::>() - .join("\n") - ); - let output_html = format!( - "\n\t\t\n{}
Rooms {user_id} \ - Joined
idmembersname
", - rooms - .iter() - .fold(String::new(), |mut output, (id, members, name)| { - writeln!( - output, - "{}\t{}\t{}", - escape_html(id.as_ref()), - members, - escape_html(name) - ) - .unwrap(); - output - }) - ); - Ok(RoomMessageEventContent::text_html(output_plain, output_html)) - }, - } -} diff --git a/src/service/admin/user/user_commands.rs b/src/service/admin/user/user_commands.rs new file mode 100644 index 00000000..47a43233 --- /dev/null +++ b/src/service/admin/user/user_commands.rs @@ -0,0 +1,334 @@ +use std::{fmt::Write as _, sync::Arc}; + +use itertools::Itertools; +use ruma::{events::room::message::RoomMessageEventContent, OwnedRoomId, UserId}; +use tracing::{error, info, warn}; + +use crate::{ + api::client_server::{join_room_by_id_helper, leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, + service::admin::{escape_html, get_room_info}, + services, utils, Result, +}; + +pub(super) async fn list(_body: Vec<&str>) -> Result { + match services().users.list_local_users() { + Ok(users) => { + let mut msg = format!("Found {} local user account(s):\n", users.len()); + msg += &users.join("\n"); + Ok(RoomMessageEventContent::text_plain(&msg)) + }, + Err(e) => Ok(RoomMessageEventContent::text_plain(e.to_string())), + } +} + +pub(super) async fn create( + _body: Vec<&str>, username: String, password: Option, +) -> Result { + let password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH)); + // Validate user id + let user_id = + match UserId::parse_with_server_name(username.as_str().to_lowercase(), services().globals.server_name()) { + Ok(id) => id, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "The supplied username is not a valid username: {e}" + ))) + }, + }; + if user_id.is_historical() { + return Ok(RoomMessageEventContent::text_plain(format!( + "Userid {user_id} is not allowed due to historical" + ))); + } + if services().users.exists(&user_id)? { + return Ok(RoomMessageEventContent::text_plain(format!("Userid {user_id} already exists"))); + } + // Create user + services().users.create(&user_id, Some(password.as_str()))?; + + // Default to pretty displayname + let mut displayname = user_id.localpart().to_owned(); + + // If `new_user_displayname_suffix` is set, registration will push whatever + // content is set to the user's display name with a space before it + if !services().globals.new_user_displayname_suffix().is_empty() { + displayname.push_str(&(" ".to_owned() + services().globals.new_user_displayname_suffix())); + } + + services() + .users + .set_displayname(&user_id, Some(displayname)) + .await?; + + // Initial account data + services().account_data.update( + None, + &user_id, + ruma::events::GlobalAccountDataEventType::PushRules + .to_string() + .into(), + &serde_json::to_value(ruma::events::push_rules::PushRulesEvent { + content: ruma::events::push_rules::PushRulesEventContent { + global: ruma::push::Ruleset::server_default(&user_id), + }, + }) + .expect("to json value always works"), + )?; + + if !services().globals.config.auto_join_rooms.is_empty() { + for room in &services().globals.config.auto_join_rooms { + if !services() + .rooms + .state_cache + .server_in_room(services().globals.server_name(), room)? + { + warn!("Skipping room {room} to automatically join as we have never joined before."); + continue; + } + + if let Some(room_id_server_name) = room.server_name() { + match join_room_by_id_helper( + Some(&user_id), + room, + Some("Automatically joining this room upon registration".to_owned()), + &[room_id_server_name.to_owned(), services().globals.server_name().to_owned()], + None, + ) + .await + { + Ok(_) => { + info!("Automatically joined room {room} for user {user_id}"); + }, + Err(e) => { + // don't return this error so we don't fail registrations + error!("Failed to automatically join room {room} for user {user_id}: {e}"); + }, + }; + } + } + } + + // we dont add a device since we're not the user, just the creator + + // Inhibit login does not work for guests + Ok(RoomMessageEventContent::text_plain(format!( + "Created user with user_id: {user_id} and password: `{password}`" + ))) +} + +pub(super) async fn deactivate( + _body: Vec<&str>, leave_rooms: bool, user_id: Box, +) -> Result { + let user_id = Arc::::from(user_id); + + // check if user belongs to our server + if user_id.server_name() != services().globals.server_name() { + return Ok(RoomMessageEventContent::text_plain(format!( + "User {user_id} does not belong to our server." + ))); + } + + // don't deactivate the conduit service account + if user_id + == UserId::parse_with_server_name("conduit", services().globals.server_name()).expect("conduit user exists") + { + return Ok(RoomMessageEventContent::text_plain( + "Not allowed to deactivate the Conduit service account.", + )); + } + + if services().users.exists(&user_id)? { + RoomMessageEventContent::text_plain(format!("Making {user_id} leave all rooms before deactivation...")); + + services().users.deactivate_account(&user_id)?; + + if leave_rooms { + leave_all_rooms(&user_id).await?; + } + + Ok(RoomMessageEventContent::text_plain(format!( + "User {user_id} has been deactivated" + ))) + } else { + Ok(RoomMessageEventContent::text_plain(format!( + "User {user_id} doesn't exist on this server" + ))) + } +} + +pub(super) async fn reset_password(_body: Vec<&str>, username: Box) -> Result { + let user_id = + match UserId::parse_with_server_name(username.as_str().to_lowercase(), services().globals.server_name()) { + Ok(id) => id, + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "The supplied username is not a valid username: {e}" + ))) + }, + }; + + // check if user belongs to our server + if user_id.server_name() != services().globals.server_name() { + return Ok(RoomMessageEventContent::text_plain(format!( + "User {user_id} does not belong to our server." + ))); + } + + // Check if the specified user is valid + if !services().users.exists(&user_id)? + || user_id + == UserId::parse_with_server_name("conduit", services().globals.server_name()).expect("conduit user exists") + { + return Ok(RoomMessageEventContent::text_plain("The specified user does not exist!")); + } + + let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH); + + match services() + .users + .set_password(&user_id, Some(new_password.as_str())) + { + Ok(()) => Ok(RoomMessageEventContent::text_plain(format!( + "Successfully reset the password for user {user_id}: `{new_password}`" + ))), + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Couldn't reset the password for user {user_id}: {e}" + ))), + } +} + +pub(super) async fn deactivate_all(body: Vec<&str>, leave_rooms: bool, force: bool) -> Result { + if body.len() > 2 && body[0].trim().starts_with("```") && body.last().unwrap().trim() == "```" { + let usernames = body.clone().drain(1..body.len() - 1).collect::>(); + + let mut user_ids: Vec<&UserId> = Vec::new(); + + for &username in &usernames { + match <&UserId>::try_from(username) { + Ok(user_id) => user_ids.push(user_id), + Err(e) => { + return Ok(RoomMessageEventContent::text_plain(format!( + "{username} is not a valid username: {e}" + ))) + }, + } + } + + let mut deactivation_count = 0; + let mut admins = Vec::new(); + + if !force { + user_ids.retain(|&user_id| match services().users.is_admin(user_id) { + Ok(is_admin) => { + if is_admin { + admins.push(user_id.localpart()); + false + } else { + true + } + }, + Err(_) => false, + }); + } + + for &user_id in &user_ids { + // check if user belongs to our server and skips over non-local users + if user_id.server_name() != services().globals.server_name() { + continue; + } + + // don't deactivate the conduit service account + if user_id + == UserId::parse_with_server_name("conduit", services().globals.server_name()) + .expect("conduit user exists") + { + continue; + } + + // user does not exist on our server + if !services().users.exists(user_id)? { + continue; + } + + if services().users.deactivate_account(user_id).is_ok() { + deactivation_count += 1; + } + } + + if leave_rooms { + for &user_id in &user_ids { + _ = leave_all_rooms(user_id).await; + } + } + + if admins.is_empty() { + Ok(RoomMessageEventContent::text_plain(format!( + "Deactivated {deactivation_count} accounts." + ))) + } else { + Ok(RoomMessageEventContent::text_plain(format!( + "Deactivated {} accounts.\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", + deactivation_count, + admins.join(", ") + ))) + } + } else { + Ok(RoomMessageEventContent::text_plain( + "Expected code block in command body. Add --help for details.", + )) + } +} + +pub(super) async fn list_joined_rooms(_body: Vec<&str>, user_id: Box) -> Result { + if user_id.server_name() != services().globals.server_name() { + return Ok(RoomMessageEventContent::text_plain("User does not belong to our server.")); + } + + if !services().users.exists(&user_id)? { + return Ok(RoomMessageEventContent::text_plain("User does not exist on this server.")); + } + + let mut rooms: Vec<(OwnedRoomId, u64, String)> = services() + .rooms + .state_cache + .rooms_joined(&user_id) + .filter_map(Result::ok) + .map(|room_id| get_room_info(&room_id)) + .sorted_unstable() + .dedup() + .collect(); + + if rooms.is_empty() { + return Ok(RoomMessageEventContent::text_plain("User is not in any rooms.")); + } + + rooms.sort_by_key(|r| r.1); + rooms.reverse(); + + let output_plain = format!( + "Rooms {user_id} Joined:\n{}", + rooms + .iter() + .map(|(id, members, name)| format!("{id}\tMembers: {members}\tName: {name}")) + .collect::>() + .join("\n") + ); + let output_html = format!( + "\n\t\t\n{}
Rooms {user_id} \ + Joined
idmembersname
", + rooms + .iter() + .fold(String::new(), |mut output, (id, members, name)| { + writeln!( + output, + "{}\t{}\t{}", + escape_html(id.as_ref()), + members, + escape_html(name) + ) + .unwrap(); + output + }) + ); + Ok(RoomMessageEventContent::text_html(output_plain, output_html)) +}