replace admin command branches returning RoomMessageEventContent
rename admin Command back to Context Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
f86d7236ac
commit
56cc9318de
32 changed files with 903 additions and 1306 deletions
|
@ -6,7 +6,7 @@ use std::{
|
|||
};
|
||||
|
||||
use conduwuit::{
|
||||
Error, Result, debug_error, err, info,
|
||||
Err, Result, debug_error, err, info,
|
||||
matrix::pdu::{PduEvent, PduId, RawPduId},
|
||||
trace, utils,
|
||||
utils::{
|
||||
|
@ -19,7 +19,7 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
|
|||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
|
||||
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
|
||||
api::federation::event::get_room_state, events::room::message::RoomMessageEventContent,
|
||||
api::federation::event::get_room_state,
|
||||
};
|
||||
use service::rooms::{
|
||||
short::{ShortEventId, ShortRoomId},
|
||||
|
@ -30,28 +30,24 @@ use tracing_subscriber::EnvFilter;
|
|||
use crate::admin_command;
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn echo(&self, message: Vec<String>) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn echo(&self, message: Vec<String>) -> Result {
|
||||
let message = message.join(" ");
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(message))
|
||||
self.write_str(&message).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_auth_chain(
|
||||
&self,
|
||||
event_id: OwnedEventId,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
|
||||
let Ok(event) = self.services.rooms.timeline.get_pdu_json(&event_id).await else {
|
||||
return Ok(RoomMessageEventContent::notice_plain("Event not found."));
|
||||
return Err!("Event not found.");
|
||||
};
|
||||
|
||||
let room_id_str = event
|
||||
.get("room_id")
|
||||
.and_then(|val| val.as_str())
|
||||
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
|
||||
.and_then(CanonicalJsonValue::as_str)
|
||||
.ok_or_else(|| err!(Database("Invalid event in database")))?;
|
||||
|
||||
let room_id = <&RoomId>::try_from(room_id_str)
|
||||
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
||||
.map_err(|_| err!(Database("Invalid room id field in event in database")))?;
|
||||
|
||||
let start = Instant::now();
|
||||
let count = self
|
||||
|
@ -64,51 +60,39 @@ pub(super) async fn get_auth_chain(
|
|||
.await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Loaded auth chain with length {count} in {elapsed:?}"
|
||||
)))
|
||||
let out = format!("Loaded auth chain with length {count} in {elapsed:?}");
|
||||
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn parse_pdu(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn parse_pdu(&self) -> Result {
|
||||
if self.body.len() < 2
|
||||
|| !self.body[0].trim().starts_with("```")
|
||||
|| self.body.last().unwrap_or(&EMPTY).trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
return Err!("Expected code block in command body. Add --help for details.");
|
||||
}
|
||||
|
||||
let string = self.body[1..self.body.len().saturating_sub(1)].join("\n");
|
||||
match serde_json::from_str(&string) {
|
||||
| Err(e) => return Err!("Invalid json in command body: {e}"),
|
||||
| Ok(value) => match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) {
|
||||
| Err(e) => return Err!("Could not parse PDU JSON: {e:?}"),
|
||||
| Ok(hash) => {
|
||||
let event_id = OwnedEventId::parse(format!("${hash}"));
|
||||
|
||||
match serde_json::from_value::<PduEvent>(
|
||||
serde_json::to_value(value).expect("value is json"),
|
||||
) {
|
||||
| Ok(pdu) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"EventId: {event_id:?}\n{pdu:#?}"
|
||||
))),
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"EventId: {event_id:?}\nCould not parse event: {e}"
|
||||
))),
|
||||
match serde_json::from_value::<PduEvent>(serde_json::to_value(value)?) {
|
||||
| Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"),
|
||||
| Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"),
|
||||
}
|
||||
},
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Could not parse PDU JSON: {e:?}"
|
||||
))),
|
||||
},
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Invalid json in command body: {e}"
|
||||
))),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
|
||||
let mut outlier = false;
|
||||
let mut pdu_json = self
|
||||
.services
|
||||
|
@ -123,21 +107,18 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result<RoomMessage
|
|||
}
|
||||
|
||||
match pdu_json {
|
||||
| Err(_) => return Err!("PDU not found locally."),
|
||||
| Ok(json) => {
|
||||
let json_text =
|
||||
serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||
"{}\n```json\n{}\n```",
|
||||
if outlier {
|
||||
"Outlier (Rejected / Soft Failed) PDU found in our database"
|
||||
} else {
|
||||
"PDU found in our database"
|
||||
},
|
||||
json_text
|
||||
)))
|
||||
let text = serde_json::to_string_pretty(&json)?;
|
||||
let msg = if outlier {
|
||||
"Outlier (Rejected / Soft Failed) PDU found in our database"
|
||||
} else {
|
||||
"PDU found in our database"
|
||||
};
|
||||
write!(self, "{msg}\n```json\n{text}\n```",)
|
||||
},
|
||||
| Err(_) => Ok(RoomMessageEventContent::text_plain("PDU not found locally.")),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
@ -145,7 +126,7 @@ pub(super) async fn get_short_pdu(
|
|||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
shorteventid: ShortEventId,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
) -> Result {
|
||||
let pdu_id: RawPduId = PduId {
|
||||
shortroomid,
|
||||
shorteventid: shorteventid.into(),
|
||||
|
@ -160,41 +141,33 @@ pub(super) async fn get_short_pdu(
|
|||
.await;
|
||||
|
||||
match pdu_json {
|
||||
| Err(_) => return Err!("PDU not found locally."),
|
||||
| Ok(json) => {
|
||||
let json_text =
|
||||
serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
Ok(RoomMessageEventContent::notice_markdown(format!("```json\n{json_text}\n```",)))
|
||||
let json_text = serde_json::to_string_pretty(&json)?;
|
||||
write!(self, "```json\n{json_text}\n```")
|
||||
},
|
||||
| Err(_) => Ok(RoomMessageEventContent::text_plain("PDU not found locally.")),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_remote_pdu_list(
|
||||
&self,
|
||||
server: OwnedServerName,
|
||||
force: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn get_remote_pdu_list(&self, server: OwnedServerName, force: bool) -> Result {
|
||||
if !self.services.server.config.allow_federation {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Federation is disabled on this homeserver.",
|
||||
));
|
||||
return Err!("Federation is disabled on this homeserver.",);
|
||||
}
|
||||
|
||||
if server == self.services.globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
return Err!(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
|
||||
fetching local PDUs from the database.",
|
||||
));
|
||||
);
|
||||
}
|
||||
|
||||
if self.body.len() < 2
|
||||
|| !self.body[0].trim().starts_with("```")
|
||||
|| self.body.last().unwrap_or(&EMPTY).trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
return Err!("Expected code block in command body. Add --help for details.",);
|
||||
}
|
||||
|
||||
let list = self
|
||||
|
@ -208,18 +181,19 @@ pub(super) async fn get_remote_pdu_list(
|
|||
let mut failed_count: usize = 0;
|
||||
let mut success_count: usize = 0;
|
||||
|
||||
for pdu in list {
|
||||
for event_id in list {
|
||||
if force {
|
||||
match self.get_remote_pdu(Box::from(pdu), server.clone()).await {
|
||||
match self
|
||||
.get_remote_pdu(event_id.to_owned(), server.clone())
|
||||
.await
|
||||
{
|
||||
| Err(e) => {
|
||||
failed_count = failed_count.saturating_add(1);
|
||||
self.services
|
||||
.admin
|
||||
.send_message(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to get remote PDU, ignoring error: {e}"
|
||||
)))
|
||||
.await
|
||||
.ok();
|
||||
.send_text(&format!("Failed to get remote PDU, ignoring error: {e}"))
|
||||
.await;
|
||||
|
||||
warn!("Failed to get remote PDU, ignoring error: {e}");
|
||||
},
|
||||
| _ => {
|
||||
|
@ -227,44 +201,48 @@ pub(super) async fn get_remote_pdu_list(
|
|||
},
|
||||
}
|
||||
} else {
|
||||
self.get_remote_pdu(Box::from(pdu), server.clone()).await?;
|
||||
self.get_remote_pdu(event_id.to_owned(), server.clone())
|
||||
.await?;
|
||||
success_count = success_count.saturating_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Fetched {success_count} remote PDUs successfully with {failed_count} failures"
|
||||
)))
|
||||
let out =
|
||||
format!("Fetched {success_count} remote PDUs successfully with {failed_count} failures");
|
||||
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_remote_pdu(
|
||||
&self,
|
||||
event_id: Box<EventId>,
|
||||
server: Box<ServerName>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
event_id: OwnedEventId,
|
||||
server: OwnedServerName,
|
||||
) -> Result {
|
||||
if !self.services.server.config.allow_federation {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Federation is disabled on this homeserver.",
|
||||
));
|
||||
return Err!("Federation is disabled on this homeserver.");
|
||||
}
|
||||
|
||||
if server == self.services.globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
return Err!(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
|
||||
fetching local PDUs.",
|
||||
));
|
||||
);
|
||||
}
|
||||
|
||||
match self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(&server, ruma::api::federation::event::get_event::v1::Request {
|
||||
event_id: event_id.clone().into(),
|
||||
event_id: event_id.clone(),
|
||||
include_unredacted_content: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
| Err(e) =>
|
||||
return Err!(
|
||||
"Remote server did not have PDU or failed sending request to remote server: {e}"
|
||||
),
|
||||
| Ok(response) => {
|
||||
let json: CanonicalJsonObject =
|
||||
serde_json::from_str(response.pdu.get()).map_err(|e| {
|
||||
|
@ -272,10 +250,9 @@ pub(super) async fn get_remote_pdu(
|
|||
"Requested event ID {event_id} from server but failed to convert from \
|
||||
RawValue to CanonicalJsonObject (malformed event/response?): {e}"
|
||||
);
|
||||
Error::BadRequest(
|
||||
ErrorKind::Unknown,
|
||||
"Received response from server but failed to parse PDU",
|
||||
)
|
||||
err!(Request(Unknown(
|
||||
"Received response from server but failed to parse PDU"
|
||||
)))
|
||||
})?;
|
||||
|
||||
trace!("Attempting to parse PDU: {:?}", &response.pdu);
|
||||
|
@ -285,6 +262,7 @@ pub(super) async fn get_remote_pdu(
|
|||
.rooms
|
||||
.event_handler
|
||||
.parse_incoming_pdu(&response.pdu)
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
let (event_id, value, room_id) = match parsed_result {
|
||||
|
@ -292,9 +270,7 @@ pub(super) async fn get_remote_pdu(
|
|||
| Err(e) => {
|
||||
warn!("Failed to parse PDU: {e}");
|
||||
info!("Full PDU: {:?}", &response.pdu);
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to parse PDU remote server {server} sent us: {e}"
|
||||
)));
|
||||
return Err!("Failed to parse PDU remote server {server} sent us: {e}");
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -306,30 +282,18 @@ pub(super) async fn get_remote_pdu(
|
|||
.rooms
|
||||
.timeline
|
||||
.backfill_pdu(&server, response.pdu)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
let json_text =
|
||||
serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||
"{}\n```json\n{}\n```",
|
||||
"Got PDU from specified server and handled as backfilled PDU successfully. \
|
||||
Event body:",
|
||||
json_text
|
||||
)))
|
||||
let text = serde_json::to_string_pretty(&json)?;
|
||||
let msg = "Got PDU from specified server and handled as backfilled";
|
||||
write!(self, "{msg}. Event body:\n```json\n{text}\n```")
|
||||
},
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Remote server did not have PDU or failed sending request to remote server: {e}"
|
||||
))),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_room_state(
|
||||
&self,
|
||||
room: OwnedRoomOrAliasId,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
let room_id = self.services.rooms.alias.resolve(&room).await?;
|
||||
let room_state: Vec<_> = self
|
||||
.services
|
||||
|
@ -341,28 +305,24 @@ pub(super) async fn get_room_state(
|
|||
.await?;
|
||||
|
||||
if room_state.is_empty() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Unable to find room state in our database (vector is empty)",
|
||||
));
|
||||
return Err!("Unable to find room state in our database (vector is empty)",);
|
||||
}
|
||||
|
||||
let json = serde_json::to_string_pretty(&room_state).map_err(|e| {
|
||||
warn!("Failed converting room state vector in our database to pretty JSON: {e}");
|
||||
Error::bad_database(
|
||||
err!(Database(
|
||||
"Failed to convert room state events to pretty JSON, possible invalid room state \
|
||||
events in our database",
|
||||
)
|
||||
events in our database {e}",
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(format!("```json\n{json}\n```")))
|
||||
let out = format!("```json\n{json}\n```");
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn ping(&self, server: OwnedServerName) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn ping(&self, server: OwnedServerName) -> Result {
|
||||
if server == self.services.globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Not allowed to send federation requests to ourselves.",
|
||||
));
|
||||
return Err!("Not allowed to send federation requests to ourselves.");
|
||||
}
|
||||
|
||||
let timer = tokio::time::Instant::now();
|
||||
|
@ -376,35 +336,27 @@ pub(super) async fn ping(&self, server: OwnedServerName) -> Result<RoomMessageEv
|
|||
)
|
||||
.await
|
||||
{
|
||||
| Err(e) => {
|
||||
return Err!("Failed sending federation request to specified server:\n\n{e}");
|
||||
},
|
||||
| Ok(response) => {
|
||||
let ping_time = timer.elapsed();
|
||||
|
||||
let json_text_res = serde_json::to_string_pretty(&response.server);
|
||||
|
||||
if let Ok(json) = json_text_res {
|
||||
return Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||
"Got response which took {ping_time:?} time:\n```json\n{json}\n```"
|
||||
)));
|
||||
}
|
||||
let out = if let Ok(json) = json_text_res {
|
||||
format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```")
|
||||
} else {
|
||||
format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}")
|
||||
};
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Got non-JSON response which took {ping_time:?} time:\n{response:?}"
|
||||
)))
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!(
|
||||
"Failed sending federation request to specified server from ping debug command: \
|
||||
{e}"
|
||||
);
|
||||
Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed sending federation request to specified server:\n\n{e}",
|
||||
)))
|
||||
write!(self, "{out}")
|
||||
},
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn force_device_list_updates(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn force_device_list_updates(&self) -> Result {
|
||||
// Force E2EE device list updates for all users
|
||||
self.services
|
||||
.users
|
||||
|
@ -412,27 +364,17 @@ pub(super) async fn force_device_list_updates(&self) -> Result<RoomMessageEventC
|
|||
.for_each(|user_id| self.services.users.mark_device_key_update(user_id))
|
||||
.await;
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(
|
||||
"Marked all devices for all users as having new keys to update",
|
||||
))
|
||||
write!(self, "Marked all devices for all users as having new keys to update").await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn change_log_level(
|
||||
&self,
|
||||
filter: Option<String>,
|
||||
reset: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result {
|
||||
let handles = &["console"];
|
||||
|
||||
if reset {
|
||||
let old_filter_layer = match EnvFilter::try_new(&self.services.server.config.log) {
|
||||
| Ok(s) => s,
|
||||
| Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Log level from config appears to be invalid now: {e}"
|
||||
)));
|
||||
},
|
||||
| Err(e) => return Err!("Log level from config appears to be invalid now: {e}"),
|
||||
};
|
||||
|
||||
match self
|
||||
|
@ -442,16 +384,12 @@ pub(super) async fn change_log_level(
|
|||
.reload
|
||||
.reload(&old_filter_layer, Some(handles))
|
||||
{
|
||||
| Err(e) =>
|
||||
return Err!("Failed to modify and reload the global tracing log level: {e}"),
|
||||
| Ok(()) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Successfully changed log level back to config value {}",
|
||||
self.services.server.config.log
|
||||
)));
|
||||
},
|
||||
| Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to modify and reload the global tracing log level: {e}"
|
||||
)));
|
||||
let value = &self.services.server.config.log;
|
||||
let out = format!("Successfully changed log level back to config value {value}");
|
||||
return self.write_str(&out).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -459,11 +397,7 @@ pub(super) async fn change_log_level(
|
|||
if let Some(filter) = filter {
|
||||
let new_filter_layer = match EnvFilter::try_new(filter) {
|
||||
| Ok(s) => s,
|
||||
| Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Invalid log level filter specified: {e}"
|
||||
)));
|
||||
},
|
||||
| Err(e) => return Err!("Invalid log level filter specified: {e}"),
|
||||
};
|
||||
|
||||
match self
|
||||
|
@ -473,90 +407,75 @@ pub(super) async fn change_log_level(
|
|||
.reload
|
||||
.reload(&new_filter_layer, Some(handles))
|
||||
{
|
||||
| Ok(()) => {
|
||||
return Ok(RoomMessageEventContent::text_plain("Successfully changed log level"));
|
||||
},
|
||||
| Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to modify and reload the global tracing log level: {e}"
|
||||
)));
|
||||
},
|
||||
| Ok(()) => return self.write_str("Successfully changed log level").await,
|
||||
| Err(e) =>
|
||||
return Err!("Failed to modify and reload the global tracing log level: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain("No log level was specified."))
|
||||
Err!("No log level was specified.")
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn sign_json(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn sign_json(&self) -> Result {
|
||||
if self.body.len() < 2
|
||||
|| !self.body[0].trim().starts_with("```")
|
||||
|| self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
return Err!("Expected code block in command body. Add --help for details.");
|
||||
}
|
||||
|
||||
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
|
||||
match serde_json::from_str(&string) {
|
||||
| Err(e) => return Err!("Invalid json: {e}"),
|
||||
| Ok(mut value) => {
|
||||
self.services
|
||||
.server_keys
|
||||
.sign_json(&mut value)
|
||||
.expect("our request json is what ruma expects");
|
||||
let json_text =
|
||||
serde_json::to_string_pretty(&value).expect("canonical json is valid json");
|
||||
Ok(RoomMessageEventContent::text_plain(json_text))
|
||||
self.services.server_keys.sign_json(&mut value)?;
|
||||
let json_text = serde_json::to_string_pretty(&value)?;
|
||||
write!(self, "{json_text}")
|
||||
},
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!("Invalid json: {e}"))),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn verify_json(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn verify_json(&self) -> Result {
|
||||
if self.body.len() < 2
|
||||
|| !self.body[0].trim().starts_with("```")
|
||||
|| self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
return Err!("Expected code block in command body. Add --help for details.");
|
||||
}
|
||||
|
||||
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
|
||||
match serde_json::from_str::<CanonicalJsonObject>(&string) {
|
||||
| Err(e) => return Err!("Invalid json: {e}"),
|
||||
| Ok(value) => match self.services.server_keys.verify_json(&value, None).await {
|
||||
| Ok(()) => Ok(RoomMessageEventContent::text_plain("Signature correct")),
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Signature verification failed: {e}"
|
||||
))),
|
||||
| Err(e) => return Err!("Signature verification failed: {e}"),
|
||||
| Ok(()) => write!(self, "Signature correct"),
|
||||
},
|
||||
| Err(e) => Ok(RoomMessageEventContent::text_plain(format!("Invalid json: {e}"))),
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
|
||||
use ruma::signatures::Verified;
|
||||
|
||||
let mut event = self.services.rooms.timeline.get_pdu_json(&event_id).await?;
|
||||
|
||||
event.remove("event_id");
|
||||
let msg = match self.services.server_keys.verify_event(&event, None).await {
|
||||
| Ok(ruma::signatures::Verified::Signatures) =>
|
||||
"signatures OK, but content hash failed (redaction).",
|
||||
| Ok(ruma::signatures::Verified::All) => "signatures and hashes OK.",
|
||||
| Err(e) => return Err(e),
|
||||
| Ok(Verified::Signatures) => "signatures OK, but content hash failed (redaction).",
|
||||
| Ok(Verified::All) => "signatures and hashes OK.",
|
||||
};
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(msg))
|
||||
self.write_str(msg).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn first_pdu_in_room(
|
||||
&self,
|
||||
room_id: OwnedRoomId,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
|
@ -564,9 +483,7 @@ pub(super) async fn first_pdu_in_room(
|
|||
.server_in_room(&self.services.server.name, &room_id)
|
||||
.await
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"We are not participating in the room / we don't know about the room ID.",
|
||||
));
|
||||
return Err!("We are not participating in the room / we don't know about the room ID.",);
|
||||
}
|
||||
|
||||
let first_pdu = self
|
||||
|
@ -575,17 +492,15 @@ pub(super) async fn first_pdu_in_room(
|
|||
.timeline
|
||||
.first_pdu_in_room(&room_id)
|
||||
.await
|
||||
.map_err(|_| Error::bad_database("Failed to find the first PDU in database"))?;
|
||||
.map_err(|_| err!(Database("Failed to find the first PDU in database")))?;
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(format!("{first_pdu:?}")))
|
||||
let out = format!("{first_pdu:?}");
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn latest_pdu_in_room(
|
||||
&self,
|
||||
room_id: OwnedRoomId,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
|
@ -593,9 +508,7 @@ pub(super) async fn latest_pdu_in_room(
|
|||
.server_in_room(&self.services.server.name, &room_id)
|
||||
.await
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"We are not participating in the room / we don't know about the room ID.",
|
||||
));
|
||||
return Err!("We are not participating in the room / we don't know about the room ID.");
|
||||
}
|
||||
|
||||
let latest_pdu = self
|
||||
|
@ -604,9 +517,10 @@ pub(super) async fn latest_pdu_in_room(
|
|||
.timeline
|
||||
.latest_pdu_in_room(&room_id)
|
||||
.await
|
||||
.map_err(|_| Error::bad_database("Failed to find the latest PDU in database"))?;
|
||||
.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(format!("{latest_pdu:?}")))
|
||||
let out = format!("{latest_pdu:?}");
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
@ -615,7 +529,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
&self,
|
||||
room_id: OwnedRoomId,
|
||||
server_name: OwnedServerName,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
) -> Result {
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
|
@ -623,9 +537,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.server_in_room(&self.services.server.name, &room_id)
|
||||
.await
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"We are not participating in the room / we don't know about the room ID.",
|
||||
));
|
||||
return Err!("We are not participating in the room / we don't know about the room ID.");
|
||||
}
|
||||
|
||||
let first_pdu = self
|
||||
|
@ -634,7 +546,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.timeline
|
||||
.latest_pdu_in_room(&room_id)
|
||||
.await
|
||||
.map_err(|_| Error::bad_database("Failed to find the latest PDU in database"))?;
|
||||
.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
|
||||
|
||||
let room_version = self.services.rooms.state.get_room_version(&room_id).await?;
|
||||
|
||||
|
@ -644,10 +556,9 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.services
|
||||
.sending
|
||||
.send_federation_request(&server_name, get_room_state::v1::Request {
|
||||
room_id: room_id.clone().into(),
|
||||
room_id: room_id.clone(),
|
||||
event_id: first_pdu.event_id.clone(),
|
||||
})
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
for pdu in remote_state_response.pdus.clone() {
|
||||
|
@ -656,7 +567,6 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.rooms
|
||||
.event_handler
|
||||
.parse_incoming_pdu(&pdu)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
| Ok(t) => t,
|
||||
|
@ -720,7 +630,6 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.rooms
|
||||
.event_handler
|
||||
.resolve_state(&room_id, &room_version, state)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
info!("Forcing new room state");
|
||||
|
@ -736,6 +645,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.await?;
|
||||
|
||||
let state_lock = self.services.rooms.state.mutex.lock(&*room_id).await;
|
||||
|
||||
self.services
|
||||
.rooms
|
||||
.state
|
||||
|
@ -752,11 +662,8 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
.update_joined_count(&room_id)
|
||||
.await;
|
||||
|
||||
drop(state_lock);
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(
|
||||
"Successfully forced the room state from the requested remote server.",
|
||||
))
|
||||
self.write_str("Successfully forced the room state from the requested remote server.")
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
@ -765,8 +672,8 @@ pub(super) async fn get_signing_keys(
|
|||
server_name: Option<OwnedServerName>,
|
||||
notary: Option<OwnedServerName>,
|
||||
query: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());
|
||||
) -> Result {
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());
|
||||
|
||||
if let Some(notary) = notary {
|
||||
let signing_keys = self
|
||||
|
@ -775,9 +682,8 @@ pub(super) async fn get_signing_keys(
|
|||
.notary_request(¬ary, &server_name)
|
||||
.await?;
|
||||
|
||||
return Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||
"```rs\n{signing_keys:#?}\n```"
|
||||
)));
|
||||
let out = format!("```rs\n{signing_keys:#?}\n```");
|
||||
return self.write_str(&out).await;
|
||||
}
|
||||
|
||||
let signing_keys = if query {
|
||||
|
@ -792,17 +698,13 @@ pub(super) async fn get_signing_keys(
|
|||
.await?
|
||||
};
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||
"```rs\n{signing_keys:#?}\n```"
|
||||
)))
|
||||
let out = format!("```rs\n{signing_keys:#?}\n```");
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_verify_keys(
|
||||
&self,
|
||||
server_name: Option<OwnedServerName>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());
|
||||
pub(super) async fn get_verify_keys(&self, server_name: Option<OwnedServerName>) -> Result {
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());
|
||||
|
||||
let keys = self
|
||||
.services
|
||||
|
@ -817,7 +719,7 @@ pub(super) async fn get_verify_keys(
|
|||
writeln!(out, "| {key_id} | {key:?} |")?;
|
||||
}
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(out))
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
@ -825,18 +727,16 @@ pub(super) async fn resolve_true_destination(
|
|||
&self,
|
||||
server_name: OwnedServerName,
|
||||
no_cache: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
) -> Result {
|
||||
if !self.services.server.config.allow_federation {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Federation is disabled on this homeserver.",
|
||||
));
|
||||
return Err!("Federation is disabled on this homeserver.",);
|
||||
}
|
||||
|
||||
if server_name == self.services.server.name {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
return Err!(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
|
||||
fetching local PDUs.",
|
||||
));
|
||||
);
|
||||
}
|
||||
|
||||
let actual = self
|
||||
|
@ -845,13 +745,12 @@ pub(super) async fn resolve_true_destination(
|
|||
.resolve_actual_dest(&server_name, !no_cache)
|
||||
.await?;
|
||||
|
||||
let msg = format!("Destination: {}\nHostname URI: {}", actual.dest, actual.host,);
|
||||
|
||||
Ok(RoomMessageEventContent::text_markdown(msg))
|
||||
let msg = format!("Destination: {}\nHostname URI: {}", actual.dest, actual.host);
|
||||
self.write_str(&msg).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn memory_stats(&self, opts: Option<String>) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn memory_stats(&self, opts: Option<String>) -> Result {
|
||||
const OPTS: &str = "abcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
let opts: String = OPTS
|
||||
|
@ -870,13 +769,12 @@ pub(super) async fn memory_stats(&self, opts: Option<String>) -> Result<RoomMess
|
|||
self.write_str("```\n").await?;
|
||||
self.write_str(&stats).await?;
|
||||
self.write_str("\n```").await?;
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain(""))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_metrics(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn runtime_metrics(&self) -> Result {
|
||||
let out = self.services.server.metrics.runtime_metrics().map_or_else(
|
||||
|| "Runtime metrics are not available.".to_owned(),
|
||||
|metrics| {
|
||||
|
@ -889,51 +787,51 @@ pub(super) async fn runtime_metrics(&self) -> Result<RoomMessageEventContent> {
|
|||
},
|
||||
);
|
||||
|
||||
Ok(RoomMessageEventContent::text_markdown(out))
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[cfg(not(tokio_unstable))]
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_metrics(&self) -> Result<RoomMessageEventContent> {
|
||||
Ok(RoomMessageEventContent::text_markdown(
|
||||
"Runtime metrics require building with `tokio_unstable`.",
|
||||
))
|
||||
pub(super) async fn runtime_metrics(&self) -> Result {
|
||||
self.write_str("Runtime metrics require building with `tokio_unstable`.")
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_interval(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn runtime_interval(&self) -> Result {
|
||||
let out = self.services.server.metrics.runtime_interval().map_or_else(
|
||||
|| "Runtime metrics are not available.".to_owned(),
|
||||
|metrics| format!("```rs\n{metrics:#?}\n```"),
|
||||
);
|
||||
|
||||
Ok(RoomMessageEventContent::text_markdown(out))
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[cfg(not(tokio_unstable))]
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_interval(&self) -> Result<RoomMessageEventContent> {
|
||||
Ok(RoomMessageEventContent::text_markdown(
|
||||
"Runtime metrics require building with `tokio_unstable`.",
|
||||
))
|
||||
pub(super) async fn runtime_interval(&self) -> Result {
|
||||
self.write_str("Runtime metrics require building with `tokio_unstable`.")
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn time(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn time(&self) -> Result {
|
||||
let now = SystemTime::now();
|
||||
Ok(RoomMessageEventContent::text_markdown(utils::time::format(now, "%+")))
|
||||
let now = utils::time::format(now, "%+");
|
||||
|
||||
self.write_str(&now).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn list_dependencies(&self, names: bool) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn list_dependencies(&self, names: bool) -> Result {
|
||||
if names {
|
||||
let out = info::cargo::dependencies_names().join(" ");
|
||||
return Ok(RoomMessageEventContent::notice_markdown(out));
|
||||
return self.write_str(&out).await;
|
||||
}
|
||||
|
||||
let deps = info::cargo::dependencies();
|
||||
let mut out = String::new();
|
||||
let deps = info::cargo::dependencies();
|
||||
writeln!(out, "| name | version | features |")?;
|
||||
writeln!(out, "| ---- | ------- | -------- |")?;
|
||||
for (name, dep) in deps {
|
||||
|
@ -944,10 +842,11 @@ pub(super) async fn list_dependencies(&self, names: bool) -> Result<RoomMessageE
|
|||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
writeln!(out, "| {name} | {version} | {feats} |")?;
|
||||
}
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(out))
|
||||
self.write_str(&out).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
@ -955,7 +854,7 @@ pub(super) async fn database_stats(
|
|||
&self,
|
||||
property: Option<String>,
|
||||
map: Option<String>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
) -> Result {
|
||||
let map_name = map.as_ref().map_or(EMPTY, String::as_str);
|
||||
let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
|
||||
self.services
|
||||
|
@ -967,17 +866,11 @@ pub(super) async fn database_stats(
|
|||
let res = map.property(&property).expect("invalid property");
|
||||
writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(""))
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn database_files(
|
||||
&self,
|
||||
map: Option<String>,
|
||||
level: Option<i32>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn database_files(&self, map: Option<String>, level: Option<i32>) -> Result {
|
||||
let mut files: Vec<_> = self.services.db.db.file_list().collect::<Result<_>>()?;
|
||||
|
||||
files.sort_by_key(|f| f.name.clone());
|
||||
|
@ -1004,16 +897,12 @@ pub(super) async fn database_files(
|
|||
file.column_family_name,
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(""))
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn trim_memory(&self) -> Result<RoomMessageEventContent> {
|
||||
pub(super) async fn trim_memory(&self) -> Result {
|
||||
conduwuit::alloc::trim(None)?;
|
||||
|
||||
writeln!(self, "done").await?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(""))
|
||||
writeln!(self, "done").await
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue