prevent retry for missing keys later in join process
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
7e087bb93c
commit
f290d1a9c8
3 changed files with 91 additions and 36 deletions
|
@ -878,46 +878,59 @@ async fn join_room_by_id_helper_remote(
|
||||||
|
|
||||||
info!("Going through send_join response room_state");
|
info!("Going through send_join response room_state");
|
||||||
let cork = services.db.cork_and_flush();
|
let cork = services.db.cork_and_flush();
|
||||||
let mut state = HashMap::new();
|
let state = send_join_response
|
||||||
for result in send_join_response.room_state.state.iter().map(|pdu| {
|
.room_state
|
||||||
services
|
.state
|
||||||
.server_keys
|
.iter()
|
||||||
.validate_and_add_event_id(pdu, &room_version_id)
|
.stream()
|
||||||
}) {
|
.then(|pdu| {
|
||||||
let Ok((event_id, value)) = result.await else {
|
services
|
||||||
continue;
|
.server_keys
|
||||||
};
|
.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
|
||||||
|
})
|
||||||
|
.ready_filter_map(Result::ok)
|
||||||
|
.fold(HashMap::new(), |mut state, (event_id, value)| async move {
|
||||||
|
let pdu = match PduEvent::from_id_val(&event_id, value.clone()) {
|
||||||
|
Ok(pdu) => pdu,
|
||||||
|
Err(e) => {
|
||||||
|
debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}");
|
||||||
|
return state;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
|
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
|
||||||
debug_warn!("Invalid PDU in send_join response: {value:#?}");
|
if let Some(state_key) = &pdu.state_key {
|
||||||
err!(BadServerResponse("Invalid PDU in send_join response: {e:?}"))
|
let shortstatekey = services
|
||||||
})?;
|
.rooms
|
||||||
|
.short
|
||||||
|
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
state.insert(shortstatekey, pdu.event_id.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
state
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
|
|
||||||
if let Some(state_key) = &pdu.state_key {
|
|
||||||
let shortstatekey = services
|
|
||||||
.rooms
|
|
||||||
.short
|
|
||||||
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
|
|
||||||
.await;
|
|
||||||
state.insert(shortstatekey, pdu.event_id.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
drop(cork);
|
drop(cork);
|
||||||
|
|
||||||
info!("Going through send_join response auth_chain");
|
info!("Going through send_join response auth_chain");
|
||||||
let cork = services.db.cork_and_flush();
|
let cork = services.db.cork_and_flush();
|
||||||
for result in send_join_response.room_state.auth_chain.iter().map(|pdu| {
|
send_join_response
|
||||||
services
|
.room_state
|
||||||
.server_keys
|
.auth_chain
|
||||||
.validate_and_add_event_id(pdu, &room_version_id)
|
.iter()
|
||||||
}) {
|
.stream()
|
||||||
let Ok((event_id, value)) = result.await else {
|
.then(|pdu| {
|
||||||
continue;
|
services
|
||||||
};
|
.server_keys
|
||||||
|
.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
|
||||||
|
})
|
||||||
|
.ready_filter_map(Result::ok)
|
||||||
|
.ready_for_each(|(event_id, value)| services.rooms.outlier.add_pdu_outlier(&event_id, &value))
|
||||||
|
.await;
|
||||||
|
|
||||||
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
|
|
||||||
}
|
|
||||||
drop(cork);
|
drop(cork);
|
||||||
|
|
||||||
debug!("Running send_join auth check");
|
debug!("Running send_join auth check");
|
||||||
|
|
|
@ -7,13 +7,19 @@ mod verify;
|
||||||
|
|
||||||
use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use conduit::{implement, utils::timepoint_from_now, Result, Server};
|
use conduit::{
|
||||||
|
implement,
|
||||||
|
utils::{timepoint_from_now, IterStream},
|
||||||
|
Result, Server,
|
||||||
|
};
|
||||||
use database::{Deserialized, Json, Map};
|
use database::{Deserialized, Json, Map};
|
||||||
|
use futures::StreamExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::federation::discovery::{ServerSigningKeys, VerifyKey},
|
api::federation::discovery::{ServerSigningKeys, VerifyKey},
|
||||||
serde::Raw,
|
serde::Raw,
|
||||||
signatures::{Ed25519KeyPair, PublicKeyMap, PublicKeySet},
|
signatures::{Ed25519KeyPair, PublicKeyMap, PublicKeySet},
|
||||||
MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId,
|
CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, RoomVersionId, ServerName,
|
||||||
|
ServerSigningKeyId,
|
||||||
};
|
};
|
||||||
use serde_json::value::RawValue as RawJsonValue;
|
use serde_json::value::RawValue as RawJsonValue;
|
||||||
|
|
||||||
|
@ -107,7 +113,23 @@ async fn add_signing_keys(&self, new_keys: ServerSigningKeys) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool {
|
pub async fn required_keys_exist(&self, object: &CanonicalJsonObject, version: &RoomVersionId) -> bool {
|
||||||
|
use ruma::signatures::required_keys;
|
||||||
|
|
||||||
|
let Ok(required_keys) = required_keys(object, version) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
required_keys
|
||||||
|
.iter()
|
||||||
|
.flat_map(|(server, key_ids)| key_ids.iter().map(move |key_id| (server, key_id)))
|
||||||
|
.stream()
|
||||||
|
.all(|(server, key_id)| self.verify_key_exists(server, key_id))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
pub async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool {
|
||||||
type KeysMap<'a> = BTreeMap<&'a ServerSigningKeyId, &'a RawJsonValue>;
|
type KeysMap<'a> = BTreeMap<&'a ServerSigningKeyId, &'a RawJsonValue>;
|
||||||
|
|
||||||
let Ok(keys) = self
|
let Ok(keys) = self
|
||||||
|
|
|
@ -16,6 +16,26 @@ pub async fn validate_and_add_event_id(
|
||||||
Ok((event_id, value))
|
Ok((event_id, value))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(super::Service)]
|
||||||
|
pub async fn validate_and_add_event_id_no_fetch(
|
||||||
|
&self, pdu: &RawJsonValue, room_version: &RoomVersionId,
|
||||||
|
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
|
||||||
|
let (event_id, mut value) = gen_event_id_canonical_json(pdu, room_version)?;
|
||||||
|
if !self.required_keys_exist(&value, room_version).await {
|
||||||
|
return Err!(BadServerResponse(debug_warn!(
|
||||||
|
"Event {event_id} cannot be verified: missing keys."
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = self.verify_event(&value, Some(room_version)).await {
|
||||||
|
return Err!(BadServerResponse(debug_error!("Event {event_id} failed verification: {e:?}")));
|
||||||
|
}
|
||||||
|
|
||||||
|
value.insert("event_id".into(), CanonicalJsonValue::String(event_id.as_str().into()));
|
||||||
|
|
||||||
|
Ok((event_id, value))
|
||||||
|
}
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
pub async fn verify_event(
|
pub async fn verify_event(
|
||||||
&self, event: &CanonicalJsonObject, room_version: Option<&RoomVersionId>,
|
&self, event: &CanonicalJsonObject, room_version: Option<&RoomVersionId>,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue