Do state res even if the event soft fails
This commit is contained in:
parent
b671238aa0
commit
7c6d25dcd1
1 changed files with 129 additions and 181 deletions
|
@ -38,6 +38,8 @@ use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
use crate::{service::*, services, Error, PduEvent, Result};
|
use crate::{service::*, services, Error, PduEvent, Result};
|
||||||
|
|
||||||
|
use super::state_compressor::CompressedStateEvent;
|
||||||
|
|
||||||
pub struct Service;
|
pub struct Service;
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
@ -62,9 +64,8 @@ impl Service {
|
||||||
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
|
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
|
||||||
/// doing state res where one of the inputs was a previously trusted set of state, don't just
|
/// doing state res where one of the inputs was a previously trusted set of state, don't just
|
||||||
/// trust a set of state we got from a remote)
|
/// trust a set of state we got from a remote)
|
||||||
/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
|
/// 13. Use state resolution to find new room state
|
||||||
/// it
|
/// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
|
||||||
/// 14. Use state resolution to find new room state
|
|
||||||
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
|
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
|
||||||
#[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))]
|
#[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))]
|
||||||
pub(crate) async fn handle_incoming_pdu<'a>(
|
pub(crate) async fn handle_incoming_pdu<'a>(
|
||||||
|
@ -304,7 +305,7 @@ impl Service {
|
||||||
) {
|
) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Drop
|
// Drop
|
||||||
warn!("Dropping bad event {}: {}", event_id, e);
|
warn!("Dropping bad event {}: {}", event_id, e,);
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::InvalidParam,
|
ErrorKind::InvalidParam,
|
||||||
"Signature verification failed",
|
"Signature verification failed",
|
||||||
|
@ -735,8 +736,9 @@ impl Service {
|
||||||
}
|
}
|
||||||
info!("Auth check succeeded");
|
info!("Auth check succeeded");
|
||||||
|
|
||||||
// We start looking at current room state now, so lets lock the room
|
// 13. Use state resolution to find new room state
|
||||||
|
|
||||||
|
// We start looking at current room state now, so lets lock the room
|
||||||
let mutex_state = Arc::clone(
|
let mutex_state = Arc::clone(
|
||||||
services()
|
services()
|
||||||
.globals
|
.globals
|
||||||
|
@ -782,7 +784,40 @@ impl Service {
|
||||||
})
|
})
|
||||||
.collect::<Result<_>>()?;
|
.collect::<Result<_>>()?;
|
||||||
|
|
||||||
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
|
if incoming_pdu.state_key.is_some() {
|
||||||
|
info!("Preparing for stateres to derive new room state");
|
||||||
|
|
||||||
|
// We also add state after incoming event to the fork states
|
||||||
|
let mut state_after = state_at_incoming_event.clone();
|
||||||
|
if let Some(state_key) = &incoming_pdu.state_key {
|
||||||
|
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
||||||
|
&incoming_pdu.kind.to_string().into(),
|
||||||
|
state_key,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_room_state = self
|
||||||
|
.resolve_state(room_id, room_version_id, state_after)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Set the new room state to the resolved state
|
||||||
|
info!("Forcing new room state");
|
||||||
|
|
||||||
|
let (sstatehash, new, removed) = services()
|
||||||
|
.rooms
|
||||||
|
.state_compressor
|
||||||
|
.save_state(room_id, new_room_state)?;
|
||||||
|
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state
|
||||||
|
.force_state(room_id, sstatehash, new, removed, &state_lock)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
|
||||||
info!("Starting soft fail auth check");
|
info!("Starting soft fail auth check");
|
||||||
|
|
||||||
let auth_events = services().rooms.state.get_auth_events(
|
let auth_events = services().rooms.state.get_auth_events(
|
||||||
|
@ -823,181 +858,6 @@ impl Service {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if incoming_pdu.state_key.is_some() {
|
|
||||||
info!("Loading current room state ids");
|
|
||||||
let current_sstatehash = services()
|
|
||||||
.rooms
|
|
||||||
.state
|
|
||||||
.get_room_shortstatehash(room_id)?
|
|
||||||
.expect("every room has state");
|
|
||||||
|
|
||||||
let current_state_ids = services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.state_full_ids(current_sstatehash)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
info!("Preparing for stateres to derive new room state");
|
|
||||||
let mut extremity_sstatehashes = HashMap::new();
|
|
||||||
|
|
||||||
info!(?extremities, "Loading extremities");
|
|
||||||
for id in &extremities {
|
|
||||||
match services().rooms.timeline.get_pdu(id)? {
|
|
||||||
Some(leaf_pdu) => {
|
|
||||||
extremity_sstatehashes.insert(
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.pdu_shortstatehash(&leaf_pdu.event_id)?
|
|
||||||
.ok_or_else(|| {
|
|
||||||
error!(
|
|
||||||
"Found extremity pdu with no statehash in db: {:?}",
|
|
||||||
leaf_pdu
|
|
||||||
);
|
|
||||||
Error::bad_database("Found pdu with no statehash in db.")
|
|
||||||
})?,
|
|
||||||
leaf_pdu,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
error!("Missing state snapshot for {:?}", id);
|
|
||||||
return Err(Error::BadDatabase("Missing state snapshot."));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut fork_states = Vec::new();
|
|
||||||
|
|
||||||
// 12. Ensure that the state is derived from the previous current state (i.e. we calculated
|
|
||||||
// by doing state res where one of the inputs was a previously trusted set of state,
|
|
||||||
// don't just trust a set of state we got from a remote).
|
|
||||||
|
|
||||||
// We do this by adding the current state to the list of fork states
|
|
||||||
extremity_sstatehashes.remove(¤t_sstatehash);
|
|
||||||
fork_states.push(current_state_ids);
|
|
||||||
|
|
||||||
// We also add state after incoming event to the fork states
|
|
||||||
let mut state_after = state_at_incoming_event.clone();
|
|
||||||
if let Some(state_key) = &incoming_pdu.state_key {
|
|
||||||
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
|
||||||
&incoming_pdu.kind.to_string().into(),
|
|
||||||
state_key,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
|
|
||||||
}
|
|
||||||
fork_states.push(state_after);
|
|
||||||
|
|
||||||
let mut update_state = false;
|
|
||||||
// 14. Use state resolution to find new room state
|
|
||||||
let new_room_state = if fork_states.is_empty() {
|
|
||||||
panic!("State is empty");
|
|
||||||
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) {
|
|
||||||
info!("State resolution trivial");
|
|
||||||
// There was only one state, so it has to be the room's current state (because that is
|
|
||||||
// always included)
|
|
||||||
fork_states[0]
|
|
||||||
.iter()
|
|
||||||
.map(|(k, id)| {
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state_compressor
|
|
||||||
.compress_state_event(*k, id)
|
|
||||||
})
|
|
||||||
.collect::<Result<_>>()?
|
|
||||||
} else {
|
|
||||||
info!("Loading auth chains");
|
|
||||||
// We do need to force an update to this room's state
|
|
||||||
update_state = true;
|
|
||||||
|
|
||||||
let mut auth_chain_sets = Vec::new();
|
|
||||||
for state in &fork_states {
|
|
||||||
auth_chain_sets.push(
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.auth_chain
|
|
||||||
.get_auth_chain(
|
|
||||||
room_id,
|
|
||||||
state.iter().map(|(_, id)| id.clone()).collect(),
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
.collect(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Loading fork states");
|
|
||||||
|
|
||||||
let fork_states: Vec<_> = fork_states
|
|
||||||
.into_iter()
|
|
||||||
.map(|map| {
|
|
||||||
map.into_iter()
|
|
||||||
.filter_map(|(k, id)| {
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.short
|
|
||||||
.get_statekey_from_short(k)
|
|
||||||
.map(|(ty, st_key)| ((ty.to_string().into(), st_key), id))
|
|
||||||
.ok()
|
|
||||||
})
|
|
||||||
.collect::<StateMap<_>>()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
info!("Resolving state");
|
|
||||||
|
|
||||||
let lock = services().globals.stateres_mutex.lock();
|
|
||||||
let state = match state_res::resolve(
|
|
||||||
room_version_id,
|
|
||||||
&fork_states,
|
|
||||||
auth_chain_sets,
|
|
||||||
|id| {
|
|
||||||
let res = services().rooms.timeline.get_pdu(id);
|
|
||||||
if let Err(e) = &res {
|
|
||||||
error!("LOOK AT ME Failed to fetch event: {}", e);
|
|
||||||
}
|
|
||||||
res.ok().flatten()
|
|
||||||
},
|
|
||||||
) {
|
|
||||||
Ok(new_state) => new_state,
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
drop(lock);
|
|
||||||
|
|
||||||
info!("State resolution done. Compressing state");
|
|
||||||
|
|
||||||
state
|
|
||||||
.into_iter()
|
|
||||||
.map(|((event_type, state_key), event_id)| {
|
|
||||||
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
|
||||||
&event_type.to_string().into(),
|
|
||||||
&state_key,
|
|
||||||
)?;
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state_compressor
|
|
||||||
.compress_state_event(shortstatekey, &event_id)
|
|
||||||
})
|
|
||||||
.collect::<Result<_>>()?
|
|
||||||
};
|
|
||||||
|
|
||||||
// Set the new room state to the resolved state
|
|
||||||
if update_state {
|
|
||||||
info!("Forcing new room state");
|
|
||||||
let (sstatehash, new, removed) = services()
|
|
||||||
.rooms
|
|
||||||
.state_compressor
|
|
||||||
.save_state(room_id, new_room_state)?;
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state
|
|
||||||
.force_state(room_id, sstatehash, new, removed, &state_lock)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Appending pdu to timeline");
|
info!("Appending pdu to timeline");
|
||||||
extremities.insert(incoming_pdu.event_id.clone());
|
extremities.insert(incoming_pdu.event_id.clone());
|
||||||
|
|
||||||
|
@ -1021,6 +881,94 @@ impl Service {
|
||||||
Ok(pdu_id)
|
Ok(pdu_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn resolve_state(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
room_version_id: &RoomVersionId,
|
||||||
|
incoming_state: HashMap<u64, Arc<EventId>>,
|
||||||
|
) -> Result<HashSet<CompressedStateEvent>> {
|
||||||
|
info!("Loading current room state ids");
|
||||||
|
let current_sstatehash = services()
|
||||||
|
.rooms
|
||||||
|
.state
|
||||||
|
.get_room_shortstatehash(room_id)?
|
||||||
|
.expect("every room has state");
|
||||||
|
|
||||||
|
let current_state_ids = services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.state_full_ids(current_sstatehash)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let fork_states = [current_state_ids, incoming_state];
|
||||||
|
|
||||||
|
let mut auth_chain_sets = Vec::new();
|
||||||
|
for state in &fork_states {
|
||||||
|
auth_chain_sets.push(
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.auth_chain
|
||||||
|
.get_auth_chain(room_id, state.iter().map(|(_, id)| id.clone()).collect())
|
||||||
|
.await?
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Loading fork states");
|
||||||
|
|
||||||
|
let fork_states: Vec<_> = fork_states
|
||||||
|
.into_iter()
|
||||||
|
.map(|map| {
|
||||||
|
map.into_iter()
|
||||||
|
.filter_map(|(k, id)| {
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.short
|
||||||
|
.get_statekey_from_short(k)
|
||||||
|
.map(|(ty, st_key)| ((ty.to_string().into(), st_key), id))
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.collect::<StateMap<_>>()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
info!("Resolving state");
|
||||||
|
|
||||||
|
let lock = services().globals.stateres_mutex.lock();
|
||||||
|
let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| {
|
||||||
|
let res = services().rooms.timeline.get_pdu(id);
|
||||||
|
if let Err(e) = &res {
|
||||||
|
error!("LOOK AT ME Failed to fetch event: {}", e);
|
||||||
|
}
|
||||||
|
res.ok().flatten()
|
||||||
|
}) {
|
||||||
|
Ok(new_state) => new_state,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(lock);
|
||||||
|
|
||||||
|
info!("State resolution done. Compressing state");
|
||||||
|
|
||||||
|
let new_room_state = state
|
||||||
|
.into_iter()
|
||||||
|
.map(|((event_type, state_key), event_id)| {
|
||||||
|
let shortstatekey = services()
|
||||||
|
.rooms
|
||||||
|
.short
|
||||||
|
.get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?;
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state_compressor
|
||||||
|
.compress_state_event(shortstatekey, &event_id)
|
||||||
|
})
|
||||||
|
.collect::<Result<_>>()?;
|
||||||
|
|
||||||
|
Ok(new_room_state)
|
||||||
|
}
|
||||||
|
|
||||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||||
/// it is appended to the outliers Tree.
|
/// it is appended to the outliers Tree.
|
||||||
///
|
///
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue