diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs
index 896eba62..c0cea412 100644
--- a/src/api/client_server/room.rs
+++ b/src/api/client_server/room.rs
@@ -23,7 +23,7 @@ use ruma::{
     },
     int,
     serde::JsonObject,
-    CanonicalJsonObject, OwnedRoomAliasId, RoomAliasId, RoomId,
+    CanonicalJsonObject, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId,
 };
 use serde_json::{json, value::to_raw_value};
 use std::{cmp::max, collections::BTreeMap, sync::Arc};
@@ -127,12 +127,28 @@ pub async fn create_room_route(
             let mut content = content
                 .deserialize_as::<CanonicalJsonObject>()
                 .expect("Invalid creation content");
-            content.insert(
-                "creator".into(),
-                json!(&sender_user).try_into().map_err(|_| {
-                    Error::BadRequest(ErrorKind::BadJson, "Invalid creation content")
-                })?,
-            );
+            match room_version {
+                RoomVersionId::V1
+                | RoomVersionId::V2
+                | RoomVersionId::V3
+                | RoomVersionId::V4
+                | RoomVersionId::V5
+                | RoomVersionId::V6
+                | RoomVersionId::V7
+                | RoomVersionId::V8
+                | RoomVersionId::V9
+                | RoomVersionId::V10 => {
+                    content.insert(
+                        "creator".into(),
+                        json!(&sender_user).try_into().map_err(|e| {
+                            info!("Invalid creation content: {e}");
+                            Error::BadRequest(ErrorKind::BadJson, "Invalid creation content")
+                        })?,
+                    );
+                }
+                _ => {} // V11 removed the "creator" key
+            }
+
             content.insert(
                 "room_version".into(),
                 json!(room_version.as_str()).try_into().map_err(|_| {
@@ -143,8 +159,21 @@ pub async fn create_room_route(
         }
         None => {
             // TODO: Add correct value for v11
+            let content = match room_version {
+                RoomVersionId::V1
+                | RoomVersionId::V2
+                | RoomVersionId::V3
+                | RoomVersionId::V4
+                | RoomVersionId::V5
+                | RoomVersionId::V6
+                | RoomVersionId::V7
+                | RoomVersionId::V8
+                | RoomVersionId::V9
+                | RoomVersionId::V10 => RoomCreateEventContent::new_v1(sender_user.clone()),
+                _ => RoomCreateEventContent::new_v11(),
+            };
             let mut content = serde_json::from_str::<CanonicalJsonObject>(
-                to_raw_value(&RoomCreateEventContent::new_v1(sender_user.clone()))
+                to_raw_value(&content)
                     .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid creation content"))?
                    .get(),
             )
@@ -619,12 +648,31 @@ pub async fn upgrade_room_route(
     ));
 
     // Send a m.room.create event containing a predecessor field and the applicable room_version
-    create_event_content.insert(
-        "creator".into(),
-        json!(&sender_user)
-            .try_into()
-            .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"))?,
-    );
+    match body.new_version {
+        RoomVersionId::V1
+        | RoomVersionId::V2
+        | RoomVersionId::V3
+        | RoomVersionId::V4
+        | RoomVersionId::V5
+        | RoomVersionId::V6
+        | RoomVersionId::V7
+        | RoomVersionId::V8
+        | RoomVersionId::V9
+        | RoomVersionId::V10 => {
+            create_event_content.insert(
+                "creator".into(),
+                json!(&sender_user).try_into().map_err(|e| {
+                    info!("Error forming creation event: {e}");
+                    Error::BadRequest(ErrorKind::BadJson, "Error forming creation event")
+                })?,
+            );
+        }
+        _ => {
+            // "creator" key no longer exists in V11 rooms
+            create_event_content.remove("creator");
+        }
+    }
+
     create_event_content.insert(
         "room_version".into(),
         json!(&body.new_version)
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 758ad180..c6c2646e 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -90,6 +90,16 @@ pub struct Config {
     #[serde(default = "default_turn_ttl")]
     pub turn_ttl: u64,
 
+    pub rocksdb_log_path: Option,
+    #[serde(default = "default_rocksdb_log_level")]
+    pub rocksdb_log_level: String,
+    #[serde(default = "default_rocksdb_max_log_file_size")]
+    pub rocksdb_max_log_file_size: usize,
+    #[serde(default = "default_rocksdb_log_time_to_roll")]
+    pub rocksdb_log_time_to_roll: usize,
+    #[serde(default = "false_fn")]
+    pub rocksdb_optimize_for_spinning_disks: bool,
+
     pub emergency_password: Option<String>,
 
     #[serde(default = "default_notification_push_path")]
@@ -257,6 +267,22 @@ impl fmt::Display for Config {
                 "zstd Response Body Compression",
                 &self.zstd_compression.to_string(),
             ),
+            (
+                "RocksDB database log level",
+                &self.rocksdb_log_level.to_string(),
+            ),
+            (
+                "RocksDB database log time-to-roll",
+                &self.rocksdb_log_time_to_roll.to_string(),
+            ),
+            (
+                "RocksDB database max log file size",
+                &self.rocksdb_max_log_file_size.to_string(),
+            ),
+            (
+                "RocksDB database optimize for spinning disks",
+                &self.rocksdb_optimize_for_spinning_disks.to_string(),
+            ),
         ];
 
         let mut msg: String = "Active config values:\n\n".to_owned();
@@ -290,7 +316,7 @@ fn default_unix_socket_perms() -> u32 {
 }
 
 fn default_database_backend() -> String {
-    "sqlite".to_owned()
+    "rocksdb".to_owned()
 }
 
 fn default_db_cache_capacity_mb() -> f64 {
@@ -330,7 +356,7 @@ fn default_trusted_servers() -> Vec<OwnedServerName> {
 }
 
 fn default_log() -> String {
-    "warn,state_res=warn,_=off,sled=off".to_owned()
+    "warn,state_res=warn".to_owned()
 }
 
 fn default_notification_push_path() -> String {
@@ -349,7 +375,20 @@ fn default_presence_offline_timeout_s() -> u64 {
     15 * 60
 }
 
+fn default_rocksdb_log_level() -> String {
+    "info".to_owned()
+}
+
+fn default_rocksdb_log_time_to_roll() -> usize {
+    0
+}
+
 // I know, it's a great name
 pub fn default_default_room_version() -> RoomVersionId {
     RoomVersionId::V10
 }
+
+pub fn default_rocksdb_max_log_file_size() -> usize {
+    // 4 megabytes
+    4 * 1024 * 1024
+}
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index a215f3ca..1a53c6c3 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -1,11 +1,13 @@
 use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
-use crate::{utils, Result};
+use crate::{services, utils, Result};
 use std::{
     future::Future,
     pin::Pin,
     sync::{Arc, RwLock},
 };
+use rocksdb::LogLevel::{Debug, Error, Fatal, Info, Warn};
+
 
 pub struct Engine {
     rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
     max_open_files: i32,
@@ -21,7 +23,9 @@ pub struct RocksDbEngineTree<'a> {
 }
 
 fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
+    // block-based options: https://docs.rs/rocksdb/latest/rocksdb/struct.BlockBasedOptions.html#
     let mut block_based_options = rocksdb::BlockBasedOptions::default();
+
     block_based_options.set_block_cache(rocksdb_cache);
 
     // "Difference of spinning disk"
@@ -29,18 +33,40 @@ fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::O
     block_based_options.set_block_size(64 * 1024);
     block_based_options.set_cache_index_and_filter_blocks(true);
 
+    // database options: https://docs.rs/rocksdb/latest/rocksdb/struct.Options.html#
     let mut db_opts = rocksdb::Options::default();
+
+    let rocksdb_log_level = match services().globals.rocksdb_log_level().as_str() {
+        "debug" => Debug,
+        "info" => Info,
+        "warn" => Warn,
+        "error" => Error,
+        "fatal" => Fatal,
+        _ => Info,
+    };
+
+    db_opts.set_log_level(rocksdb_log_level);
+    db_opts.set_max_log_file_size(services().globals.rocksdb_max_log_file_size());
+    db_opts.set_log_file_time_to_roll(services().globals.rocksdb_log_time_to_roll());
+
+    if services().globals.rocksdb_optimize_for_spinning_disks() {
+        // useful for hard drives but on literally any half-decent SSD this is not useful
+        // and the benefits of improved compaction based on up to date stats are good.
+        // current conduwut users have NVMe/SSDs.
+        db_opts.set_skip_stats_update_on_db_open(true);
+
+        db_opts.set_compaction_readahead_size(2 * 1024 * 1024); // default compaction_readahead_size is 0 which is good for SSDs
+        db_opts.set_target_file_size_base(256 * 1024 * 1024); // default target_file_size is 64MB which is good for SSDs
+        db_opts.set_optimize_filters_for_hits(true); // doesn't really seem useful for fast storage
+    } else {
+        db_opts.set_skip_stats_update_on_db_open(false);
+        db_opts.set_max_bytes_for_level_base(512 * 1024 * 1024);
+        db_opts.set_use_direct_reads(true);
+        db_opts.set_use_direct_io_for_flush_and_compaction(true);
+    }
+
     db_opts.set_block_based_table_factory(&block_based_options);
-    db_opts.set_optimize_filters_for_hits(true);
-    db_opts.set_skip_stats_update_on_db_open(true);
     db_opts.set_level_compaction_dynamic_level_bytes(true);
-    db_opts.set_target_file_size_base(256 * 1024 * 1024);
-    // defaults to 1MB
-    //db_opts.set_writable_file_max_buffer_size(1024 * 1024);
-    // defaults to 2MB now
-    //db_opts.set_compaction_readahead_size(2 * 1024 * 1024);
-    db_opts.set_use_direct_reads(true);
-    db_opts.set_use_direct_io_for_flush_and_compaction(true);
     db_opts.create_if_missing(true);
     db_opts.increase_parallelism(num_cpus::get() as i32);
     db_opts.set_max_open_files(max_open_files);
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 36f738d8..70dd95ad 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -240,7 +242,9 @@ impl KeyValueDatabase {
 
         if !Path::new(&config.database_path).exists() {
             std::fs::create_dir_all(&config.database_path)
-                .map_err(|_| Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself."))?;
+                .map_err(|e| {
+                    error!("Failed to create database path: {e}");
+                    Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself.")})?;
         }
 
         let builder: Arc<dyn KeyValueDatabaseEngine> = match &*config.database_backend {
diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index 15f2220a..c4960e27 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -292,6 +292,8 @@ enum DebugCommand {
         /// An event ID (a $ followed by the base64 reference hash)
         event_id: Box<EventId>,
     },
+
+    ForceDeviceListUpdates,
 }
 
 #[cfg_attr(test, derive(Debug))]
@@ -1263,6 +1265,15 @@ impl Service {
                         None => RoomMessageEventContent::text_plain("PDU not found."),
                     }
                 }
+                DebugCommand::ForceDeviceListUpdates => {
+                    // Force E2EE device list updates for all users
+                    for user_id in services().users.iter().filter_map(|r| r.ok()) {
+                        services().users.mark_device_key_update(&user_id)?;
+                    }
+                    RoomMessageEventContent::text_plain(
+                        "Marked all devices for all users as having new keys to update",
+                    )
+                }
             },
         };
 
@@ -1394,10 +1405,24 @@ impl Service {
 
         services().users.create(&conduit_user, None)?;
 
-        let mut content = RoomCreateEventContent::new_v1(conduit_user.clone());
+        let room_version = services().globals.default_room_version();
+        let mut content = match room_version {
+            RoomVersionId::V1
+            | RoomVersionId::V2
+            | RoomVersionId::V3
+            | RoomVersionId::V4
+            | RoomVersionId::V5
+            | RoomVersionId::V6
+            | RoomVersionId::V7
+            | RoomVersionId::V8
+            | RoomVersionId::V9
+            | RoomVersionId::V10 => RoomCreateEventContent::new_v1(conduit_user.clone()),
+            _ => RoomCreateEventContent::new_v11(),
+        };
+
         content.federate = true;
         content.predecessor = None;
-        content.room_version = services().globals.default_room_version();
+        content.room_version = room_version;
 
         // 1. The room create event
         services()
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index d41e7e6a..21072d40 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -395,6 +395,22 @@ impl Service {
         self.config.presence_offline_timeout_s
     }
 
+    pub fn rocksdb_log_level(&self) -> &String {
+        &self.config.rocksdb_log_level
+    }
+
+    pub fn rocksdb_max_log_file_size(&self) -> usize {
+        self.config.rocksdb_max_log_file_size
+    }
+
+    pub fn rocksdb_log_time_to_roll(&self) -> usize {
+        self.config.rocksdb_log_time_to_roll
+    }
+
+    pub fn rocksdb_optimize_for_spinning_disks(&self) -> bool {
+        self.config.rocksdb_optimize_for_spinning_disks
+    }
+
     pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
         let mut room_versions: Vec<RoomVersionId> = vec![];
         room_versions.extend(self.stable_room_versions.clone());
diff --git a/src/service/pdu.rs b/src/service/pdu.rs
index c8d78604..0a9ea861 100644
--- a/src/service/pdu.rs
+++ b/src/service/pdu.rs
@@ -1,5 +1,6 @@
 use crate::Error;
 use ruma::{
+    canonical_json::redact_content_in_place,
     events::{
         room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent,
         AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent,
@@ -49,44 +50,23 @@ pub struct PduEvent {
 
 impl PduEvent {
     #[tracing::instrument(skip(self))]
-    pub fn redact(&mut self, reason: &PduEvent) -> crate::Result<()> {
+    pub fn redact(
+        &mut self,
+        room_version_id: RoomVersionId,
+        reason: &PduEvent,
+    ) -> crate::Result<()> {
         self.unsigned = None;
 
-        let allowed: &[&str] = match self.kind {
-            TimelineEventType::RoomMember => &["join_authorised_via_users_server", "membership"],
-            TimelineEventType::RoomCreate => &["creator"],
-            TimelineEventType::RoomJoinRules => &["join_rule"],
-            TimelineEventType::RoomPowerLevels => &[
-                "ban",
-                "events",
-                "events_default",
-                "kick",
-                "redact",
-                "state_default",
-                "users",
-                "users_default",
-            ],
-            TimelineEventType::RoomHistoryVisibility => &["history_visibility"],
-            _ => &[],
-        };
-
-        let mut old_content: BTreeMap<String, serde_json::Value> =
-            serde_json::from_str(self.content.get())
-                .map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
-
-        let mut new_content = serde_json::Map::new();
-
-        for key in allowed {
-            if let Some(value) = old_content.remove(*key) {
-                new_content.insert((*key).to_owned(), value);
-            }
-        }
+        let mut content = serde_json::from_str(self.content.get())
+            .map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
+        redact_content_in_place(&mut content, &room_version_id, self.kind.to_string())
+            .map_err(|e| Error::RedactionError(self.sender.server_name().to_owned(), e))?;
 
         self.unsigned = Some(to_raw_value(&json!({
             "redacted_because": serde_json::to_value(reason).expect("to_value(PduEvent) always works")
         })).expect("to string always works"));
 
-        self.content = to_raw_value(&new_content).expect("to string always works");
+        self.content = to_raw_value(&content).expect("to string always works");
 
         Ok(())
     }
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs
index dcadece6..729a7a2b 100644
--- a/src/service/rooms/event_handler/mod.rs
+++ b/src/service/rooms/event_handler/mod.rs
@@ -799,6 +799,7 @@ impl Service {
         // applied. We start with the previous extremities (aka leaves)
         debug!("Calculating extremities");
         let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
+        debug!("Amount of forward extremities in room {room_id}: {extremities:?}");
 
         // Remove any forward extremities that are referenced by this incoming event's prev_events
         for prev_event in &incoming_pdu.prev_events {
@@ -1142,8 +1143,8 @@ impl Service {
                     events_in_reverse_order.push((next_id.clone(), value));
                     events_all.insert(next_id);
                 }
-                Err(_) => {
-                    warn!("Failed to fetch event: {}", next_id);
+                Err(e) => {
+                    warn!("Failed to fetch event {} | {e}", next_id);
                     back_off((*next_id).to_owned());
                 }
             }
@@ -1268,7 +1269,10 @@ impl Service {
 
             if amount > services().globals.max_fetch_prev_events() {
                 // Max limit reached
-                info!("Max prev event limit reached!");
+                info!(
+                    "Max prev event limit reached! Limit: {}",
+                    services().globals.max_fetch_prev_events()
+                );
                 graph.insert(prev_event_id.clone(), HashSet::new());
                 continue;
             }
@@ -1322,7 +1326,10 @@ impl Service {
                     ),
                 ))
             })
-            .map_err(|_| Error::bad_database("Error sorting prev events"))?;
+            .map_err(|e| {
+                error!("Error sorting prev events: {e}");
+                Error::bad_database("Error sorting prev events")
+            })?;
 
         Ok((sorted, eventid_info))
     }
@@ -1339,6 +1346,7 @@ impl Service {
         let mut server_key_ids = HashMap::new();
 
         for event in events.into_iter() {
+            debug!("Fetching keys for event: {event:?}");
             for (signature_server, signature) in event
                 .get("signatures")
                 .ok_or(Error::BadServerResponse(
@@ -1364,6 +1372,7 @@ impl Service {
 
         if server_key_ids.is_empty() {
             // Nothing to do, can exit early
+            debug!("server_key_ids is empty, not fetching any keys");
             return Ok(());
         }
 
@@ -1382,7 +1391,8 @@ impl Service {
                 let signature_server2 = signature_server.clone();
                 let fetch_res = self
                     .fetch_signing_keys_for_server(
-                        signature_server2.as_str().try_into().map_err(|_| {
+                        signature_server2.as_str().try_into().map_err(|e| {
+                            info!("Invalid servername in signatures of server response pdu: {e}");
                             (
                                 signature_server.clone(),
                                 Error::BadServerResponse(
@@ -1397,7 +1407,7 @@ impl Service {
                 match fetch_res {
                     Ok(keys) => Ok((signature_server, keys)),
                     Err(e) => {
-                        warn!("Signature verification failed: Could not fetch signing key.",);
+                        warn!("Signature verification failed: Could not fetch signing key for {signature_server}: {e}",);
                         Err((signature_server, e))
                     }
                 }
@@ -1483,7 +1493,8 @@ impl Service {
             signature_ids.iter().all(|id| keys.contains_key(id))
         };
 
-        let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|_| {
+        let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|e| {
+            info!("Invalid servername in signatures of server response pdu: {e}");
             Error::BadServerResponse("Invalid servername in signatures of server response pdu.")
         })?;
 
@@ -1491,7 +1502,7 @@ impl Service {
             continue;
         }
 
-        trace!("Loading signing keys for {}", origin);
+        debug!("Loading signing keys for {}", origin);
 
         let result: BTreeMap<_, _> = services()
             .globals
@@ -1501,7 +1512,7 @@ impl Service {
             .collect();
 
         if !contains_all_ids(&result) {
-            trace!("Signing key not loaded for {}", origin);
+            debug!("Signing key not loaded for {}", origin);
             servers.insert(origin.to_owned(), BTreeMap::new());
         }
 
@@ -1540,7 +1551,7 @@ impl Service {
         }
 
         if servers.is_empty() {
-            info!("We had all keys locally");
+            info!("server is empty, we had all keys locally, not fetching any keys");
             return Ok(());
         }
 
@@ -1556,7 +1567,7 @@ impl Service {
             )
             .await
         {
-            trace!("Got signing keys: {:?}", keys);
+            debug!("Got signing keys: {:?}", keys);
             let mut pkm = pub_key_map
                 .write()
                 .map_err(|_| Error::bad_database("RwLock is poisoned."))?;
@@ -1588,7 +1599,7 @@ impl Service {
         }
 
         if servers.is_empty() {
-            info!("Trusted server supplied all signing keys");
+            info!("Trusted server supplied all signing keys, no more keys to fetch");
             return Ok(());
         }
     }
@@ -1639,25 +1650,36 @@ impl Service {
             &StateEventType::RoomServerAcl,
             "",
         )? {
-            Some(acl) => acl,
-            None => return Ok(()),
+            Some(acl) => {
+                debug!("ACL event found: {acl:?}");
+                acl
+            }
+            None => {
+                info!("No ACL event found");
+                return Ok(());
+            }
         };
 
         let acl_event_content: RoomServerAclEventContent =
             match serde_json::from_str(acl_event.content.get()) {
-                Ok(content) => content,
-                Err(_) => {
-                    warn!("Invalid ACL event");
+                Ok(content) => {
+                    debug!("Found ACL event contents: {content:?}");
+                    content
+                }
+                Err(e) => {
+                    warn!("Invalid ACL event: {e}");
                     return Ok(());
                 }
             };
 
         if acl_event_content.allow.is_empty() {
+            warn!("Ignoring broken ACL event (allow key is empty)");
             // Ignore broken acl events
             return Ok(());
         }
 
         if acl_event_content.is_allowed(server_name) {
+            debug!("server {server_name} is allowed by ACL");
             Ok(())
         } else {
             info!(
@@ -1737,7 +1759,7 @@ impl Service {
             }
         }
 
-        trace!("Loading signing keys for {}", origin);
+        debug!("Loading signing keys for {}", origin);
 
         let mut result: BTreeMap<_, _> = services()
             .globals
@@ -1792,7 +1814,7 @@ impl Service {
                     MilliSecondsSinceUnixEpoch::from_system_time(
                         SystemTime::now()
                             .checked_add(Duration::from_secs(3600))
-                            .expect("SystemTime to large"),
+                            .expect("SystemTime too large"),
                     )
                     .expect("time is valid"),
                 ),
@@ -1806,7 +1828,7 @@ impl Service {
                 .collect::<Vec<_>>()
         })
     {
-        trace!("Got signing keys: {:?}", server_keys);
+        debug!("Got signing keys: {:?}", server_keys);
         for k in server_keys {
             services().globals.add_signing_key(origin, k.clone())?;
             result.extend(
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index 5133015a..7ae62a4c 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -28,7 +28,7 @@ use ruma::{
     state_res,
     state_res::{Event, RoomVersion},
     uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
-    OwnedServerName, RoomAliasId, RoomId, ServerName, UserId,
+    OwnedServerName, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
 };
 use serde::Deserialize;
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
@@ -139,6 +139,27 @@ impl Service {
     }
     */
 
+    /// Returns the version of a room, if known
+    pub fn get_room_version(&self, room_id: &RoomId) -> Result<Option<RoomVersionId>> {
+        let create_event = services().rooms.state_accessor.room_state_get(
+            room_id,
+            &StateEventType::RoomCreate,
+            "",
+        )?;
+
+        let create_event_content: Option<RoomCreateEventContent> = create_event
+            .as_ref()
+            .map(|create_event| {
+                serde_json::from_str(create_event.content.get()).map_err(|e| {
+                    warn!("Invalid create event: {}", e);
+                    Error::bad_database("Invalid create event in db.")
+                })
+            })
+            .transpose()?;
+
+        Ok(create_event_content.map(|content| content.room_version))
+    }
+
     /// Returns the json of a pdu.
     pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
         self.db.get_pdu_json(event_id)
@@ -340,8 +361,10 @@ impl Service {
                 GlobalAccountDataEventType::PushRules.to_string().into(),
             )?
             .map(|event| {
-                serde_json::from_str::<PushRulesEvent>(event.get())
-                    .map_err(|_| Error::bad_database("Invalid push rules event in db."))
+                serde_json::from_str::<PushRulesEvent>(event.get()).map_err(|e| {
+                    warn!("Invalid push rules event in db for user ID {user}: {e}");
+                    Error::bad_database("Invalid push rules event in db.")
+                })
             })
             .transpose()?
             .map(|ev: PushRulesEvent| ev.content.global)
@@ -384,9 +407,39 @@ impl Service {
 
         match pdu.kind {
             TimelineEventType::RoomRedaction => {
-                if let Some(redact_id) = &pdu.redacts {
-                    self.redact_pdu(redact_id, pdu)?;
-                }
+                let room_version_id = self
+                    .get_room_version(&pdu.room_id)?
+                    .expect("Got RoomRedaction in a room of unknown version");
+                match room_version_id {
+                    RoomVersionId::V1
+                    | RoomVersionId::V2
+                    | RoomVersionId::V3
+                    | RoomVersionId::V4
+                    | RoomVersionId::V5
+                    | RoomVersionId::V6
+                    | RoomVersionId::V7
+                    | RoomVersionId::V8
+                    | RoomVersionId::V9
+                    | RoomVersionId::V10 => {
+                        if let Some(redact_id) = &pdu.redacts {
+                            self.redact_pdu(redact_id, pdu)?;
+                        }
+                    }
+                    _ => {
+                        #[derive(Deserialize)]
+                        struct Redaction {
+                            redacts: Option<OwnedEventId>,
+                        }
+                        let content = serde_json::from_str::<Redaction>(pdu.content.get())
+                            .map_err(|e| {
+                                warn!("Invalid content in redaction pdu: {e}");
+                                Error::bad_database("Invalid content in redaction pdu.")
+                            })?;
+                        if let Some(redact_id) = &content.redacts {
+                            self.redact_pdu(redact_id, pdu)?;
+                        }
+                    }
+                };
             }
             TimelineEventType::SpaceChild => {
                 if let Some(_state_key) = &pdu.state_key {
@@ -650,28 +703,28 @@ impl Service {
             .take(20)
             .collect();
 
-        let create_event = services().rooms.state_accessor.room_state_get(
-            room_id,
-            &StateEventType::RoomCreate,
-            "",
-        )?;
-
-        let create_event_content: Option<RoomCreateEventContent> = create_event
-            .as_ref()
-            .map(|create_event| {
-                serde_json::from_str(create_event.content.get()).map_err(|e| {
-                    warn!("Invalid database create event: {}", e);
-                    Error::bad_database("Invalid create event in db.")
-                })
+        let room_version_id = self
+            .get_room_version(room_id)?
+            .or_else(|| {
+                if event_type == TimelineEventType::RoomCreate {
+                    #[derive(Deserialize)]
+                    struct RoomCreate {
+                        room_version: RoomVersionId,
+                    }
+                    let content = serde_json::from_str::<RoomCreate>(content.get())
+                        .expect("Invalid content in RoomCreate pdu.");
+                    Some(content.room_version)
+                } else {
+                    None
+                }
             })
-            .transpose()?;
+            .ok_or_else(|| {
+                Error::InconsistentRoomState(
+                    "non-create event for room of unknown version",
+                    room_id.to_owned(),
+                )
+            })?;
 
-        // If there was no create event yet, assume we are creating a room with the default
-        // version right now
-        let room_version_id = create_event_content
-            .map_or(services().globals.default_room_version(), |create_event| {
-                create_event.room_version
-            });
         let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");
 
         let auth_events = services().rooms.state.get_auth_events(
@@ -1041,7 +1094,11 @@ impl Service {
         let mut pdu = self
             .get_pdu_from_id(&pdu_id)?
             .ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?;
-        pdu.redact(reason)?;
+
+        let room_version_id = self.get_room_version(&pdu.room_id)?.ok_or_else(|| {
+            Error::bad_database("Trying to redact PDU in in room of unknown version")
+        })?;
+        pdu.redact(room_version_id, reason)?;
         self.replace_pdu(
             &pdu_id,
             &utils::to_canonical_object(&pdu).expect("PDU is an object"),
diff --git a/src/utils/error.rs b/src/utils/error.rs
index 765a31bb..d821fe66 100644
--- a/src/utils/error.rs
+++ b/src/utils/error.rs
@@ -80,6 +80,10 @@ pub enum Error {
     #[cfg(feature = "conduit_bin")]
     #[error("{0}")]
     PathError(#[from] axum::extract::rejection::PathRejection),
+    #[error("from {0}: {1}")]
+    RedactionError(OwnedServerName, ruma::canonical_json::RedactionError),
+    #[error("{0} in {1}")]
+    InconsistentRoomState(&'static str, ruma::OwnedRoomId),
 }
 
 impl Error {