Address review issues, fix forward extremity calc

Keep track of all prev_events: if we know that an event is a prev_event,
it is referenced and therefore does not qualify as a forward extremity.
Devin Ragotzy 2021-02-03 20:00:01 -05:00
parent 591769d5f3
commit 74d530ae0e
5 changed files with 220 additions and 216 deletions
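
Below is a minimal, self-contained sketch of the rule this commit relies on: an event stops being a forward extremity as soon as some other event lists it in its prev_events, so tracking every referenced prev_event lets us drop those ids from the leaf set. The `Event` type and `forward_extremities` helper here are hypothetical illustrations, not Conduit's actual API.

```rust
use std::collections::HashSet;

// Hypothetical, simplified event shape for illustration only.
struct Event {
    event_id: String,
    prev_events: Vec<String>,
}

/// Forward extremities are the events that no other known event lists in its
/// `prev_events`. Anything referenced as a prev_event is, by definition,
/// not a leaf of the room DAG.
fn forward_extremities(events: &[Event]) -> HashSet<String> {
    // Every event id referenced as a prev_event of some other event.
    let referenced: HashSet<&str> = events
        .iter()
        .flat_map(|e| e.prev_events.iter().map(String::as_str))
        .collect();

    // The leaves are the events that are never referenced.
    events
        .iter()
        .filter(|e| !referenced.contains(e.event_id.as_str()))
        .map(|e| e.event_id.clone())
        .collect()
}
```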

@@ -571,8 +571,6 @@ pub async fn send_transaction_message_route<'a>(
}
// If we know of this pdu we don't need to continue processing it
//
// This check is essentially
if let Ok(Some(_)) = db.rooms.get_pdu_id(&event_id) {
return None;
}
@@ -664,64 +662,66 @@ pub async fn send_transaction_message_route<'a>(
// the checks in this list starting at 1. These are not timeline events.
//
// Step 10. check the auth of the event passes based on the calculated state of the event
let (mut state_at_event, incoming_auth_events): (
StateMap<Arc<PduEvent>>,
Vec<Arc<PduEvent>>,
) = match db
.sending
.send_federation_request(
&db.globals,
server_name,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: pdu.event_id(),
},
)
.await
{
Ok(res) => {
let state = fetch_events(
&db,
//
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
// the state from a known point and resolve if > 1 prev_event
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
match db
.sending
.send_federation_request(
&db.globals,
server_name,
&pub_key_map,
&res.pdu_ids,
&mut auth_cache,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: pdu.event_id(),
},
)
.await?;
// Sanity check: there are no conflicting events in the state we received
let mut seen = BTreeSet::new();
for ev in &state {
// If the key is already present
if !seen.insert((&ev.kind, &ev.state_key)) {
todo!("Server sent us an invalid state")
}
}
let state = state
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect();
(
state,
fetch_events(
.await
{
Ok(res) => {
let state = fetch_events(
&db,
server_name,
&pub_key_map,
&res.auth_chain_ids,
&res.pdu_ids,
&mut auth_cache,
)
.await?,
)
}
Err(_) => {
resolved_map.insert(
pdu.event_id().clone(),
Err("Fetching state for event failed".into()),
);
continue;
}
};
.await?;
// Sanity check: there are no conflicting events in the state we received
let mut seen = BTreeSet::new();
for ev in &state {
// If the key is already present
if !seen.insert((&ev.kind, &ev.state_key)) {
error!("Server sent us an invalid state");
continue;
}
}
let state = state
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect();
(
state,
fetch_events(
&db,
server_name,
&pub_key_map,
&res.auth_chain_ids,
&mut auth_cache,
)
.await?,
)
}
Err(_) => {
resolved_map.insert(
pdu.event_id().clone(),
Err("Fetching state for event failed".into()),
);
continue;
}
};
// 10. This is the actual auth check for state at the event
if !state_res::event_auth::auth_check(
@@ -764,6 +764,7 @@ pub async fn send_transaction_message_route<'a>(
pdu.event_id().clone(),
Err("Event has been soft failed".into()),
);
continue;
};
// Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
@@ -779,10 +780,6 @@ pub async fn send_transaction_message_route<'a>(
}
};
// Now that the event has passed all auth it is added into the timeline, we do have to
// find the leaves otherwise we would do this sooner
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
// This will create the state after any state snapshot it builds
// So current_state will have the incoming event inserted to it
let mut fork_states = match build_forward_extremity_snapshots(
@@ -805,10 +802,11 @@ pub async fn send_transaction_message_route<'a>(
// Make this the state after (since we appended_incoming_pdu this should agree with our server's
// current state).
state_at_event.insert((pdu.kind(), pdu.state_key()), pdu.clone());
// add the incoming events to the mix of state snapshots
let mut state_after = state_at_event.clone();
state_after.insert((pdu.kind(), pdu.state_key()), pdu.clone());
// Add the incoming event to the mix of state snapshots
// Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets
fork_states.insert(state_at_event.clone());
fork_states.insert(state_after.clone());
let fork_states = fork_states.into_iter().collect::<Vec<_>>();
@@ -826,39 +824,27 @@ pub async fn send_transaction_message_route<'a>(
update_state = true;
// TODO: remove this is for current debugging Jan, 15 2021
let mut number_fetches = 0_u32;
let mut auth_events = vec![];
for map in &fork_states {
let mut state_auth = vec![];
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) {
let event = match auth_cache.get(auth_id) {
Some(aev) => aev.clone(),
// We should know about every event at this point but just in case...
None => match fetch_events(
&db,
server_name,
&pub_key_map,
&[auth_id.clone()],
&mut auth_cache,
)
.await
.map(|mut vec| {
number_fetches += 1;
vec.pop()
}) {
Ok(Some(aev)) => aev,
_ => {
resolved_map
.insert(event_id.clone(), Err("Failed to fetch event".into()));
continue 'main_pdu_loop;
}
},
// The only events that haven't been added to the auth cache are
// events we have knowledge of previously
None => {
error!("Event was not present in auth_cache {}", auth_id);
resolved_map.insert(
event_id.clone(),
Err("Event was not present in auth cache".into()),
);
continue 'main_pdu_loop;
}
};
state_auth.push(event);
}
auth_events.push(state_auth);
}
info!("{} event's were not in the auth_cache", number_fetches);
// Add everything we will need to event_map
auth_cache.extend(
@@ -873,7 +859,7 @@ pub async fn send_transaction_message_route<'a>(
.map(|pdu| (pdu.event_id().clone(), pdu)),
);
auth_cache.extend(
state_at_event
state_after
.into_iter()
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)),
);
@@ -911,17 +897,12 @@ pub async fn send_transaction_message_route<'a>(
let pdu = match auth_cache.get(&id) {
Some(pdu) => pdu.clone(),
None => {
match fetch_events(&db, server_name, &pub_key_map, &[id], &mut auth_cache)
.await
.map(|mut vec| vec.pop())
{
Ok(Some(aev)) => aev,
_ => {
resolved_map
.insert(event_id.clone(), Err("Failed to fetch event".into()));
continue 'main_pdu_loop;
}
}
error!("Event was not present in auth_cache {}", id);
resolved_map.insert(
event_id.clone(),
Err("Event was not present in auth cache".into()),
);
continue 'main_pdu_loop;
}
};
resolved.insert(k, pdu);
@@ -929,7 +910,12 @@ pub async fn send_transaction_message_route<'a>(
resolved
};
// Add the event to the DB and update the forward extremities (via roomid_pduleaves).
// Now that the event has passed all auth it is added into the timeline.
// We use the `state_at_event` instead of `state_after` so we accurately
// represent the state for this event.
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
// Set the new room state to the resolved state
update_resolved_state(
&db,
pdu.room_id(),
@@ -1046,8 +1032,6 @@ fn validate_event<'a>(
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
/// The check in `fetch_check_auth_events` is that a complete chain is found for the
/// event's `auth_events`. If the chain is found to have any missing events it fails.
///
/// The `auth_cache` is filled instead of returning a `Vec`.
async fn fetch_check_auth_events(
db: &Database,
origin: &ServerName,
@@ -1073,7 +1057,6 @@ async fn fetch_check_auth_events(
})??;
stack.extend(ev.auth_events());
auth_cache.insert(ev.event_id().clone(), ev);
}
Ok(())
}
@@ -1085,6 +1068,9 @@ async fn fetch_check_auth_events(
/// 2. Look at outlier pdu tree
/// 3. Ask origin server over federation
/// 4. TODO: Ask other servers over federation?
///
/// If the event is unknown to the `auth_cache` it is added. This guarantees that any
/// event we need to know of will be present.
pub(crate) async fn fetch_events(
db: &Database,
origin: &ServerName,
@@ -1118,6 +1104,7 @@ pub(crate) async fn fetch_events(
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
},
};
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
pdus.push(pdu);
}
Ok(pdus)
@@ -1167,13 +1154,9 @@ pub(crate) async fn calculate_forward_extremities(
// If the incoming event is already referenced by an existing event
// then do nothing - it's not a candidate to be a new extremity if
// it has been referenced.
//
// We check this in the filter just before the main incoming PDU for loop
// so no already known event can make it this far.
//
// if db.rooms.get_pdu_id(pdu.event_id())?.is_some() {
// is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some();
// }
if db.rooms.is_pdu_referenced(pdu)? {
is_incoming_leaf = false;
}
// TODO:
// [dendrite] Checks if any other leaves have been referenced and removes them
@@ -1217,74 +1200,79 @@ pub(crate) async fn build_forward_extremity_snapshots(
let mut includes_current_state = false;
let mut fork_states = BTreeSet::new();
for id in current_leaves {
if let Some(id) = db.rooms.get_pdu_id(id)? {
let state_hash = db
.rooms
.pdu_state_hash(&id)?
.expect("found pdu with no statehash");
match db.rooms.get_pdu_id(id)? {
// We can skip this because it is handled outside of this function
// The current server state and incoming event state are built to be
// the state after.
// This would be the incoming state from the server.
Some(_) if id == pdu.event_id() => {}
Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => {
let state_hash = db
.rooms
.pdu_state_hash(&pduid)?
.expect("found pdu with no statehash");
if current_hash.as_ref() == Some(&state_hash) {
includes_current_state = true;
if current_hash.as_ref() == Some(&state_hash) {
includes_current_state = true;
}
let mut state_before = db
.rooms
.state_full(pdu.room_id(), &state_hash)?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect::<StateMap<_>>();
// Now it's the state after
if let Some(pdu) = db.rooms.get_pdu_from_id(&pduid)? {
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, Arc::new(pdu));
}
fork_states.insert(state_before);
}
_ => {
error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind());
let mut state_before = db
.rooms
.state_full(pdu.room_id(), &state_hash)?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect::<StateMap<_>>();
let res = db
.sending
.send_federation_request(
&db.globals,
origin,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: id,
},
)
.await?;
// Now it's the state after
if let Some(pdu) = db.rooms.get_pdu_from_id(&id)? {
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, Arc::new(pdu));
// TODO: This only adds events to the auth_cache, there is for sure a better way to
// do this...
fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?;
let mut state_before =
fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache)
.await?
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect::<StateMap<_>>();
if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache)
.await?
.pop()
{
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, pdu);
}
// Now it's the state after
fork_states.insert(state_before);
}
fork_states.insert(state_before);
} else if id == pdu.event_id() {
// We add this snapshot after `build_forward_extremity_snapshots` is
// called which we requested from the sending server
} else {
error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind());
let res = db
.sending
.send_federation_request(
&db.globals,
origin,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: id,
},
)
.await?;
// TODO: This only adds events to the auth_cache, there is for sure a better way to
// do this...
fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?;
let mut state_before = fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache)
.await?
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect::<StateMap<_>>();
if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache)
.await?
.pop()
{
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, pdu);
}
// Now it's the state after
fork_states.insert(state_before);
}
}
// This guarantees that our current room state is included
if !includes_current_state && current_hash.is_some() {
error!("Did not include current state");
if !includes_current_state {
current_state.insert((pdu.kind(), pdu.state_key()), pdu);
fork_states.insert(current_state);
@@ -1316,18 +1304,7 @@ pub(crate) fn update_resolved_state(
);
}
None => {
let mut pduid = pdu.room_id().as_bytes().to_vec();
pduid.push(0xff);
pduid.extend_from_slice(pdu.event_id().as_bytes());
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| {
Error::Conflict("State contained non state event")
})?,
),
pduid,
);
error!("We are missing a state event for the current room state.");
}
}
}
@@ -1349,9 +1326,9 @@ pub(crate) fn append_incoming_pdu(
// Update the state of the room if needed
// We can tell if we need to do this based on whether state resolution took place or not
let mut new_state = HashMap::new();
for ((ev_type, state_k), pdu) in state {
match db.rooms.get_pdu_id(pdu.event_id())? {
Some(pduid) => {
for ((ev_type, state_k), state_pdu) in state {
match db.rooms.get_pdu_id(state_pdu.event_id())? {
Some(state_pduid) => {
new_state.insert(
(
ev_type.clone(),
@@ -1359,12 +1336,10 @@ pub(crate) fn append_incoming_pdu(
.clone()
.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
pduid.to_vec(),
state_pduid.to_vec(),
);
}
None => {
error!("We didn't append an event as an outlier\n{:?}", pdu);
}
None => error!("We are missing a state event for the incoming event snapshot"),
}
}