Merge branch 'correct-sendtxn' into pushers

This commit is contained in:
Timo Kösters 2021-03-15 09:48:19 +01:00
commit 21f785d530
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
61 changed files with 3003 additions and 1919 deletions

View file

@ -3,7 +3,7 @@ mod edus;
pub use edus::RoomEdus;
use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use log::error;
use log::{error, info, warn};
use regex::Regex;
use ring::digest;
use ruma::{
@ -63,16 +63,24 @@ pub struct Rooms {
/// Remember the state hash at events in the past.
pub(super) pduid_statehash: sled::Tree,
/// The state for a given state hash.
pub(super) statekey_short: sled::Tree, // StateKey = EventType + StateKey, Short = Count
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
///
/// StateKey = EventType + StateKey, Short = Count
pub(super) statekey_short: sled::Tree,
/// StateId = StateHash + Short, PduId = Count (without roomid)
pub(super) stateid_pduid: sled::Tree,
/// Any pdu that has passed the steps up to auth with auth_events.
pub(super) pduid_outlierpdu: sled::Tree,
/// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
pub(super) eventid_outlierpdu: sled::Tree,
/// RoomId + EventId -> Parent PDU EventId.
pub(super) prevevent_parent: sled::Tree,
}
impl Rooms {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
#[tracing::instrument(skip(self))]
pub fn state_full(
&self,
room_id: &RoomId,
@ -81,16 +89,17 @@ impl Rooms {
self.stateid_pduid
.scan_prefix(&state_hash)
.values()
.map(|pduid_short| {
let mut pduid = room_id.as_bytes().to_vec();
pduid.push(0xff);
pduid.extend_from_slice(&pduid_short?);
match self.pduid_pdu.get(&pduid)? {
.map(|short_id| {
let short_id = short_id?;
let mut long_id = room_id.as_bytes().to_vec();
long_id.push(0xff);
long_id.extend_from_slice(&short_id);
match self.pduid_pdu.get(&long_id)? {
Some(b) => serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db.")),
None => self
.pduid_outlierpdu
.get(pduid)?
.eventid_outlierpdu
.get(short_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
@ -117,6 +126,7 @@ impl Rooms {
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
#[tracing::instrument(skip(self))]
pub fn state_get(
&self,
room_id: &RoomId,
@ -128,6 +138,8 @@ impl Rooms {
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
info!("Looking for {} {:?}", event_type, state_key);
let short = self.statekey_short.get(&key)?;
if let Some(short) = short {
@ -135,42 +147,52 @@ impl Rooms {
stateid.push(0xff);
stateid.extend_from_slice(&short);
info!("trying to find pduid/eventid. short: {:?}", stateid);
self.stateid_pduid
.get(&stateid)?
.map_or(Ok(None), |pdu_id_short| {
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&pdu_id_short);
.map_or(Ok(None), |short_id| {
info!("found in stateid_pduid");
let mut long_id = room_id.as_bytes().to_vec();
long_id.push(0xff);
long_id.extend_from_slice(&short_id);
Ok::<_, Error>(Some((
pdu_id.clone().into(),
match self.pduid_pdu.get(&pdu_id)? {
Some(b) => serde_json::from_slice::<PduEvent>(&b)
Ok::<_, Error>(Some(match self.pduid_pdu.get(&long_id)? {
Some(b) => (
long_id.clone().into(),
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
None => self
.pduid_outlierpdu
.get(pdu_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})??,
},
)))
),
None => {
info!("looking in outliers");
(
short_id.clone().into(),
self.eventid_outlierpdu
.get(&short_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})??,
)
}
}))
})
} else {
info!("short id not found");
Ok(None)
}
}
/// Returns the state hash for this pdu.
#[tracing::instrument(skip(self))]
pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> {
Ok(self.pduid_statehash.get(pdu_id)?)
}
/// Returns the last state hash key added to the db for the given room.
#[tracing::instrument(skip(self))]
pub fn current_state_hash(&self, room_id: &RoomId) -> Result<Option<StateHashId>> {
Ok(self.roomid_statehash.get(room_id.as_bytes())?)
}
@ -198,9 +220,11 @@ impl Rooms {
&event_type,
&state_key
.as_deref()
.expect("found a non state event in auth events"),
.ok_or_else(|| Error::bad_database("Saved auth event with no state key."))?,
)? {
events.insert((event_type, state_key), pdu);
} else {
warn!("Could not find {} {:?} in state", event_type, state_key);
}
}
Ok(events)
@ -239,11 +263,11 @@ impl Rooms {
globals: &super::globals::Globals,
) -> Result<()> {
let state_hash =
self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::<Vec<_>>())?;
self.calculate_hash(&state.values().map(|long_id| &**long_id).collect::<Vec<_>>())?;
let mut prefix = state_hash.to_vec();
prefix.push(0xff);
for ((event_type, state_key), pdu_id) in state {
for ((event_type, state_key), long_id) in state {
let mut statekey = event_type.as_ref().as_bytes().to_vec();
statekey.push(0xff);
statekey.extend_from_slice(&state_key.as_bytes());
@ -259,14 +283,13 @@ impl Rooms {
}
};
let pdu_id_short = pdu_id
.splitn(2, |&b| b == 0xff)
.nth(1)
.ok_or_else(|| Error::bad_database("Invalid pduid in state."))?;
// If it's a pdu id we remove the room id, if it's an event id we leave it the same
let short_id = long_id.splitn(2, |&b| b == 0xff).nth(1).unwrap_or(&long_id);
let mut state_id = prefix.clone();
state_id.extend_from_slice(&short.to_be_bytes());
self.stateid_pduid.insert(state_id, pdu_id_short)?;
info!("inserting {:?} into {:?}", short_id, state_id);
self.stateid_pduid.insert(state_id, short_id)?;
}
self.roomid_statehash
@ -276,6 +299,7 @@ impl Rooms {
}
/// Returns the full room state.
#[tracing::instrument(skip(self))]
pub fn room_state_full(
&self,
room_id: &RoomId,
@ -288,6 +312,7 @@ impl Rooms {
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
#[tracing::instrument(skip(self))]
pub fn room_state_get(
&self,
room_id: &RoomId,
@ -302,6 +327,7 @@ impl Rooms {
}
/// Returns the `count` of this pdu's id.
#[tracing::instrument(skip(self))]
pub fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> {
Ok(
utils::u64_from_bytes(&pdu_id[pdu_id.len() - mem::size_of::<u64>()..pdu_id.len()])
@ -316,21 +342,32 @@ impl Rooms {
.map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some))
}
pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result<u64> {
self.pduid_pdu
.scan_prefix(room_id.as_bytes())
.last()
.map(|b| self.pdu_count(&b?.0))
.transpose()
.map(|op| op.unwrap_or_default())
}
/// Returns the json of a pdu.
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<serde_json::Value>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? {
Some(b) => b,
None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
})
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
.map_or_else::<Result<_>, _, _>(
|| Ok(self.eventid_outlierpdu.get(event_id.as_bytes())?),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pdu| {
Ok(serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?)
})
.transpose()
}
/// Returns the pdu's id.
@ -340,24 +377,36 @@ impl Rooms {
.map_or(Ok(None), |pdu_id| Ok(Some(pdu_id)))
}
/// Returns the pdu.
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? {
Some(b) => b,
None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
})
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
})
pub fn get_long_id(&self, event_id: &EventId) -> Result<Vec<u8>> {
Ok(self
.get_pdu_id(event_id)?
.map_or_else(|| event_id.as_bytes().to_vec(), |pduid| pduid.to_vec()))
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else::<Result<_>, _, _>(
|| Ok(self.eventid_outlierpdu.get(event_id.as_bytes())?),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pdu| {
Ok(serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?)
})
.transpose()
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
pub fn get_pdu_from_id(&self, pdu_id: &IVec) -> Result<Option<PduEvent>> {
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
Ok(Some(
@ -421,7 +470,7 @@ impl Rooms {
/// Replace the leaves of a room.
///
/// The provided `event_ids` become the new leaves, this enables an event having multiple
/// The provided `event_ids` become the new leaves, this allows a room to have multiple
/// `prev_events`.
pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_ids: &[EventId]) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
@ -440,39 +489,38 @@ impl Rooms {
Ok(())
}
/// Returns the pdu from the outlier tree.
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
if let Some(id) = self.eventid_pduid.get(event_id.as_bytes())? {
self.pduid_outlierpdu.get(id)?.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
} else {
Ok(None)
}
pub fn is_pdu_referenced(&self, pdu: &PduEvent) -> Result<bool> {
let mut key = pdu.room_id().as_bytes().to_vec();
key.extend_from_slice(pdu.event_id().as_bytes());
self.prevevent_parent.contains_key(key).map_err(Into::into)
}
/// Returns true if the event_id was previously inserted.
pub fn append_pdu_outlier(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<bool> {
log::info!("Number of outlier pdu's {}", self.pduid_outlierpdu.len());
/// Returns the pdu from the outlier tree.
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_outlierpdu
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
}
// we need to be able to find it by event_id
self.eventid_pduid
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
/// Append the PDU as an outlier.
///
/// Any event given to this will be processed (state-res) on another thread.
pub fn add_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> {
self.eventid_outlierpdu.insert(
&pdu.event_id.as_bytes(),
&*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"),
)?;
let res = self
.pduid_outlierpdu
.insert(
pdu_id,
&*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"),
)
.map(|op| op.is_some())?;
Ok(res)
Ok(())
}
/// Creates a new persisted data unit and adds it to a room.
///
/// By this point the incoming event should be fully authenticated, no auth happens
/// in `append_pdu`.
#[allow(clippy::too_many_arguments)]
pub fn append_pdu(
&self,
pdu: &PduEvent,
@ -509,9 +557,12 @@ impl Rooms {
}
}
// We no longer keep this pdu as an outlier
if let Some(id) = self.eventid_pduid.remove(pdu.event_id().as_bytes())? {
self.pduid_outlierpdu.remove(id)?;
// We must keep track of all events that have been referenced.
for leaf in leaves {
let mut key = pdu.room_id().as_bytes().to_vec();
key.extend_from_slice(leaf.as_bytes());
self.prevevent_parent
.insert(key, pdu.event_id().as_bytes())?;
}
self.replace_pdu_leaves(&pdu.room_id, leaves)?;
@ -527,6 +578,8 @@ impl Rooms {
.expect("CanonicalJsonObject is always a valid String"),
)?;
// This also replaces the eventid of any outliers with the correct
// pduid, removing the place holder.
self.eventid_pduid
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
@ -836,12 +889,12 @@ impl Rooms {
content.clone(),
prev_event,
None, // TODO: third party invite
&auth_events
dbg!(&auth_events
.iter()
.map(|((ty, key), pdu)| {
Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone())))
})
.collect::<Result<StateMap<_>>>()?,
.collect::<Result<StateMap<_>>>()?),
)
.map_err(|e| {
log::error!("{}", e);
@ -1025,6 +1078,7 @@ impl Rooms {
let user_is_joined =
|bridge_user_id| self.is_joined(&bridge_user_id, room_id).unwrap_or(false);
let matching_users = |users: &Regex| {
users.is_match(pdu.sender.as_str())
|| pdu.kind == EventType::RoomMember
@ -1053,6 +1107,7 @@ impl Rooms {
}
/// Returns an iterator over all PDUs in a room.
#[tracing::instrument(skip(self))]
pub fn all_pdus(
&self,
user_id: &UserId,
@ -1063,6 +1118,7 @@ impl Rooms {
/// Returns a double-ended iterator over all events in a room that happened after the event with id `since`
/// in chronological order.
#[tracing::instrument(skip(self))]
pub fn pdus_since(
&self,
user_id: &UserId,
@ -1129,6 +1185,7 @@ impl Rooms {
/// Returns an iterator over all events and their token in a room that happened after the event
/// with id `from` in chronological order.
#[tracing::instrument(skip(self))]
pub fn pdus_after(
&self,
user_id: &UserId,
@ -1177,7 +1234,7 @@ impl Rooms {
}
/// Update current membership data.
fn update_membership(
pub fn update_membership(
&self,
room_id: &RoomId,
user_id: &UserId,
@ -1482,6 +1539,7 @@ impl Rooms {
))
}
#[tracing::instrument(skip(self))]
pub fn get_shared_rooms<'a>(
&'a self,
users: Vec<UserId>,
@ -1521,7 +1579,7 @@ impl Rooms {
})
}
/// Returns an iterator over all joined members of a room.
/// Returns an iterator of all servers participating in this room.
pub fn room_servers(&self, room_id: &RoomId) -> impl Iterator<Item = Result<Box<ServerName>>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -1543,6 +1601,7 @@ impl Rooms {
}
/// Returns an iterator over all joined members of a room.
#[tracing::instrument(skip(self))]
pub fn room_members(&self, room_id: &RoomId) -> impl Iterator<Item = Result<UserId>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -1591,6 +1650,7 @@ impl Rooms {
}
/// Returns an iterator over all invited members of a room.
#[tracing::instrument(skip(self))]
pub fn room_members_invited(&self, room_id: &RoomId) -> impl Iterator<Item = Result<UserId>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -1615,6 +1675,7 @@ impl Rooms {
}
/// Returns an iterator over all rooms this user joined.
#[tracing::instrument(skip(self))]
pub fn rooms_joined(&self, user_id: &UserId) -> impl Iterator<Item = Result<RoomId>> {
self.userroomid_joined
.scan_prefix(user_id.as_bytes())
@ -1636,6 +1697,7 @@ impl Rooms {
}
/// Returns an iterator over all rooms a user was invited to.
#[tracing::instrument(skip(self))]
pub fn rooms_invited(&self, user_id: &UserId) -> impl Iterator<Item = Result<RoomId>> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
@ -1660,6 +1722,7 @@ impl Rooms {
}
/// Returns an iterator over all rooms a user left.
#[tracing::instrument(skip(self))]
pub fn rooms_left(&self, user_id: &UserId) -> impl Iterator<Item = Result<RoomId>> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);