fix: join rooms over federation

This commit is contained in:
Timo Kösters 2020-09-12 21:30:07 +02:00
parent 1e8fbd8d50
commit 12a8c9badd
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
17 changed files with 395 additions and 405 deletions

View file

@ -25,6 +25,7 @@ use std::{
collections::{BTreeMap, HashMap},
convert::{TryFrom, TryInto},
mem,
sync::Arc,
};
/// The unique identifier of each state group.
@ -33,10 +34,6 @@ use std::{
/// hashing the entire state.
pub type StateHashId = Vec<u8>;
/// This identifier consists of roomId + count. It represents a
/// unique event, it will never be overwritten or removed.
pub type PduId = IVec;
pub struct Rooms {
pub edus: edus::RoomEdus,
pub(super) pduid_pdu: sled::Tree, // PduId = RoomId + Count
@ -54,22 +51,22 @@ pub struct Rooms {
pub(super) roomuserid_invited: sled::Tree,
pub(super) userroomid_left: sled::Tree,
// STATE TREES
/// This holds the full current state, including the latest event.
pub(super) roomstateid_pduid: sled::Tree, // RoomStateId = Room + StateType + StateKey
/// This holds the full room state minus the latest event.
pub(super) pduid_statehash: sled::Tree, // PDU id -> StateHash
/// Also holds the full room state minus the latest event.
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + (EventType, StateKey)
/// The room_id -> the latest StateHash
/// Remember the current state hash of a room.
pub(super) roomid_statehash: sled::Tree,
/// Remember the state hash at events in the past.
pub(super) pduid_statehash: sled::Tree,
/// The state for a given state hash.
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + EventType + StateKey
}
impl StateStore for Rooms {
fn get_event(&self, room_id: &RoomId, event_id: &EventId) -> state_res::Result<StateEvent> {
fn get_event(
&self,
room_id: &RoomId,
event_id: &EventId,
) -> state_res::Result<Arc<StateEvent>> {
let pid = self
.eventid_pduid
.get(event_id.as_bytes())
.get_pdu_id(event_id)
.map_err(StateError::custom)?
.ok_or_else(|| {
StateError::NotFound("PDU via room_id and event_id not found in the db.".into())
@ -87,7 +84,7 @@ impl StateStore for Rooms {
// conduit's PDU's always contain a room_id but some
// of ruma's do not so this must be an Option
if pdu.room_id() == Some(room_id) {
Ok(pdu)
Ok(Arc::new(pdu))
} else {
Err(StateError::NotFound(
"Found PDU for incorrect room in db.".into(),
@ -136,53 +133,12 @@ impl Rooms {
None
}
/// Fetch the current State using the `roomstateid_pduid` tree.
pub fn current_state_pduids(&self, room_id: &RoomId) -> Result<StateMap<PduId>> {
// TODO this could also scan roomstateid_pduid if we passed in room_id ?
self.roomstateid_pduid
.scan_prefix(room_id.as_bytes())
.values()
.map(|pduid| {
let pduid = &pduid?;
self.pduid_pdu.get(pduid)?.map_or_else(
|| {
Err(Error::bad_database(
"Failed to find current state of pduid's.",
))
},
|b| {
Ok((
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
pduid.clone(),
))
},
)
})
.map(|pair| {
let (pdu, id) = pair?;
Ok(((pdu.kind, pdu.state_key), id))
})
.collect::<Result<StateMap<_>>>()
}
/// Returns the last state hash key added to the db.
pub fn current_state_hash(&self, room_id: &RoomId) -> Result<StateHashId> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
// We must check here because this method is called outside and before
// `append_state_pdu` so the DB can be empty
if self.pduid_statehash.scan_prefix(prefix).next().is_none() {
// return the hash of the room_id, this represents a room with no state
return self.new_state_hash_id(room_id);
}
self.pduid_statehash
.iter()
.next_back()
.map(|pair| Ok(pair?.1.to_vec()))
.ok_or_else(|| Error::bad_database("No PDU's found for this room."))?
pub fn current_state_hash(&self, room_id: &RoomId) -> Result<Option<StateHashId>> {
Ok(self
.roomid_statehash
.get(room_id.as_bytes())?
.map(|bytes| bytes.to_vec()))
}
/// This fetches auth event_ids from the current state using the
@ -243,39 +199,11 @@ impl Rooms {
/// Generate a new StateHash.
///
/// A unique hash made from hashing the current states pduid's.
fn new_state_hash_id(&self, room_id: &RoomId) -> Result<StateHashId> {
// Use hashed roomId as the first StateHash key for first state event in room
if self
.pduid_statehash
.scan_prefix(room_id.as_bytes())
.next()
.is_none()
{
return Ok(digest::digest(&digest::SHA256, room_id.as_bytes())
.as_ref()
.to_vec());
}
let pdu_ids_to_hash = self
.pduid_statehash
.scan_prefix(room_id.as_bytes())
.values()
.next_back()
.unwrap() // We just checked if the tree was empty
.map(|hash| {
self.stateid_pduid
.scan_prefix(hash)
.values()
// pduid is roomId + count so just hash the whole thing
.map(|pid| Ok(pid?.to_vec()))
.collect::<Result<Vec<Vec<u8>>>>()
})??;
let hash = digest::digest(
&digest::SHA256,
&pdu_ids_to_hash.into_iter().flatten().collect::<Vec<u8>>(),
);
/// A unique hash made from hashing all PDU ids of the state joined with 0xff.
fn calculate_hash(&self, pdu_id_bytes: &[&[u8]]) -> Result<StateHashId> {
// We only hash the pdu's event ids, not the whole pdu
let bytes = pdu_id_bytes.join(&0xff);
let hash = digest::digest(&digest::SHA256, &bytes);
Ok(hash.as_ref().to_vec())
}
@ -297,29 +225,38 @@ impl Rooms {
&self,
room_id: &RoomId,
) -> Result<HashMap<(EventType, String), PduEvent>> {
let mut hashmap = HashMap::new();
for pdu in
self.roomstateid_pduid
.scan_prefix(&room_id.as_bytes())
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
let mut prefix = current_state_hash;
prefix.push(0xff);
let mut hashmap = HashMap::new();
for pdu in self
.stateid_pduid
.scan_prefix(prefix)
.values()
.map(|value| {
.map(|pdu_id| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(value?)?.ok_or_else(|| {
Error::bad_database("PDU not found for ID in db.")
&self.pduid_pdu.get(pdu_id?)?.ok_or_else(|| {
Error::bad_database("PDU in state not found in database.")
})?,
)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
.map_err(|_| {
Error::bad_database("Invalid PDU bytes in current room state.")
})?,
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert((pdu.kind.clone(), state_key), pdu);
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert((pdu.kind.clone(), state_key), pdu);
}
Ok(hashmap)
} else {
Ok(HashMap::new())
}
Ok(hashmap)
}
/// Returns all state entries for this type.
@ -328,33 +265,40 @@ impl Rooms {
room_id: &RoomId,
event_type: &EventType,
) -> Result<HashMap<String, PduEvent>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&event_type.to_string().as_bytes());
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
let mut prefix = current_state_hash;
prefix.push(0xff);
prefix.extend_from_slice(&event_type.to_string().as_bytes());
prefix.push(0xff);
let mut hashmap = HashMap::new();
for pdu in
self.roomstateid_pduid
let mut hashmap = HashMap::new();
for pdu in self
.stateid_pduid
.scan_prefix(&prefix)
.values()
.map(|value| {
.map(|pdu_id| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(value?)?.ok_or_else(|| {
Error::bad_database("PDU not found for ID in db.")
&self.pduid_pdu.get(pdu_id?)?.ok_or_else(|| {
Error::bad_database("PDU in state not found in database.")
})?,
)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
.map_err(|_| {
Error::bad_database("Invalid PDU bytes in current room state.")
})?,
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert(state_key, pdu);
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert(state_key, pdu);
}
Ok(hashmap)
} else {
Ok(HashMap::new())
}
Ok(hashmap)
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
@ -364,23 +308,24 @@ impl Rooms {
event_type: &EventType,
state_key: &str,
) -> Result<Option<PduEvent>> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&event_type.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
let mut key = current_state_hash;
key.push(0xff);
key.extend_from_slice(&event_type.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
self.roomstateid_pduid.get(&key)?.map_or(Ok(None), |value| {
Ok::<_, Error>(Some(
serde_json::from_slice::<PduEvent>(
&self
.pduid_pdu
.get(value)?
.ok_or_else(|| Error::bad_database("PDU not found for ID in db."))?,
)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
})
self.stateid_pduid.get(&key)?.map_or(Ok(None), |pdu_id| {
Ok::<_, Error>(Some(
serde_json::from_slice::<PduEvent>(&self.pduid_pdu.get(pdu_id)?.ok_or_else(
|| Error::bad_database("PDU in state not found in database."),
)?)
.map_err(|_| Error::bad_database("Invalid PDU bytes in current room state."))?,
))
})
} else {
Ok(None)
}
}
/// Returns the `count` of this pdu's id.
@ -528,8 +473,8 @@ impl Rooms {
self.eventid_pduid
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
if let Some(state_key) = &pdu.state_key {
self.append_state_pdu(&pdu.room_id, &pdu_id, state_key, &pdu.kind)?;
if pdu.state_key.is_some() {
self.append_to_state(&pdu_id, &pdu)?;
}
match &pdu.kind {
@ -603,59 +548,69 @@ impl Rooms {
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
fn append_state_pdu(
&self,
room_id: &RoomId,
pdu_id: &[u8],
state_key: &str,
kind: &EventType,
) -> Result<StateHashId> {
let state_hash = self.new_state_hash_id(room_id)?;
let state = self.current_state_pduids(room_id)?;
fn append_to_state(&self, new_pdu_id: &[u8], new_pdu: &PduEvent) -> Result<StateHashId> {
let old_state =
if let Some(old_state_hash) = self.roomid_statehash.get(new_pdu.room_id.as_bytes())? {
// Store state for event. The state does not include the event itself.
// Instead it's the state before the pdu, so the room's old state.
self.pduid_statehash.insert(new_pdu_id, &old_state_hash)?;
if new_pdu.state_key.is_none() {
return Ok(old_state_hash.to_vec());
}
let mut key = state_hash.to_vec();
key.push(0xff);
let mut prefix = old_state_hash.to_vec();
prefix.push(0xff);
self.stateid_pduid
.scan_prefix(&prefix)
.filter_map(|pdu| pdu.map_err(|e| error!("{}", e)).ok())
.map(|(k, v)| (k.subslice(prefix.len(), k.len() - prefix.len()), v))
.collect::<HashMap<IVec, IVec>>()
} else {
HashMap::new()
};
// TODO eventually we could avoid writing to the DB so much on every event
// by keeping track of the delta and write that every so often
for ((ev_ty, state_key), pid) in state {
let mut state_id = key.to_vec();
state_id.extend_from_slice(ev_ty.to_string().as_bytes());
key.push(0xff);
state_id.extend_from_slice(state_key.expect("state event").as_bytes());
if let Some(state_key) = &new_pdu.state_key {
let mut new_state = old_state;
let mut pdu_key = new_pdu.kind.as_str().as_bytes().to_vec();
pdu_key.push(0xff);
pdu_key.extend_from_slice(state_key.as_bytes());
new_state.insert(pdu_key.into(), new_pdu_id.into());
let new_state_hash =
self.calculate_hash(&new_state.values().map(|b| &**b).collect::<Vec<_>>())?;
let mut key = new_state_hash.to_vec();
key.push(0xff);
self.stateid_pduid.insert(&state_id, &pid)?;
// TODO: we could avoid writing to the DB on every state event by keeping
// track of the delta and write that every so often
for (key_without_prefix, pdu_id) in new_state {
let mut state_id = key.clone();
state_id.extend_from_slice(&key_without_prefix);
self.stateid_pduid.insert(&state_id, &pdu_id)?;
}
self.roomid_statehash
.insert(new_pdu.room_id.as_bytes(), &*new_state_hash)?;
Ok(new_state_hash)
} else {
Err(Error::bad_database(
"Tried to insert non-state event into room without a state.",
))
}
// This event's state does not include the event itself. `current_state_pduids`
// uses `roomstateid_pduid` before the current event is inserted to the tree so the state
// will be everything up to but not including the incoming event.
self.pduid_statehash.insert(pdu_id, state_hash.as_slice())?;
self.roomid_statehash
.insert(room_id.as_bytes(), state_hash.as_slice())?;
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(kind.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(state_key.as_bytes());
self.roomstateid_pduid.insert(key, pdu_id)?;
Ok(state_hash)
}
/// Creates a new persisted data unit and adds it to a room.
pub fn build_and_append_pdu(
&self,
pdu_builder: PduBuilder,
sender: &UserId,
room_id: &RoomId,
globals: &super::globals::Globals,
account_data: &super::account_data::AccountData,
) -> Result<EventId> {
let PduBuilder {
room_id,
sender,
event_type,
content,
unsigned,
@ -741,8 +696,7 @@ impl Rooms {
ErrorKind::Unknown,
"Membership can't be the first event",
))?)?
.map(|pdu| pdu.convert_for_state_res())
.transpose()?;
.map(|pdu| pdu.convert_for_state_res());
event_auth::valid_membership_change(
// TODO this is a bit of a hack but not sure how to have a type
// declared in `state_res` crate easily convert to/from conduit::PduEvent
@ -753,11 +707,12 @@ impl Rooms {
state_key: Some(state_key.to_owned()),
sender: &sender,
},
prev_event.as_ref(),
prev_event,
None,
&auth_events
.iter()
.map(|((ty, key), pdu)| {
Ok(((ty.clone(), key.clone()), pdu.convert_for_state_res()?))
Ok(((ty.clone(), key.clone()), pdu.convert_for_state_res()))
})
.collect::<Result<StateMap<_>>>()?,
)
@ -812,9 +767,8 @@ impl Rooms {
let mut pdu = PduEvent {
event_id: EventId::try_from("$thiswillbefilledinlater").expect("we know this is valid"),
room_id,
sender,
origin: globals.server_name().to_owned(),
room_id: room_id.clone(),
sender: sender.clone(),
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
@ -834,7 +788,7 @@ impl Rooms {
hashes: ruma::events::pdu::EventHash {
sha256: "aaa".to_owned(),
},
signatures: HashMap::new(),
signatures: BTreeMap::new(),
};
// Generate event id
@ -1028,8 +982,6 @@ impl Rooms {
self.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: user_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(member_content)
.expect("event is valid, we just created it"),
@ -1037,6 +989,8 @@ impl Rooms {
state_key: Some(user_id.to_string()),
redacts: None,
},
&user_id,
&room_id,
globals,
account_data,
)?;