Merge branch 'stateres' into 'next'
Make state resolution more resistant and some sync performance improvements See merge request famedly/conduit!490
This commit is contained in:
commit
3a1a72df98
12 changed files with 256 additions and 293 deletions
22
Cargo.lock
generated
22
Cargo.lock
generated
|
@ -2110,7 +2110,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma"
|
name = "ruma"
|
||||||
version = "0.8.2"
|
version = "0.8.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"assign",
|
"assign",
|
||||||
"js_int",
|
"js_int",
|
||||||
|
@ -2128,7 +2128,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-appservice-api"
|
name = "ruma-appservice-api"
|
||||||
version = "0.8.1"
|
version = "0.8.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -2139,7 +2139,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-client-api"
|
name = "ruma-client-api"
|
||||||
version = "0.16.2"
|
version = "0.16.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"assign",
|
"assign",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -2156,7 +2156,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-common"
|
name = "ruma-common"
|
||||||
version = "0.11.3"
|
version = "0.11.3"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.21.2",
|
"base64 0.21.2",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -2184,7 +2184,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-federation-api"
|
name = "ruma-federation-api"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -2195,7 +2195,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-identifiers-validation"
|
name = "ruma-identifiers-validation"
|
||||||
version = "0.9.1"
|
version = "0.9.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
@ -2204,7 +2204,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-identity-service-api"
|
name = "ruma-identity-service-api"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -2214,7 +2214,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-macros"
|
name = "ruma-macros"
|
||||||
version = "0.11.3"
|
version = "0.11.3"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"proc-macro-crate",
|
"proc-macro-crate",
|
||||||
|
@ -2229,7 +2229,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-push-gateway-api"
|
name = "ruma-push-gateway-api"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -2240,7 +2240,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-signatures"
|
name = "ruma-signatures"
|
||||||
version = "0.13.1"
|
version = "0.13.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.21.2",
|
"base64 0.21.2",
|
||||||
"ed25519-dalek",
|
"ed25519-dalek",
|
||||||
|
@ -2256,7 +2256,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-state-res"
|
name = "ruma-state-res"
|
||||||
version = "0.9.1"
|
version = "0.9.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568"
|
source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"itertools",
|
"itertools",
|
||||||
"js_int",
|
"js_int",
|
||||||
|
|
|
@ -26,7 +26,7 @@ tower-http = { version = "0.4.1", features = ["add-extension", "cors", "sensitiv
|
||||||
|
|
||||||
# Used for matrix spec type definitions and helpers
|
# Used for matrix spec type definitions and helpers
|
||||||
#ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
#ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
||||||
ruma = { git = "https://github.com/ruma/ruma", rev = "de9a5a6ecca197e59623c210bd21f53055f83568", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] }
|
ruma = { git = "https://github.com/ruma/ruma", rev = "38294bd5206498c02b1001227d65654eb548308b", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] }
|
||||||
#ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
#ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
||||||
#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
||||||
|
|
||||||
|
|
|
@ -743,6 +743,7 @@ async fn join_room_by_id_helper(
|
||||||
info!("Saving state from send_join");
|
info!("Saving state from send_join");
|
||||||
let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state(
|
let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state(
|
||||||
room_id,
|
room_id,
|
||||||
|
Arc::new(
|
||||||
state
|
state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, id)| {
|
.map(|(k, id)| {
|
||||||
|
@ -752,6 +753,7 @@ async fn join_room_by_id_helper(
|
||||||
.compress_state_event(k, &id)
|
.compress_state_event(k, &id)
|
||||||
})
|
})
|
||||||
.collect::<Result<_>>()?,
|
.collect::<Result<_>>()?,
|
||||||
|
),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
services()
|
services()
|
||||||
|
|
|
@ -16,7 +16,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
.1;
|
.1;
|
||||||
let mut result = HashMap::new();
|
let mut result = HashMap::new();
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
for compressed in full_state.into_iter() {
|
for compressed in full_state.iter() {
|
||||||
let parsed = services()
|
let parsed = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_compressor
|
.state_compressor
|
||||||
|
@ -45,7 +45,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
|
|
||||||
let mut result = HashMap::new();
|
let mut result = HashMap::new();
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
for compressed in full_state {
|
for compressed in full_state.iter() {
|
||||||
let (_, eventid) = services()
|
let (_, eventid) = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_compressor
|
.state_compressor
|
||||||
|
@ -95,7 +95,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
.expect("there is always one layer")
|
.expect("there is always one layer")
|
||||||
.1;
|
.1;
|
||||||
Ok(full_state
|
Ok(full_state
|
||||||
.into_iter()
|
.iter()
|
||||||
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
||||||
.and_then(|compressed| {
|
.and_then(|compressed| {
|
||||||
services()
|
services()
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{collections::HashSet, mem::size_of};
|
use std::{collections::HashSet, mem::size_of, sync::Arc};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database::KeyValueDatabase,
|
database::KeyValueDatabase,
|
||||||
|
@ -37,20 +37,20 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase {
|
||||||
|
|
||||||
Ok(StateDiff {
|
Ok(StateDiff {
|
||||||
parent,
|
parent,
|
||||||
added,
|
added: Arc::new(added),
|
||||||
removed,
|
removed: Arc::new(removed),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> {
|
fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> {
|
||||||
let mut value = diff.parent.unwrap_or(0).to_be_bytes().to_vec();
|
let mut value = diff.parent.unwrap_or(0).to_be_bytes().to_vec();
|
||||||
for new in &diff.added {
|
for new in diff.added.iter() {
|
||||||
value.extend_from_slice(&new[..]);
|
value.extend_from_slice(&new[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !diff.removed.is_empty() {
|
if !diff.removed.is_empty() {
|
||||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
value.extend_from_slice(&0_u64.to_be_bytes());
|
||||||
for removed in &diff.removed {
|
for removed in diff.removed.iter() {
|
||||||
value.extend_from_slice(&removed[..]);
|
value.extend_from_slice(&removed[..]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -587,8 +587,8 @@ impl KeyValueDatabase {
|
||||||
|
|
||||||
services().rooms.state_compressor.save_state_from_diff(
|
services().rooms.state_compressor.save_state_from_diff(
|
||||||
current_sstatehash,
|
current_sstatehash,
|
||||||
statediffnew,
|
Arc::new(statediffnew),
|
||||||
statediffremoved,
|
Arc::new(statediffremoved),
|
||||||
2, // every state change is 2 event changes on average
|
2, // every state change is 2 event changes on average
|
||||||
states_parents,
|
states_parents,
|
||||||
)?;
|
)?;
|
||||||
|
|
|
@ -90,7 +90,7 @@ impl Services {
|
||||||
state_compressor: rooms::state_compressor::Service {
|
state_compressor: rooms::state_compressor::Service {
|
||||||
db,
|
db,
|
||||||
stateinfo_cache: Mutex::new(LruCache::new(
|
stateinfo_cache: Mutex::new(LruCache::new(
|
||||||
(100.0 * config.conduit_cache_capacity_modifier) as usize,
|
(1000.0 * config.conduit_cache_capacity_modifier) as usize,
|
||||||
)),
|
)),
|
||||||
},
|
},
|
||||||
timeline: rooms::timeline::Service {
|
timeline: rooms::timeline::Service {
|
||||||
|
|
|
@ -38,6 +38,8 @@ use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
use crate::{service::*, services, Error, PduEvent, Result};
|
use crate::{service::*, services, Error, PduEvent, Result};
|
||||||
|
|
||||||
|
use super::state_compressor::CompressedStateEvent;
|
||||||
|
|
||||||
pub struct Service;
|
pub struct Service;
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
@ -62,9 +64,8 @@ impl Service {
|
||||||
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
|
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
|
||||||
/// doing state res where one of the inputs was a previously trusted set of state, don't just
|
/// doing state res where one of the inputs was a previously trusted set of state, don't just
|
||||||
/// trust a set of state we got from a remote)
|
/// trust a set of state we got from a remote)
|
||||||
/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
|
/// 13. Use state resolution to find new room state
|
||||||
/// it
|
/// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
|
||||||
/// 14. Use state resolution to find new room state
|
|
||||||
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
|
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
|
||||||
#[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))]
|
#[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))]
|
||||||
pub(crate) async fn handle_incoming_pdu<'a>(
|
pub(crate) async fn handle_incoming_pdu<'a>(
|
||||||
|
@ -304,7 +305,7 @@ impl Service {
|
||||||
) {
|
) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Drop
|
// Drop
|
||||||
warn!("Dropping bad event {}: {}", event_id, e);
|
warn!("Dropping bad event {}: {}", event_id, e,);
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::InvalidParam,
|
ErrorKind::InvalidParam,
|
||||||
"Signature verification failed",
|
"Signature verification failed",
|
||||||
|
@ -735,8 +736,26 @@ impl Service {
|
||||||
}
|
}
|
||||||
info!("Auth check succeeded");
|
info!("Auth check succeeded");
|
||||||
|
|
||||||
// We start looking at current room state now, so lets lock the room
|
// Soft fail check before doing state res
|
||||||
|
let auth_events = services().rooms.state.get_auth_events(
|
||||||
|
room_id,
|
||||||
|
&incoming_pdu.kind,
|
||||||
|
&incoming_pdu.sender,
|
||||||
|
incoming_pdu.state_key.as_deref(),
|
||||||
|
&incoming_pdu.content,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let soft_fail = !state_res::event_auth::auth_check(
|
||||||
|
&room_version,
|
||||||
|
&incoming_pdu,
|
||||||
|
None::<PduEvent>,
|
||||||
|
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
||||||
|
)
|
||||||
|
.map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?;
|
||||||
|
|
||||||
|
// 13. Use state resolution to find new room state
|
||||||
|
|
||||||
|
// We start looking at current room state now, so lets lock the room
|
||||||
let mutex_state = Arc::clone(
|
let mutex_state = Arc::clone(
|
||||||
services()
|
services()
|
||||||
.globals
|
.globals
|
||||||
|
@ -772,7 +791,8 @@ impl Service {
|
||||||
});
|
});
|
||||||
|
|
||||||
info!("Compressing state at event");
|
info!("Compressing state at event");
|
||||||
let state_ids_compressed = state_at_incoming_event
|
let state_ids_compressed = Arc::new(
|
||||||
|
state_at_incoming_event
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(shortstatekey, id)| {
|
.map(|(shortstatekey, id)| {
|
||||||
services()
|
services()
|
||||||
|
@ -780,26 +800,44 @@ impl Service {
|
||||||
.state_compressor
|
.state_compressor
|
||||||
.compress_state_event(*shortstatekey, id)
|
.compress_state_event(*shortstatekey, id)
|
||||||
})
|
})
|
||||||
.collect::<Result<_>>()?;
|
.collect::<Result<_>>()?,
|
||||||
|
);
|
||||||
|
|
||||||
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
|
if incoming_pdu.state_key.is_some() {
|
||||||
info!("Starting soft fail auth check");
|
info!("Preparing for stateres to derive new room state");
|
||||||
|
|
||||||
let auth_events = services().rooms.state.get_auth_events(
|
// We also add state after incoming event to the fork states
|
||||||
room_id,
|
let mut state_after = state_at_incoming_event.clone();
|
||||||
&incoming_pdu.kind,
|
if let Some(state_key) = &incoming_pdu.state_key {
|
||||||
&incoming_pdu.sender,
|
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
||||||
incoming_pdu.state_key.as_deref(),
|
&incoming_pdu.kind.to_string().into(),
|
||||||
&incoming_pdu.content,
|
state_key,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let soft_fail = !state_res::event_auth::auth_check(
|
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
|
||||||
&room_version,
|
}
|
||||||
&incoming_pdu,
|
|
||||||
None::<PduEvent>,
|
let new_room_state = self
|
||||||
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
.resolve_state(room_id, room_version_id, state_after)
|
||||||
)
|
.await?;
|
||||||
.map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?;
|
|
||||||
|
// Set the new room state to the resolved state
|
||||||
|
info!("Forcing new room state");
|
||||||
|
|
||||||
|
let (sstatehash, new, removed) = services()
|
||||||
|
.rooms
|
||||||
|
.state_compressor
|
||||||
|
.save_state(room_id, new_room_state)?;
|
||||||
|
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state
|
||||||
|
.force_state(room_id, sstatehash, new, removed, &state_lock)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
|
||||||
|
info!("Starting soft fail auth check");
|
||||||
|
|
||||||
if soft_fail {
|
if soft_fail {
|
||||||
services().rooms.timeline.append_incoming_pdu(
|
services().rooms.timeline.append_incoming_pdu(
|
||||||
|
@ -823,7 +861,35 @@ impl Service {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if incoming_pdu.state_key.is_some() {
|
info!("Appending pdu to timeline");
|
||||||
|
extremities.insert(incoming_pdu.event_id.clone());
|
||||||
|
|
||||||
|
// Now that the event has passed all auth it is added into the timeline.
|
||||||
|
// We use the `state_at_event` instead of `state_after` so we accurately
|
||||||
|
// represent the state for this event.
|
||||||
|
|
||||||
|
let pdu_id = services().rooms.timeline.append_incoming_pdu(
|
||||||
|
&incoming_pdu,
|
||||||
|
val,
|
||||||
|
extremities.iter().map(|e| (**e).to_owned()).collect(),
|
||||||
|
state_ids_compressed,
|
||||||
|
soft_fail,
|
||||||
|
&state_lock,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
info!("Appended incoming pdu");
|
||||||
|
|
||||||
|
// Event has passed all auth/stateres checks
|
||||||
|
drop(state_lock);
|
||||||
|
Ok(pdu_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_state(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
room_version_id: &RoomVersionId,
|
||||||
|
incoming_state: HashMap<u64, Arc<EventId>>,
|
||||||
|
) -> Result<Arc<HashSet<CompressedStateEvent>>> {
|
||||||
info!("Loading current room state ids");
|
info!("Loading current room state ids");
|
||||||
let current_sstatehash = services()
|
let current_sstatehash = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -837,78 +903,7 @@ impl Service {
|
||||||
.state_full_ids(current_sstatehash)
|
.state_full_ids(current_sstatehash)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Preparing for stateres to derive new room state");
|
let fork_states = [current_state_ids, incoming_state];
|
||||||
let mut extremity_sstatehashes = HashMap::new();
|
|
||||||
|
|
||||||
info!(?extremities, "Loading extremities");
|
|
||||||
for id in &extremities {
|
|
||||||
match services().rooms.timeline.get_pdu(id)? {
|
|
||||||
Some(leaf_pdu) => {
|
|
||||||
extremity_sstatehashes.insert(
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.pdu_shortstatehash(&leaf_pdu.event_id)?
|
|
||||||
.ok_or_else(|| {
|
|
||||||
error!(
|
|
||||||
"Found extremity pdu with no statehash in db: {:?}",
|
|
||||||
leaf_pdu
|
|
||||||
);
|
|
||||||
Error::bad_database("Found pdu with no statehash in db.")
|
|
||||||
})?,
|
|
||||||
leaf_pdu,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
error!("Missing state snapshot for {:?}", id);
|
|
||||||
return Err(Error::BadDatabase("Missing state snapshot."));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut fork_states = Vec::new();
|
|
||||||
|
|
||||||
// 12. Ensure that the state is derived from the previous current state (i.e. we calculated
|
|
||||||
// by doing state res where one of the inputs was a previously trusted set of state,
|
|
||||||
// don't just trust a set of state we got from a remote).
|
|
||||||
|
|
||||||
// We do this by adding the current state to the list of fork states
|
|
||||||
extremity_sstatehashes.remove(¤t_sstatehash);
|
|
||||||
fork_states.push(current_state_ids);
|
|
||||||
|
|
||||||
// We also add state after incoming event to the fork states
|
|
||||||
let mut state_after = state_at_incoming_event.clone();
|
|
||||||
if let Some(state_key) = &incoming_pdu.state_key {
|
|
||||||
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
|
||||||
&incoming_pdu.kind.to_string().into(),
|
|
||||||
state_key,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
|
|
||||||
}
|
|
||||||
fork_states.push(state_after);
|
|
||||||
|
|
||||||
let mut update_state = false;
|
|
||||||
// 14. Use state resolution to find new room state
|
|
||||||
let new_room_state = if fork_states.is_empty() {
|
|
||||||
panic!("State is empty");
|
|
||||||
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) {
|
|
||||||
info!("State resolution trivial");
|
|
||||||
// There was only one state, so it has to be the room's current state (because that is
|
|
||||||
// always included)
|
|
||||||
fork_states[0]
|
|
||||||
.iter()
|
|
||||||
.map(|(k, id)| {
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state_compressor
|
|
||||||
.compress_state_event(*k, id)
|
|
||||||
})
|
|
||||||
.collect::<Result<_>>()?
|
|
||||||
} else {
|
|
||||||
info!("Loading auth chains");
|
|
||||||
// We do need to force an update to this room's state
|
|
||||||
update_state = true;
|
|
||||||
|
|
||||||
let mut auth_chain_sets = Vec::new();
|
let mut auth_chain_sets = Vec::new();
|
||||||
for state in &fork_states {
|
for state in &fork_states {
|
||||||
|
@ -916,10 +911,7 @@ impl Service {
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(
|
.get_auth_chain(room_id, state.iter().map(|(_, id)| id.clone()).collect())
|
||||||
room_id,
|
|
||||||
state.iter().map(|(_, id)| id.clone()).collect(),
|
|
||||||
)
|
|
||||||
.await?
|
.await?
|
||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
|
@ -946,18 +938,13 @@ impl Service {
|
||||||
info!("Resolving state");
|
info!("Resolving state");
|
||||||
|
|
||||||
let lock = services().globals.stateres_mutex.lock();
|
let lock = services().globals.stateres_mutex.lock();
|
||||||
let state = match state_res::resolve(
|
let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| {
|
||||||
room_version_id,
|
|
||||||
&fork_states,
|
|
||||||
auth_chain_sets,
|
|
||||||
|id| {
|
|
||||||
let res = services().rooms.timeline.get_pdu(id);
|
let res = services().rooms.timeline.get_pdu(id);
|
||||||
if let Err(e) = &res {
|
if let Err(e) = &res {
|
||||||
error!("LOOK AT ME Failed to fetch event: {}", e);
|
error!("LOOK AT ME Failed to fetch event: {}", e);
|
||||||
}
|
}
|
||||||
res.ok().flatten()
|
res.ok().flatten()
|
||||||
},
|
}) {
|
||||||
) {
|
|
||||||
Ok(new_state) => new_state,
|
Ok(new_state) => new_state,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
|
return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
|
||||||
|
@ -968,57 +955,21 @@ impl Service {
|
||||||
|
|
||||||
info!("State resolution done. Compressing state");
|
info!("State resolution done. Compressing state");
|
||||||
|
|
||||||
state
|
let new_room_state = state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|((event_type, state_key), event_id)| {
|
.map(|((event_type, state_key), event_id)| {
|
||||||
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
let shortstatekey = services()
|
||||||
&event_type.to_string().into(),
|
.rooms
|
||||||
&state_key,
|
.short
|
||||||
)?;
|
.get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?;
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_compressor
|
.state_compressor
|
||||||
.compress_state_event(shortstatekey, &event_id)
|
.compress_state_event(shortstatekey, &event_id)
|
||||||
})
|
})
|
||||||
.collect::<Result<_>>()?
|
.collect::<Result<_>>()?;
|
||||||
};
|
|
||||||
|
|
||||||
// Set the new room state to the resolved state
|
Ok(Arc::new(new_room_state))
|
||||||
if update_state {
|
|
||||||
info!("Forcing new room state");
|
|
||||||
let (sstatehash, new, removed) = services()
|
|
||||||
.rooms
|
|
||||||
.state_compressor
|
|
||||||
.save_state(room_id, new_room_state)?;
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state
|
|
||||||
.force_state(room_id, sstatehash, new, removed, &state_lock)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Appending pdu to timeline");
|
|
||||||
extremities.insert(incoming_pdu.event_id.clone());
|
|
||||||
|
|
||||||
// Now that the event has passed all auth it is added into the timeline.
|
|
||||||
// We use the `state_at_event` instead of `state_after` so we accurately
|
|
||||||
// represent the state for this event.
|
|
||||||
|
|
||||||
let pdu_id = services().rooms.timeline.append_incoming_pdu(
|
|
||||||
&incoming_pdu,
|
|
||||||
val,
|
|
||||||
extremities.iter().map(|e| (**e).to_owned()).collect(),
|
|
||||||
state_ids_compressed,
|
|
||||||
soft_fail,
|
|
||||||
&state_lock,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
info!("Appended incoming pdu");
|
|
||||||
|
|
||||||
// Event has passed all auth/stateres checks
|
|
||||||
drop(state_lock);
|
|
||||||
Ok(pdu_id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||||
|
|
|
@ -32,11 +32,11 @@ impl Service {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
shortstatehash: u64,
|
shortstatehash: u64,
|
||||||
statediffnew: HashSet<CompressedStateEvent>,
|
statediffnew: Arc<HashSet<CompressedStateEvent>>,
|
||||||
_statediffremoved: HashSet<CompressedStateEvent>,
|
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
|
||||||
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
for event_id in statediffnew.into_iter().filter_map(|new| {
|
for event_id in statediffnew.iter().filter_map(|new| {
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_compressor
|
.state_compressor
|
||||||
|
@ -107,7 +107,7 @@ impl Service {
|
||||||
&self,
|
&self,
|
||||||
event_id: &EventId,
|
event_id: &EventId,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
state_ids_compressed: HashSet<CompressedStateEvent>,
|
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
|
||||||
) -> Result<u64> {
|
) -> Result<u64> {
|
||||||
let shorteventid = services()
|
let shorteventid = services()
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -152,9 +152,9 @@ impl Service {
|
||||||
.copied()
|
.copied()
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
(Arc::new(statediffnew), Arc::new(statediffremoved))
|
||||||
} else {
|
} else {
|
||||||
(state_ids_compressed, HashSet::new())
|
(state_ids_compressed, Arc::new(HashSet::new()))
|
||||||
};
|
};
|
||||||
services().rooms.state_compressor.save_state_from_diff(
|
services().rooms.state_compressor.save_state_from_diff(
|
||||||
shortstatehash,
|
shortstatehash,
|
||||||
|
@ -234,8 +234,8 @@ impl Service {
|
||||||
|
|
||||||
services().rooms.state_compressor.save_state_from_diff(
|
services().rooms.state_compressor.save_state_from_diff(
|
||||||
shortstatehash,
|
shortstatehash,
|
||||||
statediffnew,
|
Arc::new(statediffnew),
|
||||||
statediffremoved,
|
Arc::new(statediffremoved),
|
||||||
2,
|
2,
|
||||||
states_parents,
|
states_parents,
|
||||||
)?;
|
)?;
|
||||||
|
@ -396,7 +396,7 @@ impl Service {
|
||||||
.1;
|
.1;
|
||||||
|
|
||||||
Ok(full_state
|
Ok(full_state
|
||||||
.into_iter()
|
.iter()
|
||||||
.filter_map(|compressed| {
|
.filter_map(|compressed| {
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
use std::collections::HashSet;
|
use std::{collections::HashSet, sync::Arc};
|
||||||
|
|
||||||
use super::CompressedStateEvent;
|
use super::CompressedStateEvent;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
pub struct StateDiff {
|
pub struct StateDiff {
|
||||||
pub parent: Option<u64>,
|
pub parent: Option<u64>,
|
||||||
pub added: HashSet<CompressedStateEvent>,
|
pub added: Arc<HashSet<CompressedStateEvent>>,
|
||||||
pub removed: HashSet<CompressedStateEvent>,
|
pub removed: Arc<HashSet<CompressedStateEvent>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Data: Send + Sync {
|
pub trait Data: Send + Sync {
|
||||||
|
|
|
@ -21,9 +21,9 @@ pub struct Service {
|
||||||
u64,
|
u64,
|
||||||
Vec<(
|
Vec<(
|
||||||
u64, // sstatehash
|
u64, // sstatehash
|
||||||
HashSet<CompressedStateEvent>, // full state
|
Arc<HashSet<CompressedStateEvent>>, // full state
|
||||||
HashSet<CompressedStateEvent>, // added
|
Arc<HashSet<CompressedStateEvent>>, // added
|
||||||
HashSet<CompressedStateEvent>, // removed
|
Arc<HashSet<CompressedStateEvent>>, // removed
|
||||||
)>,
|
)>,
|
||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
|
@ -40,9 +40,9 @@ impl Service {
|
||||||
) -> Result<
|
) -> Result<
|
||||||
Vec<(
|
Vec<(
|
||||||
u64, // sstatehash
|
u64, // sstatehash
|
||||||
HashSet<CompressedStateEvent>, // full state
|
Arc<HashSet<CompressedStateEvent>>, // full state
|
||||||
HashSet<CompressedStateEvent>, // added
|
Arc<HashSet<CompressedStateEvent>>, // added
|
||||||
HashSet<CompressedStateEvent>, // removed
|
Arc<HashSet<CompressedStateEvent>>, // removed
|
||||||
)>,
|
)>,
|
||||||
> {
|
> {
|
||||||
if let Some(r) = self
|
if let Some(r) = self
|
||||||
|
@ -62,13 +62,19 @@ impl Service {
|
||||||
|
|
||||||
if let Some(parent) = parent {
|
if let Some(parent) = parent {
|
||||||
let mut response = self.load_shortstatehash_info(parent)?;
|
let mut response = self.load_shortstatehash_info(parent)?;
|
||||||
let mut state = response.last().unwrap().1.clone();
|
let mut state = (*response.last().unwrap().1).clone();
|
||||||
state.extend(added.iter().copied());
|
state.extend(added.iter().copied());
|
||||||
|
let removed = (*removed).clone();
|
||||||
for r in &removed {
|
for r in &removed {
|
||||||
state.remove(r);
|
state.remove(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
response.push((shortstatehash, state, added, removed));
|
response.push((shortstatehash, Arc::new(state), added, Arc::new(removed)));
|
||||||
|
|
||||||
|
self.stateinfo_cache
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(shortstatehash, response.clone());
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
} else {
|
} else {
|
||||||
|
@ -135,14 +141,14 @@ impl Service {
|
||||||
pub fn save_state_from_diff(
|
pub fn save_state_from_diff(
|
||||||
&self,
|
&self,
|
||||||
shortstatehash: u64,
|
shortstatehash: u64,
|
||||||
statediffnew: HashSet<CompressedStateEvent>,
|
statediffnew: Arc<HashSet<CompressedStateEvent>>,
|
||||||
statediffremoved: HashSet<CompressedStateEvent>,
|
statediffremoved: Arc<HashSet<CompressedStateEvent>>,
|
||||||
diff_to_sibling: usize,
|
diff_to_sibling: usize,
|
||||||
mut parent_states: Vec<(
|
mut parent_states: Vec<(
|
||||||
u64, // sstatehash
|
u64, // sstatehash
|
||||||
HashSet<CompressedStateEvent>, // full state
|
Arc<HashSet<CompressedStateEvent>>, // full state
|
||||||
HashSet<CompressedStateEvent>, // added
|
Arc<HashSet<CompressedStateEvent>>, // added
|
||||||
HashSet<CompressedStateEvent>, // removed
|
Arc<HashSet<CompressedStateEvent>>, // removed
|
||||||
)>,
|
)>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let diffsum = statediffnew.len() + statediffremoved.len();
|
let diffsum = statediffnew.len() + statediffremoved.len();
|
||||||
|
@ -152,29 +158,29 @@ impl Service {
|
||||||
// To many layers, we have to go deeper
|
// To many layers, we have to go deeper
|
||||||
let parent = parent_states.pop().unwrap();
|
let parent = parent_states.pop().unwrap();
|
||||||
|
|
||||||
let mut parent_new = parent.2;
|
let mut parent_new = (*parent.2).clone();
|
||||||
let mut parent_removed = parent.3;
|
let mut parent_removed = (*parent.3).clone();
|
||||||
|
|
||||||
for removed in statediffremoved {
|
for removed in statediffremoved.iter() {
|
||||||
if !parent_new.remove(&removed) {
|
if !parent_new.remove(removed) {
|
||||||
// It was not added in the parent and we removed it
|
// It was not added in the parent and we removed it
|
||||||
parent_removed.insert(removed);
|
parent_removed.insert(removed.clone());
|
||||||
}
|
}
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
// Else it was added in the parent and we removed it again. We can forget this change
|
||||||
}
|
}
|
||||||
|
|
||||||
for new in statediffnew {
|
for new in statediffnew.iter() {
|
||||||
if !parent_removed.remove(&new) {
|
if !parent_removed.remove(new) {
|
||||||
// It was not touched in the parent and we added it
|
// It was not touched in the parent and we added it
|
||||||
parent_new.insert(new);
|
parent_new.insert(new.clone());
|
||||||
}
|
}
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
// Else it was removed in the parent and we added it again. We can forget this change
|
||||||
}
|
}
|
||||||
|
|
||||||
self.save_state_from_diff(
|
self.save_state_from_diff(
|
||||||
shortstatehash,
|
shortstatehash,
|
||||||
parent_new,
|
Arc::new(parent_new),
|
||||||
parent_removed,
|
Arc::new(parent_removed),
|
||||||
diffsum,
|
diffsum,
|
||||||
parent_states,
|
parent_states,
|
||||||
)?;
|
)?;
|
||||||
|
@ -205,29 +211,29 @@ impl Service {
|
||||||
|
|
||||||
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
||||||
// Diff too big, we replace above layer(s)
|
// Diff too big, we replace above layer(s)
|
||||||
let mut parent_new = parent.2;
|
let mut parent_new = (*parent.2).clone();
|
||||||
let mut parent_removed = parent.3;
|
let mut parent_removed = (*parent.3).clone();
|
||||||
|
|
||||||
for removed in statediffremoved {
|
for removed in statediffremoved.iter() {
|
||||||
if !parent_new.remove(&removed) {
|
if !parent_new.remove(removed) {
|
||||||
// It was not added in the parent and we removed it
|
// It was not added in the parent and we removed it
|
||||||
parent_removed.insert(removed);
|
parent_removed.insert(removed.clone());
|
||||||
}
|
}
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
// Else it was added in the parent and we removed it again. We can forget this change
|
||||||
}
|
}
|
||||||
|
|
||||||
for new in statediffnew {
|
for new in statediffnew.iter() {
|
||||||
if !parent_removed.remove(&new) {
|
if !parent_removed.remove(new) {
|
||||||
// It was not touched in the parent and we added it
|
// It was not touched in the parent and we added it
|
||||||
parent_new.insert(new);
|
parent_new.insert(new.clone());
|
||||||
}
|
}
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
// Else it was removed in the parent and we added it again. We can forget this change
|
||||||
}
|
}
|
||||||
|
|
||||||
self.save_state_from_diff(
|
self.save_state_from_diff(
|
||||||
shortstatehash,
|
shortstatehash,
|
||||||
parent_new,
|
Arc::new(parent_new),
|
||||||
parent_removed,
|
Arc::new(parent_removed),
|
||||||
diffsum,
|
diffsum,
|
||||||
parent_states,
|
parent_states,
|
||||||
)?;
|
)?;
|
||||||
|
@ -250,11 +256,11 @@ impl Service {
|
||||||
pub fn save_state(
|
pub fn save_state(
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
new_state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
|
||||||
) -> Result<(
|
) -> Result<(
|
||||||
u64,
|
u64,
|
||||||
HashSet<CompressedStateEvent>,
|
Arc<HashSet<CompressedStateEvent>>,
|
||||||
HashSet<CompressedStateEvent>,
|
Arc<HashSet<CompressedStateEvent>>,
|
||||||
)> {
|
)> {
|
||||||
let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?;
|
let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?;
|
||||||
|
|
||||||
|
@ -271,7 +277,11 @@ impl Service {
|
||||||
.get_or_create_shortstatehash(&state_hash)?;
|
.get_or_create_shortstatehash(&state_hash)?;
|
||||||
|
|
||||||
if Some(new_shortstatehash) == previous_shortstatehash {
|
if Some(new_shortstatehash) == previous_shortstatehash {
|
||||||
return Ok((new_shortstatehash, HashSet::new(), HashSet::new()));
|
return Ok((
|
||||||
|
new_shortstatehash,
|
||||||
|
Arc::new(HashSet::new()),
|
||||||
|
Arc::new(HashSet::new()),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let states_parents = previous_shortstatehash
|
let states_parents = previous_shortstatehash
|
||||||
|
@ -290,9 +300,9 @@ impl Service {
|
||||||
.copied()
|
.copied()
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
(Arc::new(statediffnew), Arc::new(statediffremoved))
|
||||||
} else {
|
} else {
|
||||||
(new_state_ids_compressed, HashSet::new())
|
(new_state_ids_compressed, Arc::new(HashSet::new()))
|
||||||
};
|
};
|
||||||
|
|
||||||
if !already_existed {
|
if !already_existed {
|
||||||
|
|
|
@ -946,7 +946,7 @@ impl Service {
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
pdu_json: CanonicalJsonObject,
|
pdu_json: CanonicalJsonObject,
|
||||||
new_room_leaves: Vec<OwnedEventId>,
|
new_room_leaves: Vec<OwnedEventId>,
|
||||||
state_ids_compressed: HashSet<CompressedStateEvent>,
|
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
|
||||||
soft_fail: bool,
|
soft_fail: bool,
|
||||||
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<Option<Vec<u8>>> {
|
) -> Result<Option<Vec<u8>>> {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue