de-global services from admin
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
7a3cc3941e
commit
7e50db4193
37 changed files with 1131 additions and 1127 deletions
|
@ -16,19 +16,22 @@ use ruma::{
|
|||
events::room::message::RoomMessageEventContent,
|
||||
CanonicalJsonObject, EventId, OwnedRoomOrAliasId, RoomId, RoomVersionId, ServerName,
|
||||
};
|
||||
use service::services;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
pub(super) async fn echo(_body: &[&str], message: Vec<String>) -> Result<RoomMessageEventContent> {
|
||||
use crate::admin_command;
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn echo(&self, message: Vec<String>) -> Result<RoomMessageEventContent> {
|
||||
let message = message.join(" ");
|
||||
|
||||
Ok(RoomMessageEventContent::notice_plain(message))
|
||||
}
|
||||
|
||||
pub(super) async fn get_auth_chain(_body: &[&str], event_id: Box<EventId>) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn get_auth_chain(&self, event_id: Box<EventId>) -> Result<RoomMessageEventContent> {
|
||||
let event_id = Arc::<EventId>::from(event_id);
|
||||
if let Some(event) = services().rooms.timeline.get_pdu_json(&event_id)? {
|
||||
if let Some(event) = self.services.rooms.timeline.get_pdu_json(&event_id)? {
|
||||
let room_id_str = event
|
||||
.get("room_id")
|
||||
.and_then(|val| val.as_str())
|
||||
|
@ -36,13 +39,16 @@ pub(super) async fn get_auth_chain(_body: &[&str], event_id: Box<EventId>) -> Re
|
|||
|
||||
let room_id = <&RoomId>::try_from(room_id_str)
|
||||
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
||||
|
||||
let start = Instant::now();
|
||||
let count = services()
|
||||
let count = self
|
||||
.services
|
||||
.rooms
|
||||
.auth_chain
|
||||
.event_ids_iter(room_id, vec![event_id])
|
||||
.await?
|
||||
.count();
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Loaded auth chain with length {count} in {elapsed:?}"
|
||||
|
@ -52,14 +58,16 @@ pub(super) async fn get_auth_chain(_body: &[&str], event_id: Box<EventId>) -> Re
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn parse_pdu(body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
if body.len() < 2 || !body[0].trim().starts_with("```") || body.last().unwrap_or(&"").trim() != "```" {
|
||||
#[admin_command]
|
||||
pub(super) async fn parse_pdu(&self) -> Result<RoomMessageEventContent> {
|
||||
if self.body.len() < 2 || !self.body[0].trim().starts_with("```") || self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
}
|
||||
|
||||
let string = body[1..body.len().saturating_sub(1)].join("\n");
|
||||
let string = self.body[1..self.body.len().saturating_sub(1)].join("\n");
|
||||
match serde_json::from_str(&string) {
|
||||
Ok(value) => match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) {
|
||||
Ok(hash) => {
|
||||
|
@ -80,15 +88,17 @@ pub(super) async fn parse_pdu(body: &[&str]) -> Result<RoomMessageEventContent>
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn get_pdu(_body: &[&str], event_id: Box<EventId>) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn get_pdu(&self, event_id: Box<EventId>) -> Result<RoomMessageEventContent> {
|
||||
let mut outlier = false;
|
||||
let mut pdu_json = services()
|
||||
let mut pdu_json = self
|
||||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.get_non_outlier_pdu_json(&event_id)?;
|
||||
if pdu_json.is_none() {
|
||||
outlier = true;
|
||||
pdu_json = services().rooms.timeline.get_pdu_json(&event_id)?;
|
||||
pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id)?;
|
||||
}
|
||||
match pdu_json {
|
||||
Some(json) => {
|
||||
|
@ -107,39 +117,42 @@ pub(super) async fn get_pdu(_body: &[&str], event_id: Box<EventId>) -> Result<Ro
|
|||
}
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_remote_pdu_list(
|
||||
body: &[&str], server: Box<ServerName>, force: bool,
|
||||
&self, server: Box<ServerName>, force: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
if !services().globals.config.allow_federation {
|
||||
if !self.services.globals.config.allow_federation {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Federation is disabled on this homeserver.",
|
||||
));
|
||||
}
|
||||
|
||||
if server == services().globals.server_name() {
|
||||
if server == self.services.globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for fetching local PDUs from \
|
||||
the database.",
|
||||
));
|
||||
}
|
||||
|
||||
if body.len() < 2 || !body[0].trim().starts_with("```") || body.last().unwrap_or(&"").trim() != "```" {
|
||||
if self.body.len() < 2 || !self.body[0].trim().starts_with("```") || self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
}
|
||||
|
||||
let list = body
|
||||
let list = self
|
||||
.body
|
||||
.iter()
|
||||
.collect::<Vec<_>>()
|
||||
.drain(1..body.len().saturating_sub(1))
|
||||
.drain(1..self.body.len().saturating_sub(1))
|
||||
.filter_map(|pdu| EventId::parse(pdu).ok())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for pdu in list {
|
||||
if force {
|
||||
if let Err(e) = get_remote_pdu(&[], Box::from(pdu), server.clone()).await {
|
||||
services()
|
||||
if let Err(e) = self.get_remote_pdu(Box::from(pdu), server.clone()).await {
|
||||
self.services
|
||||
.admin
|
||||
.send_message(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to get remote PDU, ignoring error: {e}"
|
||||
|
@ -148,29 +161,31 @@ pub(super) async fn get_remote_pdu_list(
|
|||
warn!(%e, "Failed to get remote PDU, ignoring error");
|
||||
}
|
||||
} else {
|
||||
get_remote_pdu(&[], Box::from(pdu), server.clone()).await?;
|
||||
self.get_remote_pdu(Box::from(pdu), server.clone()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(RoomMessageEventContent::text_plain("Fetched list of remote PDUs."))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_remote_pdu(
|
||||
_body: &[&str], event_id: Box<EventId>, server: Box<ServerName>,
|
||||
&self, event_id: Box<EventId>, server: Box<ServerName>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
if !services().globals.config.allow_federation {
|
||||
if !self.services.globals.config.allow_federation {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Federation is disabled on this homeserver.",
|
||||
));
|
||||
}
|
||||
|
||||
if server == services().globals.server_name() {
|
||||
if server == self.services.globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for fetching local PDUs.",
|
||||
));
|
||||
}
|
||||
|
||||
match services()
|
||||
match self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&server,
|
||||
|
@ -191,7 +206,8 @@ pub(super) async fn get_remote_pdu(
|
|||
|
||||
debug!("Attempting to parse PDU: {:?}", &response.pdu);
|
||||
let parsed_pdu = {
|
||||
let parsed_result = services()
|
||||
let parsed_result = self
|
||||
.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.parse_incoming_pdu(&response.pdu);
|
||||
|
@ -212,7 +228,7 @@ pub(super) async fn get_remote_pdu(
|
|||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
debug!("Attempting to fetch homeserver signing keys for {server}");
|
||||
services()
|
||||
self.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_required_signing_keys(parsed_pdu.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
|
||||
|
@ -222,7 +238,7 @@ pub(super) async fn get_remote_pdu(
|
|||
});
|
||||
|
||||
info!("Attempting to handle event ID {event_id} as backfilled PDU");
|
||||
services()
|
||||
self.services
|
||||
.rooms
|
||||
.timeline
|
||||
.backfill_pdu(&server, response.pdu, &pub_key_map)
|
||||
|
@ -241,9 +257,11 @@ pub(super) async fn get_remote_pdu(
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn get_room_state(_body: &[&str], room: OwnedRoomOrAliasId) -> Result<RoomMessageEventContent> {
|
||||
let room_id = services().rooms.alias.resolve(&room).await?;
|
||||
let room_state = services()
|
||||
#[admin_command]
|
||||
pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result<RoomMessageEventContent> {
|
||||
let room_id = self.services.rooms.alias.resolve(&room).await?;
|
||||
let room_state = self
|
||||
.services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_full(&room_id)
|
||||
|
@ -268,8 +286,9 @@ pub(super) async fn get_room_state(_body: &[&str], room: OwnedRoomOrAliasId) ->
|
|||
Ok(RoomMessageEventContent::notice_markdown(format!("```json\n{json}\n```")))
|
||||
}
|
||||
|
||||
pub(super) async fn ping(_body: &[&str], server: Box<ServerName>) -> Result<RoomMessageEventContent> {
|
||||
if server == services().globals.server_name() {
|
||||
#[admin_command]
|
||||
pub(super) async fn ping(&self, server: Box<ServerName>) -> Result<RoomMessageEventContent> {
|
||||
if server == self.services.globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Not allowed to send federation requests to ourselves.",
|
||||
));
|
||||
|
@ -277,7 +296,8 @@ pub(super) async fn ping(_body: &[&str], server: Box<ServerName>) -> Result<Room
|
|||
|
||||
let timer = tokio::time::Instant::now();
|
||||
|
||||
match services()
|
||||
match self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(&server, ruma::api::federation::discovery::get_server_version::v1::Request {})
|
||||
.await
|
||||
|
@ -306,23 +326,23 @@ pub(super) async fn ping(_body: &[&str], server: Box<ServerName>) -> Result<Room
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn force_device_list_updates(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn force_device_list_updates(&self) -> Result<RoomMessageEventContent> {
|
||||
// Force E2EE device list updates for all users
|
||||
for user_id in services().users.iter().filter_map(Result::ok) {
|
||||
services().users.mark_device_key_update(&user_id)?;
|
||||
for user_id in self.services.users.iter().filter_map(Result::ok) {
|
||||
self.services.users.mark_device_key_update(&user_id)?;
|
||||
}
|
||||
Ok(RoomMessageEventContent::text_plain(
|
||||
"Marked all devices for all users as having new keys to update",
|
||||
))
|
||||
}
|
||||
|
||||
pub(super) async fn change_log_level(
|
||||
_body: &[&str], filter: Option<String>, reset: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result<RoomMessageEventContent> {
|
||||
let handles = &["console"];
|
||||
|
||||
if reset {
|
||||
let old_filter_layer = match EnvFilter::try_new(&services().globals.config.log) {
|
||||
let old_filter_layer = match EnvFilter::try_new(&self.services.globals.config.log) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
|
@ -331,7 +351,8 @@ pub(super) async fn change_log_level(
|
|||
},
|
||||
};
|
||||
|
||||
match services()
|
||||
match self
|
||||
.services
|
||||
.server
|
||||
.log
|
||||
.reload
|
||||
|
@ -340,7 +361,7 @@ pub(super) async fn change_log_level(
|
|||
Ok(()) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Successfully changed log level back to config value {}",
|
||||
services().globals.config.log
|
||||
self.services.globals.config.log
|
||||
)));
|
||||
},
|
||||
Err(e) => {
|
||||
|
@ -361,7 +382,8 @@ pub(super) async fn change_log_level(
|
|||
},
|
||||
};
|
||||
|
||||
match services()
|
||||
match self
|
||||
.services
|
||||
.server
|
||||
.log
|
||||
.reload
|
||||
|
@ -381,19 +403,21 @@ pub(super) async fn change_log_level(
|
|||
Ok(RoomMessageEventContent::text_plain("No log level was specified."))
|
||||
}
|
||||
|
||||
pub(super) async fn sign_json(body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
if body.len() < 2 || !body[0].trim().starts_with("```") || body.last().unwrap_or(&"").trim() != "```" {
|
||||
#[admin_command]
|
||||
pub(super) async fn sign_json(&self) -> Result<RoomMessageEventContent> {
|
||||
if self.body.len() < 2 || !self.body[0].trim().starts_with("```") || self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
}
|
||||
|
||||
let string = body[1..body.len().checked_sub(1).unwrap()].join("\n");
|
||||
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
|
||||
match serde_json::from_str(&string) {
|
||||
Ok(mut value) => {
|
||||
ruma::signatures::sign_json(
|
||||
services().globals.server_name().as_str(),
|
||||
services().globals.keypair(),
|
||||
self.services.globals.server_name().as_str(),
|
||||
self.services.globals.keypair(),
|
||||
&mut value,
|
||||
)
|
||||
.expect("our request json is what ruma expects");
|
||||
|
@ -404,19 +428,21 @@ pub(super) async fn sign_json(body: &[&str]) -> Result<RoomMessageEventContent>
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn verify_json(body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
if body.len() < 2 || !body[0].trim().starts_with("```") || body.last().unwrap_or(&"").trim() != "```" {
|
||||
#[admin_command]
|
||||
pub(super) async fn verify_json(&self) -> Result<RoomMessageEventContent> {
|
||||
if self.body.len() < 2 || !self.body[0].trim().starts_with("```") || self.body.last().unwrap_or(&"").trim() != "```"
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
));
|
||||
}
|
||||
|
||||
let string = body[1..body.len().checked_sub(1).unwrap()].join("\n");
|
||||
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
|
||||
match serde_json::from_str(&string) {
|
||||
Ok(value) => {
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
services()
|
||||
self.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_required_signing_keys([&value], &pub_key_map)
|
||||
|
@ -434,19 +460,22 @@ pub(super) async fn verify_json(body: &[&str]) -> Result<RoomMessageEventContent
|
|||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(_body))]
|
||||
pub(super) async fn first_pdu_in_room(_body: &[&str], room_id: Box<RoomId>) -> Result<RoomMessageEventContent> {
|
||||
if !services()
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn first_pdu_in_room(&self, room_id: Box<RoomId>) -> Result<RoomMessageEventContent> {
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&services().globals.config.server_name, &room_id)?
|
||||
.server_in_room(&self.services.globals.config.server_name, &room_id)?
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"We are not participating in the room / we don't know about the room ID.",
|
||||
));
|
||||
}
|
||||
|
||||
let first_pdu = services()
|
||||
let first_pdu = self
|
||||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.first_pdu_in_room(&room_id)?
|
||||
|
@ -455,19 +484,22 @@ pub(super) async fn first_pdu_in_room(_body: &[&str], room_id: Box<RoomId>) -> R
|
|||
Ok(RoomMessageEventContent::text_plain(format!("{first_pdu:?}")))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(_body))]
|
||||
pub(super) async fn latest_pdu_in_room(_body: &[&str], room_id: Box<RoomId>) -> Result<RoomMessageEventContent> {
|
||||
if !services()
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: Box<RoomId>) -> Result<RoomMessageEventContent> {
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&services().globals.config.server_name, &room_id)?
|
||||
.server_in_room(&self.services.globals.config.server_name, &room_id)?
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"We are not participating in the room / we don't know about the room ID.",
|
||||
));
|
||||
}
|
||||
|
||||
let latest_pdu = services()
|
||||
let latest_pdu = self
|
||||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.latest_pdu_in_room(&room_id)?
|
||||
|
@ -476,32 +508,36 @@ pub(super) async fn latest_pdu_in_room(_body: &[&str], room_id: Box<RoomId>) ->
|
|||
Ok(RoomMessageEventContent::text_plain(format!("{latest_pdu:?}")))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(_body))]
|
||||
#[admin_command]
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn force_set_room_state_from_server(
|
||||
_body: &[&str], room_id: Box<RoomId>, server_name: Box<ServerName>,
|
||||
&self, room_id: Box<RoomId>, server_name: Box<ServerName>,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
if !services()
|
||||
if !self
|
||||
.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(&services().globals.config.server_name, &room_id)?
|
||||
.server_in_room(&self.services.globals.config.server_name, &room_id)?
|
||||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"We are not participating in the room / we don't know about the room ID.",
|
||||
));
|
||||
}
|
||||
|
||||
let first_pdu = services()
|
||||
let first_pdu = self
|
||||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.latest_pdu_in_room(&room_id)?
|
||||
.ok_or_else(|| Error::bad_database("Failed to find the latest PDU in database"))?;
|
||||
|
||||
let room_version = services().rooms.state.get_room_version(&room_id)?;
|
||||
let room_version = self.services.rooms.state.get_room_version(&room_id)?;
|
||||
|
||||
let mut state: HashMap<u64, Arc<EventId>> = HashMap::new();
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
let remote_state_response = services()
|
||||
let remote_state_response = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&server_name,
|
||||
|
@ -515,7 +551,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
let mut events = Vec::with_capacity(remote_state_response.pdus.len());
|
||||
|
||||
for pdu in remote_state_response.pdus.clone() {
|
||||
events.push(match services().rooms.event_handler.parse_incoming_pdu(&pdu) {
|
||||
events.push(match self.services.rooms.event_handler.parse_incoming_pdu(&pdu) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
warn!("Could not parse PDU, ignoring: {e}");
|
||||
|
@ -525,7 +561,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
}
|
||||
|
||||
info!("Fetching required signing keys for all the state events we got");
|
||||
services()
|
||||
self.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_required_signing_keys(events.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
|
||||
|
@ -535,7 +571,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
for result in remote_state_response
|
||||
.pdus
|
||||
.iter()
|
||||
.map(|pdu| validate_and_add_event_id(services(), pdu, &room_version, &pub_key_map))
|
||||
.map(|pdu| validate_and_add_event_id(self.services, pdu, &room_version, &pub_key_map))
|
||||
{
|
||||
let Ok((event_id, value)) = result.await else {
|
||||
continue;
|
||||
|
@ -546,12 +582,13 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
Error::BadServerResponse("Invalid PDU in send_join response.")
|
||||
})?;
|
||||
|
||||
services()
|
||||
self.services
|
||||
.rooms
|
||||
.outlier
|
||||
.add_pdu_outlier(&event_id, &value)?;
|
||||
if let Some(state_key) = &pdu.state_key {
|
||||
let shortstatekey = services()
|
||||
let shortstatekey = self
|
||||
.services
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)?;
|
||||
|
@ -563,32 +600,34 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
for result in remote_state_response
|
||||
.auth_chain
|
||||
.iter()
|
||||
.map(|pdu| validate_and_add_event_id(services(), pdu, &room_version, &pub_key_map))
|
||||
.map(|pdu| validate_and_add_event_id(self.services, pdu, &room_version, &pub_key_map))
|
||||
{
|
||||
let Ok((event_id, value)) = result.await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
services()
|
||||
self.services
|
||||
.rooms
|
||||
.outlier
|
||||
.add_pdu_outlier(&event_id, &value)?;
|
||||
}
|
||||
|
||||
let new_room_state = services()
|
||||
let new_room_state = self
|
||||
.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.resolve_state(room_id.clone().as_ref(), &room_version, state)
|
||||
.await?;
|
||||
|
||||
info!("Forcing new room state");
|
||||
let (short_state_hash, new, removed) = services()
|
||||
let (short_state_hash, new, removed) = self
|
||||
.services
|
||||
.rooms
|
||||
.state_compressor
|
||||
.save_state(room_id.clone().as_ref(), new_room_state)?;
|
||||
|
||||
let state_lock = services().rooms.state.mutex.lock(&room_id).await;
|
||||
services()
|
||||
let state_lock = self.services.rooms.state.mutex.lock(&room_id).await;
|
||||
self.services
|
||||
.rooms
|
||||
.state
|
||||
.force_state(room_id.clone().as_ref(), short_state_hash, new, removed, &state_lock)
|
||||
|
@ -598,7 +637,10 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
"Updating joined counts for room just in case (e.g. we may have found a difference in the room's \
|
||||
m.room.member state"
|
||||
);
|
||||
services().rooms.state_cache.update_joined_count(&room_id)?;
|
||||
self.services
|
||||
.rooms
|
||||
.state_cache
|
||||
.update_joined_count(&room_id)?;
|
||||
|
||||
drop(state_lock);
|
||||
|
||||
|
@ -607,28 +649,30 @@ pub(super) async fn force_set_room_state_from_server(
|
|||
))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn get_signing_keys(
|
||||
_body: &[&str], server_name: Option<Box<ServerName>>, _cached: bool,
|
||||
&self, server_name: Option<Box<ServerName>>, _cached: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let server_name = server_name.unwrap_or_else(|| services().server.config.server_name.clone().into());
|
||||
let signing_keys = services().globals.signing_keys_for(&server_name)?;
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into());
|
||||
let signing_keys = self.services.globals.signing_keys_for(&server_name)?;
|
||||
|
||||
Ok(RoomMessageEventContent::notice_markdown(format!(
|
||||
"```rs\n{signing_keys:#?}\n```"
|
||||
)))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
#[allow(dead_code)]
|
||||
pub(super) async fn get_verify_keys(
|
||||
_body: &[&str], server_name: Option<Box<ServerName>>, cached: bool,
|
||||
&self, server_name: Option<Box<ServerName>>, cached: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
let server_name = server_name.unwrap_or_else(|| services().server.config.server_name.clone().into());
|
||||
let server_name = server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into());
|
||||
let mut out = String::new();
|
||||
|
||||
if cached {
|
||||
writeln!(out, "| Key ID | VerifyKey |")?;
|
||||
writeln!(out, "| --- | --- |")?;
|
||||
for (key_id, verify_key) in services().globals.verify_keys_for(&server_name)? {
|
||||
for (key_id, verify_key) in self.services.globals.verify_keys_for(&server_name)? {
|
||||
writeln!(out, "| {key_id} | {verify_key:?} |")?;
|
||||
}
|
||||
|
||||
|
@ -636,7 +680,8 @@ pub(super) async fn get_verify_keys(
|
|||
}
|
||||
|
||||
let signature_ids: Vec<String> = Vec::new();
|
||||
let keys = services()
|
||||
let keys = self
|
||||
.services
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_signing_keys_for_server(&server_name, signature_ids)
|
||||
|
@ -651,16 +696,17 @@ pub(super) async fn get_verify_keys(
|
|||
Ok(RoomMessageEventContent::notice_markdown(out))
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn resolve_true_destination(
|
||||
_body: &[&str], server_name: Box<ServerName>, no_cache: bool,
|
||||
&self, server_name: Box<ServerName>, no_cache: bool,
|
||||
) -> Result<RoomMessageEventContent> {
|
||||
if !services().globals.config.allow_federation {
|
||||
if !self.services.globals.config.allow_federation {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Federation is disabled on this homeserver.",
|
||||
));
|
||||
}
|
||||
|
||||
if server_name == services().globals.config.server_name {
|
||||
if server_name == self.services.globals.config.server_name {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Not allowed to send federation requests to ourselves. Please use `get-pdu` for fetching local PDUs.",
|
||||
));
|
||||
|
@ -672,12 +718,13 @@ pub(super) async fn resolve_true_destination(
|
|||
&& matches!(data.span_name(), "actual" | "well-known" | "srv")
|
||||
};
|
||||
|
||||
let state = &services().server.log.capture;
|
||||
let state = &self.services.server.log.capture;
|
||||
let logs = Arc::new(Mutex::new(String::new()));
|
||||
let capture = Capture::new(state, Some(filter), capture::fmt_markdown(logs.clone()));
|
||||
|
||||
let capture_scope = capture.start();
|
||||
let actual = services()
|
||||
let actual = self
|
||||
.services
|
||||
.resolver
|
||||
.resolve_actual_dest(&server_name, !no_cache)
|
||||
.await?;
|
||||
|
@ -692,7 +739,8 @@ pub(super) async fn resolve_true_destination(
|
|||
Ok(RoomMessageEventContent::text_markdown(msg))
|
||||
}
|
||||
|
||||
pub(super) async fn memory_stats(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn memory_stats(&self) -> Result<RoomMessageEventContent> {
|
||||
let html_body = conduit::alloc::memory_stats();
|
||||
|
||||
if html_body.is_none() {
|
||||
|
@ -708,8 +756,9 @@ pub(super) async fn memory_stats(_body: &[&str]) -> Result<RoomMessageEventConte
|
|||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
pub(super) async fn runtime_metrics(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
let out = services().server.metrics.runtime_metrics().map_or_else(
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_metrics(&self) -> Result<RoomMessageEventContent> {
|
||||
let out = self.services.server.metrics.runtime_metrics().map_or_else(
|
||||
|| "Runtime metrics are not available.".to_owned(),
|
||||
|metrics| format!("```rs\n{metrics:#?}\n```"),
|
||||
);
|
||||
|
@ -718,15 +767,17 @@ pub(super) async fn runtime_metrics(_body: &[&str]) -> Result<RoomMessageEventCo
|
|||
}
|
||||
|
||||
#[cfg(not(tokio_unstable))]
|
||||
pub(super) async fn runtime_metrics(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_metrics(&self) -> Result<RoomMessageEventContent> {
|
||||
Ok(RoomMessageEventContent::text_markdown(
|
||||
"Runtime metrics require building with `tokio_unstable`.",
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
pub(super) async fn runtime_interval(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
let out = services().server.metrics.runtime_interval().map_or_else(
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_interval(&self) -> Result<RoomMessageEventContent> {
|
||||
let out = self.services.server.metrics.runtime_interval().map_or_else(
|
||||
|| "Runtime metrics are not available.".to_owned(),
|
||||
|metrics| format!("```rs\n{metrics:#?}\n```"),
|
||||
);
|
||||
|
@ -735,18 +786,21 @@ pub(super) async fn runtime_interval(_body: &[&str]) -> Result<RoomMessageEventC
|
|||
}
|
||||
|
||||
#[cfg(not(tokio_unstable))]
|
||||
pub(super) async fn runtime_interval(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn runtime_interval(&self) -> Result<RoomMessageEventContent> {
|
||||
Ok(RoomMessageEventContent::text_markdown(
|
||||
"Runtime metrics require building with `tokio_unstable`.",
|
||||
))
|
||||
}
|
||||
|
||||
pub(super) async fn time(_body: &[&str]) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn time(&self) -> Result<RoomMessageEventContent> {
|
||||
let now = SystemTime::now();
|
||||
Ok(RoomMessageEventContent::text_markdown(utils::time::format(now, "%+")))
|
||||
}
|
||||
|
||||
pub(super) async fn list_dependencies(_body: &[&str], names: bool) -> Result<RoomMessageEventContent> {
|
||||
#[admin_command]
|
||||
pub(super) async fn list_dependencies(&self, names: bool) -> Result<RoomMessageEventContent> {
|
||||
if names {
|
||||
let out = info::cargo::dependencies_names().join(" ");
|
||||
return Ok(RoomMessageEventContent::notice_markdown(out));
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue