State resolution outline for /send
This commit is contained in:
parent
fb9880fee9
commit
4a92a29b56
6 changed files with 384 additions and 143 deletions
|
@ -1,7 +1,7 @@
|
|||
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
|
||||
use get_profile_information::v1::ProfileField;
|
||||
use http::header::{HeaderValue, AUTHORIZATION, HOST};
|
||||
use log::{info, warn};
|
||||
use log::{error, info, warn};
|
||||
use rocket::{get, post, put, response::content::Json, State};
|
||||
use ruma::{
|
||||
api::{
|
||||
|
@ -11,17 +11,18 @@ use ruma::{
|
|||
get_server_keys, get_server_version::v1 as get_server_version, ServerSigningKeys,
|
||||
VerifyKey,
|
||||
},
|
||||
event::get_missing_events,
|
||||
event::{get_missing_events, get_room_state, get_room_state_ids},
|
||||
query::get_profile_information,
|
||||
transactions::send_transaction_message,
|
||||
},
|
||||
OutgoingRequest,
|
||||
},
|
||||
directory::{IncomingFilter, IncomingRoomNetwork},
|
||||
EventId, RoomId, ServerName, ServerSigningKeyId, UserId,
|
||||
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
|
||||
};
|
||||
use state_res::StateMap;
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
convert::TryFrom,
|
||||
fmt::Debug,
|
||||
net::{IpAddr, SocketAddr},
|
||||
|
@ -476,6 +477,34 @@ pub async fn get_public_rooms_route(
|
|||
.into())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
|
||||
pub enum PrevEvents<T> {
|
||||
Sequential(T),
|
||||
Fork(Vec<T>),
|
||||
}
|
||||
|
||||
impl<T> IntoIterator for PrevEvents<T> {
|
||||
type Item = T;
|
||||
type IntoIter = std::vec::IntoIter<Self::Item>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
match self {
|
||||
Self::Sequential(item) => vec![item].into_iter(),
|
||||
Self::Fork(list) => list.into_iter(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> PrevEvents<T> {
|
||||
pub fn new(id: &[T]) -> Self {
|
||||
match id {
|
||||
[] => panic!("All events must have previous event"),
|
||||
[single_id] => Self::Sequential(single_id.clone()),
|
||||
rest => Self::Fork(rest.to_vec()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
feature = "conduit_bin",
|
||||
put("/_matrix/federation/v1/send/<_>", data = "<body>")
|
||||
|
@ -532,55 +561,313 @@ pub async fn send_transaction_message_route<'a>(
|
|||
// would return a M_BAD_JSON error.
|
||||
let mut resolved_map = BTreeMap::new();
|
||||
for pdu in &body.pdus {
|
||||
// Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped.
|
||||
// 1. Is a valid event, otherwise it is dropped.
|
||||
// Ruma/PduEvent/StateEvent satisfies this
|
||||
|
||||
// state-res checks signatures - 2. Passes signature checks, otherwise event is dropped.
|
||||
|
||||
// 3. Passes hash checks, otherwise it is redacted before being processed further.
|
||||
// TODO: redact event if hashing fails
|
||||
let (event_id, value) = crate::pdu::process_incoming_pdu(pdu);
|
||||
|
||||
// 2. Passes signature checks, otherwise event is dropped.
|
||||
// 3. Passes hash checks, otherwise it is redacted before being processed further.
|
||||
let keys = db.globals.keypair();
|
||||
let mut pub_key_set = BTreeMap::new();
|
||||
pub_key_set.insert(
|
||||
"ed25519:1".to_string(),
|
||||
String::from_utf8(keys.public_key().to_vec()).expect("public key is valid utf8"),
|
||||
);
|
||||
let mut pub_key_map = BTreeMap::new();
|
||||
pub_key_map.insert("domain".to_string(), pub_key_set);
|
||||
|
||||
let value =
|
||||
match ruma::signatures::verify_event(&pub_key_map, &value, &RoomVersionId::Version6) {
|
||||
Ok(ver) => {
|
||||
if let ruma::signatures::Verified::Signatures = ver {
|
||||
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
|
||||
Ok(obj) => obj,
|
||||
Err(_) => {
|
||||
resolved_map
|
||||
.insert(event_id, Err("Room is unknown to this server".into()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
Err(_e) => {
|
||||
resolved_map.insert(event_id, Err("Room is unknown to this server".into()));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let pdu = serde_json::from_value::<PduEvent>(
|
||||
serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"),
|
||||
)
|
||||
.expect("all ruma pdus are conduit pdus");
|
||||
let room_id = &pdu.room_id;
|
||||
|
||||
// If we have no idea about this room skip the PDU
|
||||
if !db.rooms.exists(room_id)? {
|
||||
if !db.rooms.exists(&pdu.room_id)? {
|
||||
resolved_map.insert(event_id, Err("Room is unknown to this server".into()));
|
||||
continue;
|
||||
}
|
||||
|
||||
let count = db.globals.next_count()?;
|
||||
let mut pdu_id = room_id.as_bytes().to_vec();
|
||||
pdu_id.push(0xff);
|
||||
pdu_id.extend_from_slice(&count.to_be_bytes());
|
||||
// TODO: remove the need to convert to state_res
|
||||
let event = pdu.convert_for_state_res();
|
||||
let previous = pdu
|
||||
.prev_events
|
||||
.first()
|
||||
.map(|id| {
|
||||
db.rooms
|
||||
.get_pdu(id)
|
||||
.expect("todo")
|
||||
.map(|ev| ev.convert_for_state_res())
|
||||
})
|
||||
.flatten();
|
||||
|
||||
let next_room_state = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
|
||||
|
||||
db.rooms.append_pdu(
|
||||
&pdu,
|
||||
value,
|
||||
count,
|
||||
pdu_id.clone().into(),
|
||||
&db.globals,
|
||||
&db.account_data,
|
||||
&db.admin,
|
||||
// 4.
|
||||
let auth_events = db.rooms.get_auth_events(
|
||||
&pdu.room_id,
|
||||
&pdu.kind,
|
||||
&pdu.sender,
|
||||
pdu.state_key.as_deref(),
|
||||
pdu.content.clone(),
|
||||
)?;
|
||||
|
||||
db.rooms.set_room_state(&room_id, &next_room_state)?;
|
||||
|
||||
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
|
||||
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
||||
if !state_res::event_auth::auth_check(
|
||||
&RoomVersionId::Version6,
|
||||
&event,
|
||||
previous.clone(),
|
||||
auth_events
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.convert_for_state_res()))
|
||||
.collect(),
|
||||
None,
|
||||
)
|
||||
.map_err(|_e| Error::Conflict("Auth check failed"))?
|
||||
{
|
||||
resolved_map.insert(
|
||||
event.event_id(),
|
||||
Err("Event has failed auth check with auth events".into()),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
resolved_map.insert(event_id, Ok::<(), String>(()));
|
||||
let mut previous_states = vec![];
|
||||
for id in &pdu.prev_events {
|
||||
if let Some(id) = db.rooms.get_pdu_id(id)? {
|
||||
let state_hash = db
|
||||
.rooms
|
||||
.pdu_state_hash(&id)?
|
||||
.expect("found pdu with no statehash");
|
||||
let state = db.rooms.state_full(&pdu.room_id, &state_hash)?;
|
||||
previous_states.push(state);
|
||||
} else {
|
||||
// fetch the state
|
||||
match db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
body.body.origin,
|
||||
get_room_state_ids::v1::Request {
|
||||
room_id: &pdu.room_id,
|
||||
event_id: id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => todo!(),
|
||||
Err(e) => panic!(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Passes authorization rules based on the state at the event, otherwise it is rejected.
|
||||
let state_at_event = if previous_states.is_empty() {
|
||||
// State is empty
|
||||
Default::default()
|
||||
} else if previous_states.len() == 1 {
|
||||
previous_states[0].clone()
|
||||
} else {
|
||||
match state_res::StateResolution::resolve(
|
||||
&pdu.room_id,
|
||||
&RoomVersionId::Version6,
|
||||
&previous_states
|
||||
.into_iter()
|
||||
.map(|map| {
|
||||
map.into_iter()
|
||||
.map(|(k, v)| (k, v.event_id))
|
||||
.collect::<StateMap<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
None,
|
||||
&db.rooms,
|
||||
) {
|
||||
Ok(res) => res
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, db.rooms.get_pdu(&v).unwrap().unwrap()))
|
||||
.collect(),
|
||||
Err(e) => panic!("{:?}", e),
|
||||
}
|
||||
};
|
||||
|
||||
if !state_res::event_auth::auth_check(
|
||||
&RoomVersionId::Version6,
|
||||
&event,
|
||||
previous.clone(),
|
||||
state_at_event
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.convert_for_state_res()))
|
||||
.collect(),
|
||||
None,
|
||||
)
|
||||
.map_err(|_e| Error::Conflict("Auth check failed"))?
|
||||
{
|
||||
// Event failed auth with state_at
|
||||
resolved_map.insert(
|
||||
event.event_id(),
|
||||
Err("Event has failed auth check with state at the event".into()),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// The event could still be soft failed
|
||||
append_state_soft(&db, &pdu)?;
|
||||
|
||||
// Gather the forward extremities and resolve
|
||||
let forward_extrems = forward_extremity_ids(&db, &pdu.room_id)?;
|
||||
let mut fork_states = vec![];
|
||||
for id in &forward_extrems {
|
||||
if let Some(id) = db.rooms.get_pdu_id(id)? {
|
||||
let state_hash = db
|
||||
.rooms
|
||||
.pdu_state_hash(&id)?
|
||||
.expect("found pdu with no statehash");
|
||||
let state = db.rooms.state_full(&pdu.room_id, &state_hash)?;
|
||||
fork_states.push(state);
|
||||
} else {
|
||||
// This is probably an error??
|
||||
match db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
body.body.origin,
|
||||
get_room_state_ids::v1::Request {
|
||||
room_id: &pdu.room_id,
|
||||
event_id: id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => todo!(),
|
||||
Err(e) => panic!(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 6.
|
||||
let state_at_forks = if fork_states.is_empty() {
|
||||
// State is empty
|
||||
Default::default()
|
||||
} else if fork_states.len() == 1 {
|
||||
fork_states[0].clone()
|
||||
} else {
|
||||
match state_res::StateResolution::resolve(
|
||||
&pdu.room_id,
|
||||
&RoomVersionId::Version6,
|
||||
&fork_states
|
||||
.into_iter()
|
||||
.map(|map| {
|
||||
map.into_iter()
|
||||
.map(|(k, v)| (k, v.event_id))
|
||||
.collect::<StateMap<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
None,
|
||||
&db.rooms,
|
||||
) {
|
||||
Ok(res) => res
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, db.rooms.get_pdu(&v).unwrap().unwrap()))
|
||||
.collect(),
|
||||
Err(e) => panic!("{:?}", e),
|
||||
}
|
||||
};
|
||||
|
||||
if !state_res::event_auth::auth_check(
|
||||
&RoomVersionId::Version6,
|
||||
&event,
|
||||
previous,
|
||||
state_at_forks
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.convert_for_state_res()))
|
||||
.collect(),
|
||||
None,
|
||||
)
|
||||
.map_err(|_e| Error::Conflict("Auth check failed"))?
|
||||
{
|
||||
// Soft fail
|
||||
resolved_map.insert(event.event_id(), Err("Event has been soft failed".into()));
|
||||
} else {
|
||||
append_state(&db, &pdu)?;
|
||||
// Event has passed all auth/stateres checks
|
||||
resolved_map.insert(event.event_id(), Ok(()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
|
||||
}
|
||||
|
||||
fn forward_extremity_ids(db: &Database, room_id: &RoomId) -> Result<Vec<EventId>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
|
||||
let count = db.globals.next_count()?;
|
||||
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
|
||||
pdu_id.push(0xff);
|
||||
pdu_id.extend_from_slice(&count.to_be_bytes());
|
||||
|
||||
db.rooms.append_to_state(&pdu_id, pdu, &db.globals)?;
|
||||
db.rooms.append_pdu(
|
||||
pdu,
|
||||
&utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
|
||||
count,
|
||||
pdu_id.clone().into(),
|
||||
&db.globals,
|
||||
&db.account_data,
|
||||
&db.admin,
|
||||
)?;
|
||||
|
||||
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
|
||||
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// TODO: This should not write to the current room state (roomid_statehash)
|
||||
fn append_state_soft(db: &Database, pdu: &PduEvent) -> Result<()> {
|
||||
let count = db.globals.next_count()?;
|
||||
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
|
||||
pdu_id.push(0xff);
|
||||
pdu_id.extend_from_slice(&count.to_be_bytes());
|
||||
|
||||
db.rooms.append_to_state(&pdu_id, pdu, &db.globals)?;
|
||||
db.rooms.append_pdu(
|
||||
pdu,
|
||||
&utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
|
||||
count,
|
||||
pdu_id.clone().into(),
|
||||
&db.globals,
|
||||
&db.account_data,
|
||||
&db.admin,
|
||||
)?;
|
||||
|
||||
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
|
||||
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
feature = "conduit_bin",
|
||||
post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue